Subscribe to real-time camera events: photo taken, property changed, autofocus status, transfer progress, etc. The server exposes GET /api/events (all cameras) and GET /api/cameras/{id}/events (single camera) as Server-Sent Events streams.

When to use

  • You need to react when a photo is captured (downloadComplete) instead of polling
  • You want progress feedback during SD card downloads (transferProgress)
  • You’re building a dashboard that reflects property changes immediately
  • You need to know when a camera is unplugged mid-session (disconnected)

When NOT to use

  • One-shot scripts that take a photo and exit — just call the endpoint, disconnect, done
  • Polling a single property rarely — a periodic GET is simpler

Event catalog

Event             Payload                                             Notes
connected         {cameraId}                                          fires once on SSE handshake AND on camera connect (discriminate by context)
disconnected      {cameraId, reason?}
propertyChanged   {cameraId, codes: number[]}                         refresh affected props after
afStatus          {state: "focused" | "unlocked" | "tracking"}
downloadComplete  {cameraId, filename, savedPath}                     auto-transfer landed
transferProgress  {cameraId, contentId, fileId, percent, savedPath?}  explicit pull in progress
contentsTransfer  {cameraId, files: [...]}                            contents-mode file list updates
operationResult   {cameraId, operation, result}
error             {cameraId, code, message}

Keepalive comments (: keepalive) arrive every 30s; ignore them.

The dual connected quirk: connected fires immediately when the SSE channel opens (the subscription handshake) and again when a real camera's OnConnected SDK callback fires. Discriminate by payload shape (per the Swift recipe below, the real callback payload includes model and id, and the handshake does not), or ignore the first connected event after opening the stream.
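
To make the keepalive and handshake rules concrete, here is a minimal Python sketch of parsing one SSE block (the text between blank lines). The names parse_sse_block and is_handshake_connected are illustrative and not part of the server or SDK. The handshake check assumes, as the Swift recipe on this page notes, that only the real camera connected payload carries model.

```python
import json
from typing import Any, Optional, Tuple


def parse_sse_block(raw: str) -> Optional[Tuple[str, Any]]:
    """Parse one SSE block into (event_name, payload), or None if it names no event."""
    event_name = None
    data_lines = []
    for line in raw.splitlines():
        if not line or line.startswith(":"):
            continue  # comment line, e.g. ": keepalive"
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # the SSE format strips one leading space
        if field == "event":
            event_name = value
        elif field == "data":
            data_lines.append(value)
    if event_name is None:
        return None  # keepalive-only block
    text = "\n".join(data_lines)
    try:
        payload = json.loads(text) if text else None
    except json.JSONDecodeError:
        payload = text  # hand back the raw text rather than failing
    return event_name, payload


def is_handshake_connected(event_name: str, payload: Any) -> bool:
    """True for the subscription handshake (assumption: real connects include `model`)."""
    if event_name != "connected":
        return False
    return not (isinstance(payload, dict) and "model" in payload)


print(parse_sse_block(": keepalive"))  # None
print(parse_sse_block('event: afStatus\ndata: {"state": "focused"}'))  # ('afStatus', {'state': 'focused'})
```

A consumer would call is_handshake_connected on each parsed event and drop the ones that return True before passing the rest to application code.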

TypeScript

The real Next.js example app uses a small browser-native EventSource wrapper in:
  • example_app/src/lib/event-stream.ts

Complete recipe

type SSEEventType =
  | "connected"
  | "disconnected"
  | "propertyChanged"
  | "warning"
  | "afStatus"
  | "downloadComplete"
  | "transferProgress"
  | "error"
  | "close";

type EventCallback<T = unknown> = (data: T) => void;

export class EventStream {
  private eventSource: EventSource | null = null;
  private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
  private listeners = new Map<SSEEventType, Set<EventCallback>>();
  private callbackMap = new Map<EventCallback, EventCallback>();
  private nativeListenerMap = new Map<EventCallback, EventListener>();

  constructor(
    private readonly baseUrl: string,
    private readonly cameraId?: string,
  ) {}

  connect(): void {
    if (this.eventSource) return;

    const url = this.cameraId
      ? `${this.baseUrl}/api/cameras/${this.cameraId}/events`
      : `${this.baseUrl}/api/events`;

    this.eventSource = new EventSource(url);

    this.eventSource.onerror = () => {
      this.eventSource?.close();
      this.eventSource = null;
      this.emit("error", undefined);
      this.reconnectTimeout = setTimeout(() => this.connect(), 3000);
    };

    for (const [event, callbacks] of this.listeners) {
      for (const callback of callbacks) {
        this.addNativeListener(event, callback);
      }
    }
  }

  on<T = unknown>(event: SSEEventType, callback: EventCallback<T>): this {
    const wrapped = (data: unknown) => callback(data as T);
    this.callbackMap.set(callback as EventCallback, wrapped);

    if (!this.listeners.has(event)) {
      this.listeners.set(event, new Set());
    }
    this.listeners.get(event)?.add(wrapped);

    if (this.eventSource) {
      this.addNativeListener(event, wrapped);
    } else {
      this.connect();
    }
    return this;
  }

  off<T = unknown>(event: SSEEventType, callback?: EventCallback<T>): this {
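    if (!callback) {
      // Detach native listeners too; otherwise they keep firing until close().
      for (const wrapped of this.listeners.get(event) ?? []) {
        const native = this.nativeListenerMap.get(wrapped);
        if (native) {
          this.eventSource?.removeEventListener(event, native);
          this.nativeListenerMap.delete(wrapped);
        }
      }
      this.listeners.delete(event);
      return this;
    }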

    const wrapped = this.callbackMap.get(callback as EventCallback);
    if (!wrapped) return this;

    this.listeners.get(event)?.delete(wrapped);
    this.callbackMap.delete(callback as EventCallback);

    const native = this.nativeListenerMap.get(wrapped);
    if (native && this.eventSource && event !== "error" && event !== "close") {
      this.eventSource.removeEventListener(event, native);
    }
    if (native) {
      this.nativeListenerMap.delete(wrapped);
    }
    return this;
  }

  close(): void {
    if (this.reconnectTimeout) {
      clearTimeout(this.reconnectTimeout);
      this.reconnectTimeout = null;
    }
    this.eventSource?.close();
    this.eventSource = null;
    this.nativeListenerMap.clear();
    this.emit("close", undefined);
    this.listeners.clear();
  }

  private emit(event: SSEEventType, data: unknown): void {
    const callbacks = this.listeners.get(event);
    if (!callbacks) return;
    for (const callback of callbacks) {
      try {
        callback(data);
      } catch {}
    }
  }

  private addNativeListener(event: SSEEventType, callback: EventCallback): void {
    if (!this.eventSource || event === "error" || event === "close") return;

    const nativeListener: EventListener = (rawEvent: Event) => {
      const messageEvent = rawEvent as MessageEvent<string>;
      try {
        callback(JSON.parse(messageEvent.data));
      } catch {
        callback(messageEvent.data);
      }
    };

    this.nativeListenerMap.set(callback, nativeListener);
    this.eventSource.addEventListener(event, nativeListener);
  }
}

Usage

const stream = new EventStream("http://localhost:8080", "camera-id");

stream.on("transferProgress", (data: { percent?: number }) => {
  console.log("Transfer:", data.percent);
});

stream.on("downloadComplete", (data: { savedPath?: string }) => {
  console.log("Saved:", data.savedPath);
});

stream.on("disconnected", () => {
  console.log("Camera disconnected");
});

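// Optional here: the first on() call already opened the connection,
// so this returns immediately.
stream.connect();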

// Later:
stream.close();
This is the pattern the real browser app uses. It keeps SSE logic small and lets the lifecycle layer decide which events matter.

Python

Complete recipe

# sse.py — drop into your project, adapt as needed.

from dataclasses import dataclass
from typing import AsyncGenerator, Optional
import asyncio
import json

import httpx


@dataclass
class CameraEvent:
    """Camera event from the SSE stream."""
    event: str
    data: dict


async def stream_camera_events(
    base_url: str,
    camera_id: Optional[str] = None,
    cancel: Optional[asyncio.Event] = None,
) -> AsyncGenerator[CameraEvent, None]:
    """
    Async generator yielding typed camera events.

    Auto-reconnects with exponential backoff on network errors.
    Set the `cancel` event to stop cleanly.

    Args:
        base_url: e.g. "http://localhost:8080"
        camera_id: omit for all-camera stream; pass ID for single-camera stream
        cancel: optional asyncio.Event — set it to break the generator
    """
    url = (
        f"{base_url}/api/cameras/{camera_id}/events"
        if camera_id
        else f"{base_url}/api/events"
    )

    reconnect_delay = 0.5
    initial_connected_seen = False

    async with httpx.AsyncClient(timeout=None) as client:
        while not (cancel and cancel.is_set()):
            try:
                async with client.stream("GET", url) as response:
                    response.raise_for_status()
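                    reconnect_delay = 0.5  # reset on success
                    # Every new connection begins with its own handshake "connected",
                    # so the skip flag must be reset after a reconnect.
                    initial_connected_seen = False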
                    buf = ""

                    async for chunk in response.aiter_text():
                        if cancel and cancel.is_set():
                            return
                        buf += chunk

                        # SSE events separated by "\n\n"
                        while "\n\n" in buf:
                            raw, buf = buf.split("\n\n", 1)

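                            # Comment lines (e.g. ": keepalive") start with ":".
                            # A keepalive-only block yields no event name below.
                            parsed = {}
                            for line in raw.split("\n"):
                                if line.startswith(":") or ":" not in line:
                                    continue
                                key, _, value = line.partition(":")
                                # SSE strips one optional leading space from the value.
                                parsed[key] = value[1:] if value.startswith(" ") else value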

                            event_name = parsed.get("event")
                            if not event_name:
                                continue

                            # Skip the handshake "connected" — not a real camera event
                            if event_name == "connected" and not initial_connected_seen:
                                initial_connected_seen = True
                                continue

                            try:
                                data = json.loads(parsed.get("data") or "null")
                                yield CameraEvent(event=event_name, data=data)
                            except json.JSONDecodeError:
                                # Malformed payload — skip, don't kill the stream
                                pass

            except httpx.HTTPError:  # base class; also covers ReadError and HTTPStatusError
                if cancel and cancel.is_set():
                    return
                await asyncio.sleep(reconnect_delay)
                reconnect_delay = min(reconnect_delay * 2, 10.0)  # cap at 10s

Usage

import asyncio
from sse import stream_camera_events

async def main():
    cancel = asyncio.Event()

    async def consumer():
        async for evt in stream_camera_events("http://localhost:8080", cancel=cancel):
            if evt.event == "downloadComplete":
                print(f"Saved: {evt.data['savedPath']}")
            elif evt.event == "transferProgress":
                print(f"{evt.data['percent']}% — {evt.data['contentId']}/{evt.data['fileId']}")
            elif evt.event == "propertyChanged":
                print(f"Properties changed: {evt.data['codes']}")
            elif evt.event == "afStatus":
                print(f"Autofocus: {evt.data['state']}")

    task = asyncio.create_task(consumer())

    # later...
    await asyncio.sleep(30)
    cancel.set()
    await task

asyncio.run(main())

Typed event variants

For stricter type safety, replace the dict payload with Pydantic models per event:
from pydantic import BaseModel

class DownloadComplete(BaseModel):
    cameraId: str
    filename: str
    savedPath: str

# In the generator:
if event_name == "downloadComplete":
    yield DownloadComplete(**data)
This gives you isinstance checks + autocomplete in consumers. Optional — the plain dict works too.
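
If you would rather not add Pydantic, the same idea can be sketched with standard-library dataclasses and a small lookup table. The field names follow the event catalog on this page. EVENT_MODELS and to_typed are hypothetical helper names, the Any types for contentId and fileId are a placeholder because the catalog does not state their types, and unknown events fall back to the plain dict.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional, Type


@dataclass
class DownloadComplete:
    cameraId: str
    filename: str
    savedPath: str


@dataclass
class TransferProgress:
    cameraId: str
    contentId: Any  # type not stated in the catalog
    fileId: Any  # type not stated in the catalog
    percent: float
    savedPath: Optional[str] = None  # marked optional in the catalog


# Hypothetical registry: event name -> dataclass built from the payload.
EVENT_MODELS: Dict[str, Type] = {
    "downloadComplete": DownloadComplete,
    "transferProgress": TransferProgress,
}


def to_typed(event_name: str, data: dict) -> Any:
    """Return a dataclass instance for known events, else the dict unchanged."""
    model = EVENT_MODELS.get(event_name)
    if model is None:
        return data
    known = {f.name for f in fields(model)}
    # Ignore extra keys so an extended server payload does not raise TypeError.
    return model(**{k: v for k, v in data.items() if k in known})
```

Unlike Pydantic, dataclasses do not validate field types at runtime, so this only buys attribute access and isinstance checks.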

Swift

Complete recipe

// SSEClient.swift — the pattern used in example_swift_app.
// macOS/iOS/tvOS/watchOS compatible, but especially important on macOS:
// avoid URLSession.bytes(...).lines for SSE because sparse streams can buffer.

import Foundation

struct SSEClient {
    let baseURL: String
    let cameraId: String
    let onEvent: (@Sendable (_ type: String, _ payload: String) -> Void)?

    init(
        baseURL: String,
        cameraId: String,
        onEvent: (@Sendable (_ type: String, _ payload: String) -> Void)? = nil
    ) {
        self.baseURL = baseURL
        self.cameraId = cameraId
        self.onEvent = onEvent
    }

    struct ConnectedPayload {
        let model: String
        let id: String
    }

    func waitForConnected(timeoutSeconds: Double = 60) async -> ConnectedPayload? {
        let targetId = cameraId
        let payload = await waitForEvent(timeoutSeconds: timeoutSeconds) { name, payload in
            guard name == "connected" else { return false }
            guard let bytes = payload.data(using: .utf8),
                  let obj = try? JSONSerialization.jsonObject(with: bytes) as? [String: Any]
            else { return false }
            // IMPORTANT: the transport itself emits `connected` as a handshake.
            // The real SDK callback includes `model` + `id`; the handshake does not.
            guard obj["model"] != nil else { return false }
            return (obj["id"] as? String) == targetId
        }
        guard let payload,
              let bytes = payload.data(using: .utf8),
              let obj = try? JSONSerialization.jsonObject(with: bytes) as? [String: Any],
              let model = obj["model"] as? String,
              let id = obj["id"] as? String
        else { return nil }
        return ConnectedPayload(model: model, id: id)
    }

    func waitForDisconnected(timeoutSeconds: Double = 86_400) async -> Bool {
        let payload = await waitForEvent(timeoutSeconds: timeoutSeconds) { name, _ in
            name == "disconnected"
        }
        return payload != nil
    }

    func waitForTransferComplete(timeoutSeconds: Double = 120) async -> Bool {
        let payload = await waitForEvent(timeoutSeconds: timeoutSeconds) { name, payload in
            guard name == "transferProgress" else { return false }
            guard let bytes = payload.data(using: .utf8),
                  let obj = try? JSONSerialization.jsonObject(with: bytes) as? [String: Any],
                  let pct = obj["percent"] as? Int
            else { return false }
            // Accept both real and synthetic completion events.
            return pct >= 100
        }
        return payload != nil
    }

    private func waitForEvent(
        timeoutSeconds: Double,
        match: @escaping @Sendable (_ event: String, _ payload: String) -> Bool
    ) async -> String? {
        guard let url = URL(string: "\(baseURL)/api/cameras/\(cameraId)/events") else {
            return nil
        }

        return await withCheckedContinuation { continuation in
            let delegate = SSEDelegate(onEvent: onEvent, match: match) { matchedPayload in
                continuation.resume(returning: matchedPayload)
            }
            let config = URLSessionConfiguration.default
            config.requestCachePolicy = .reloadIgnoringLocalAndRemoteCacheData
            config.urlCache = nil
            config.timeoutIntervalForRequest = timeoutSeconds + 5
            config.timeoutIntervalForResource = timeoutSeconds + 5
            config.httpAdditionalHeaders = [
                "Accept": "text/event-stream",
                "Cache-Control": "no-cache"
            ]

            let session = URLSession(configuration: config, delegate: delegate, delegateQueue: nil)
            delegate.session = session
            delegate.deadline = Date().addingTimeInterval(timeoutSeconds)

            var req = URLRequest(url: url)
            req.timeoutInterval = timeoutSeconds + 5
            req.setValue("text/event-stream", forHTTPHeaderField: "Accept")
            req.setValue("no-cache", forHTTPHeaderField: "Cache-Control")

            let task = session.dataTask(with: req)
            delegate.task = task
            task.resume()

            DispatchQueue.global().asyncAfter(deadline: .now() + timeoutSeconds) {
                delegate.fireTimeout()
            }
        }
    }
}

private final class SSEDelegate: NSObject, URLSessionDataDelegate, @unchecked Sendable {
    private let onEvent: (@Sendable (String, String) -> Void)?
    private let match: @Sendable (String, String) -> Bool
    private let onComplete: (String?) -> Void
    private let lock = NSLock()
    private var done = false
    private var buffer = Data()
    private var eventName: String?
    private var dataBuffer: String?

    var session: URLSession?
    var task: URLSessionDataTask?
    var deadline: Date = .distantFuture

    init(
        onEvent: (@Sendable (String, String) -> Void)?,
        match: @escaping @Sendable (String, String) -> Bool,
        onComplete: @escaping (String?) -> Void
    ) {
        self.onEvent = onEvent
        self.match = match
        self.onComplete = onComplete
    }

    func fireTimeout() { finish(nil) }

    private func finish(_ value: String?) {
        lock.lock()
        if done {
            lock.unlock()
            return
        }
        done = true
        let s = session
        let t = task
        lock.unlock()
        t?.cancel()
        s?.invalidateAndCancel()
        onComplete(value)
    }

    func urlSession(
        _ session: URLSession,
        dataTask: URLSessionDataTask,
        didReceive response: URLResponse,
        completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
    ) {
        if let http = response as? HTTPURLResponse, http.statusCode != 200 {
            onEvent?("__http_error__", "status=\(http.statusCode)")
            completionHandler(.cancel)
            finish(nil)
            return
        }
        onEvent?("__connected__", "")
        completionHandler(.allow)
    }

    func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
        if Date() > deadline {
            finish(nil)
            return
        }

        lock.lock()
        buffer.append(data)
        var lines: [String] = []

        while let nl = buffer.firstIndex(of: 0x0a) {
            let lineData = buffer[..<nl]
            buffer.removeSubrange(...nl)
            let trimmed: Data = (lineData.last == 0x0d) ? lineData.dropLast() : lineData
            lines.append(String(data: trimmed, encoding: .utf8) ?? "")
        }
        lock.unlock()

        for line in lines {
            if line.isEmpty {
                let name = eventName ?? "message"
                let payload = dataBuffer ?? ""
                onEvent?(name, payload)
                if match(name, payload) {
                    finish(payload)
                    return
                }
                eventName = nil
                dataBuffer = nil
                continue
            }
            if line.hasPrefix(":") { continue }
            if line.hasPrefix("event:") {
                eventName = String(line.dropFirst("event:".count)).trimmingCharacters(in: .whitespaces)
            } else if line.hasPrefix("data:") {
                let piece = String(line.dropFirst("data:".count)).trimmingCharacters(in: .whitespaces)
                if let existing = dataBuffer {
                    dataBuffer = existing + "\n" + piece
                } else {
                    dataBuffer = piece
                }
            }
        }
    }

    func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
        if let error {
            onEvent?("__error__", error.localizedDescription)
        } else {
            onEvent?("__stream_closed__", "")
        }
        finish(nil)
    }
}

Usage

let sse = SSEClient(baseURL: "http://localhost:8080", cameraId: "<camera-id>")

// Fast-path connect confirmation
if let payload = await sse.waitForConnected(timeoutSeconds: 60) {
    print("Connected: \(payload.model) (\(payload.id))")
}

// Long-lived disconnect listener
let disconnectTask = Task {
    let dropped = await sse.waitForDisconnected()
    if dropped {
        print("Camera disconnected")
    }
}

// Explicit download completion
let transferDone = await sse.waitForTransferComplete(timeoutSeconds: 120)
print("Transfer finished: \(transferDone)")

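// Note: as written, waitForDisconnected() does not observe Task cancellation.
// This marks the Task cancelled, but the underlying request keeps running
// until a disconnect arrives or its timeout elapses.
disconnectTask.cancel()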

Why this differs from a generic AsyncSequence SSE client

  • The real app does not keep a single catch-all event stream on the main actor.
  • It uses purpose-built waits:
    • waitForConnected() during connect
    • waitForDisconnected() for the lifetime of a connection
    • waitForTransferComplete() around each SD-card download
  • This keeps the state machine explicit and avoids flooding the UI with propertyChanged traffic.
  • Most importantly on Apple platforms, this recipe uses URLSessionDataDelegate because URLSession.bytes(...).lines was not reliable enough for sparse SSE traffic.
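
The purpose-built-wait idea is not Swift-specific. As a rough Python sketch, assuming an async iterator of (event_name, payload) pairs (for example, one adapted from stream_camera_events above), a helper can wait for the first event that satisfies a predicate or give up after a timeout. wait_for_event, transfer_complete, and fake_stream are illustrative names, not part of the example apps.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Optional, Tuple

Event = Tuple[str, Any]


async def wait_for_event(
    events: AsyncIterator[Event],
    match: Callable[[str, Any], bool],
    timeout_seconds: float,
) -> Optional[Any]:
    """Return the payload of the first matching event, or None on timeout or stream end."""

    async def first_match() -> Optional[Any]:
        async for name, payload in events:
            if match(name, payload):
                return payload
        return None  # stream ended without a match

    try:
        return await asyncio.wait_for(first_match(), timeout_seconds)
    except asyncio.TimeoutError:
        return None


def transfer_complete(name: str, payload: Any) -> bool:
    """Mirror of waitForTransferComplete: transferProgress with percent >= 100."""
    pct = payload.get("percent") if isinstance(payload, dict) else None
    return name == "transferProgress" and isinstance(pct, (int, float)) and pct >= 100


async def fake_stream() -> AsyncIterator[Event]:
    # Stand-in for a real SSE stream, for demonstration only.
    for pct in (25, 50, 100):
        await asyncio.sleep(0)
        yield "transferProgress", {"percent": pct}


async def demo() -> Optional[Any]:
    return await wait_for_event(fake_stream(), transfer_complete, timeout_seconds=1.0)
```

Each wait is a separate call with its own predicate and timeout, which keeps the "what am I waiting for right now" state explicit in the caller rather than inside one catch-all handler.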

Common pitfalls

  • Don’t forget the keepalive skip. The server sends : keepalive\n\n every 30 seconds. If your parser treats these as events, you’ll get garbage data.
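  • The first connected event is the handshake, not a camera connecting. The Python recipe skips the first occurrence, the Swift recipe filters by payload shape (it requires model), and the TypeScript wrapper passes every connected event through, so filter it in your callback.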
  • JSON parse failures shouldn’t kill the stream. Log and continue.
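  • Reconnecting without a delay turns a bad config into a tight retry loop. The Python recipe backs off exponentially and caps the delay at 10 seconds; the TypeScript wrapper retries at a fixed 3 seconds; the Swift recipe does not reconnect and reports failure to the caller.
  • Browser EventSource vs Node fetch: native EventSource auto-reconnects on its own, although the TypeScript wrapper above closes it on error and reconnects manually. A fetch-based reader (for example in Node) has no built-in reconnect, so it needs its own loop like the one in the Python recipe.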
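
The capped-backoff rule from the list above can be isolated into a tiny helper. This is a sketch that mirrors the numbers in the Python recipe (start at 0.5 s, double each time, cap at 10 s); backoff_delays is a hypothetical name.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(initial: float = 0.5, factor: float = 2.0, cap: float = 10.0) -> Iterator[float]:
    """Yield reconnect delays that grow geometrically and never exceed `cap`."""
    delay = initial
    while True:
        yield min(delay, cap)
        delay = min(delay * factor, cap)


# First seven delays with the defaults.
print(list(islice(backoff_delays(), 7)))  # [0.5, 1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

A reconnect loop would create a fresh generator after each successful connection, which resets the delay to its initial value.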

Verified against

test-app/sdk-comparison/debug-autotransfer.ts uses this exact fetch-streaming pattern to verify downloadComplete events fire during shooting. See that file for a working in-repo example.