// SSEClient.swift — the pattern used in example_swift_app.
// macOS/iOS/tvOS/watchOS compatible, but especially important on macOS:
// avoid URLSession.bytes(...).lines for SSE because sparse streams can buffer.
import Foundation
struct SSEClient {
let baseURL: String
let cameraId: String
let onEvent: (@Sendable (_ type: String, _ payload: String) -> Void)?
init(
baseURL: String,
cameraId: String,
onEvent: (@Sendable (_ type: String, _ payload: String) -> Void)? = nil
) {
self.baseURL = baseURL
self.cameraId = cameraId
self.onEvent = onEvent
}
struct ConnectedPayload {
let model: String
let id: String
}
func waitForConnected(timeoutSeconds: Double = 60) async -> ConnectedPayload? {
let targetId = cameraId
let payload = await waitForEvent(timeoutSeconds: timeoutSeconds) { name, payload in
guard name == "connected" else { return false }
guard let bytes = payload.data(using: .utf8),
let obj = try? JSONSerialization.jsonObject(with: bytes) as? [String: Any]
else { return false }
// IMPORTANT: the transport itself emits `connected` as a handshake.
// The real SDK callback includes `model` + `id`; the handshake does not.
guard obj["model"] != nil else { return false }
return (obj["id"] as? String) == targetId
}
guard let payload,
let bytes = payload.data(using: .utf8),
let obj = try? JSONSerialization.jsonObject(with: bytes) as? [String: Any],
let model = obj["model"] as? String,
let id = obj["id"] as? String
else { return nil }
return ConnectedPayload(model: model, id: id)
}
func waitForDisconnected(timeoutSeconds: Double = 86_400) async -> Bool {
let payload = await waitForEvent(timeoutSeconds: timeoutSeconds) { name, _ in
name == "disconnected"
}
return payload != nil
}
func waitForTransferComplete(timeoutSeconds: Double = 120) async -> Bool {
let payload = await waitForEvent(timeoutSeconds: timeoutSeconds) { name, payload in
guard name == "transferProgress" else { return false }
guard let bytes = payload.data(using: .utf8),
let obj = try? JSONSerialization.jsonObject(with: bytes) as? [String: Any],
let pct = obj["percent"] as? Int
else { return false }
// Accept both real and synthetic completion events.
return pct >= 100
}
return payload != nil
}
private func waitForEvent(
timeoutSeconds: Double,
match: @escaping @Sendable (_ event: String, _ payload: String) -> Bool
) async -> String? {
guard let url = URL(string: "\(baseURL)/api/cameras/\(cameraId)/events") else {
return nil
}
return await withCheckedContinuation { continuation in
let delegate = SSEDelegate(onEvent: onEvent, match: match) { matchedPayload in
continuation.resume(returning: matchedPayload)
}
let config = URLSessionConfiguration.default
config.requestCachePolicy = .reloadIgnoringLocalAndRemoteCacheData
config.urlCache = nil
config.timeoutIntervalForRequest = timeoutSeconds + 5
config.timeoutIntervalForResource = timeoutSeconds + 5
config.httpAdditionalHeaders = [
"Accept": "text/event-stream",
"Cache-Control": "no-cache"
]
let session = URLSession(configuration: config, delegate: delegate, delegateQueue: nil)
delegate.session = session
delegate.deadline = Date().addingTimeInterval(timeoutSeconds)
var req = URLRequest(url: url)
req.timeoutInterval = timeoutSeconds + 5
req.setValue("text/event-stream", forHTTPHeaderField: "Accept")
req.setValue("no-cache", forHTTPHeaderField: "Cache-Control")
let task = session.dataTask(with: req)
delegate.task = task
task.resume()
DispatchQueue.global().asyncAfter(deadline: .now() + timeoutSeconds) {
delegate.fireTimeout()
}
}
}
}
private final class SSEDelegate: NSObject, URLSessionDataDelegate, @unchecked Sendable {
private let onEvent: (@Sendable (String, String) -> Void)?
private let match: @Sendable (String, String) -> Bool
private let onComplete: (String?) -> Void
private let lock = NSLock()
private var done = false
private var buffer = Data()
private var eventName: String?
private var dataBuffer: String?
var session: URLSession?
var task: URLSessionDataTask?
var deadline: Date = .distantFuture
init(
onEvent: (@Sendable (String, String) -> Void)?,
match: @escaping @Sendable (String, String) -> Bool,
onComplete: @escaping (String?) -> Void
) {
self.onEvent = onEvent
self.match = match
self.onComplete = onComplete
}
func fireTimeout() { finish(nil) }
private func finish(_ value: String?) {
lock.lock()
if done {
lock.unlock()
return
}
done = true
let s = session
let t = task
lock.unlock()
t?.cancel()
s?.invalidateAndCancel()
onComplete(value)
}
func urlSession(
_ session: URLSession,
dataTask: URLSessionDataTask,
didReceive response: URLResponse,
completionHandler: @escaping (URLSession.ResponseDisposition) -> Void
) {
if let http = response as? HTTPURLResponse, http.statusCode != 200 {
onEvent?("__http_error__", "status=\(http.statusCode)")
completionHandler(.cancel)
finish(nil)
return
}
onEvent?("__connected__", "")
completionHandler(.allow)
}
func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
if Date() > deadline {
finish(nil)
return
}
lock.lock()
buffer.append(data)
var lines: [String] = []
while let nl = buffer.firstIndex(of: 0x0a) {
let lineData = buffer[..<nl]
buffer.removeSubrange(...nl)
let trimmed: Data = (lineData.last == 0x0d) ? lineData.dropLast() : lineData
lines.append(String(data: trimmed, encoding: .utf8) ?? "")
}
lock.unlock()
for line in lines {
if line.isEmpty {
let name = eventName ?? "message"
let payload = dataBuffer ?? ""
onEvent?(name, payload)
if match(name, payload) {
finish(payload)
return
}
eventName = nil
dataBuffer = nil
continue
}
if line.hasPrefix(":") { continue }
if line.hasPrefix("event:") {
eventName = String(line.dropFirst("event:".count)).trimmingCharacters(in: .whitespaces)
} else if line.hasPrefix("data:") {
let piece = String(line.dropFirst("data:".count)).trimmingCharacters(in: .whitespaces)
if let existing = dataBuffer {
dataBuffer = existing + "\n" + piece
} else {
dataBuffer = piece
}
}
}
}
func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
if let error {
onEvent?("__error__", error.localizedDescription)
} else {
onEvent?("__stream_closed__", "")
}
finish(nil)
}
}