// SessionConnection.swift // IC-2.1 — one WebSocket connection per session. // // Lifecycle: // init → resume(from:) → [stream data] → suspend() → resume(from:) → … // // URL pattern: ws://:/sessions//stream?token= // (TLS pinning is wired in a follow-up task; plain `ws://` for now.) import Combine import Foundation // MARK: - SessionConnection /// Manages a single IC-1 WebSocket session stream. /// /// Conforms to `ObservableObject` so SwiftUI views can react to /// `connectionState` changes without manually subscribing to Combine. /// /// All mutable state is main-actor-isolated. Callers on background contexts /// must dispatch accordingly (normal for `@MainActor` types). @MainActor public final class SessionConnection: ObservableObject { // MARK: - Identity /// The session identifier used in the URL path and scrollback file name. public let id: String // MARK: - Combine publishers /// Emits raw ANSI bytes in arrival order (binary frames, header stripped). public let stream = PassthroughSubject() /// Emits every JSON frame received from the server. public let stateEvents = PassthroughSubject() /// Emits snapshot content ready to feed directly into SwiftTerm: /// ESC[H + ESC[2J (clear+home) followed by the pane's current content. public let snapshots = PassthroughSubject() /// Tracks the WebSocket lifecycle. @Published public private(set) var connectionState: ConnectionState = .disconnected // MARK: - T-2.10 lifecycle flags /// True while the stream is gated during a reconnect window — /// i.e. after `resume(from: nonNil)` until the first delta or snapshot lands. public private(set) var isStreamFrozen = false /// True when the foreground keep-alive heartbeat task is running. public var isKeepAliveActive: Bool { keepAliveTask != nil } // MARK: - Internal test/UI-test hook #if DEBUG /// Set to `true` in UI-test stub mode to bypass the real WebSocket and /// drive connection-state transitions in-process. internal var stubMode = false #endif // MARK: - Scrollback /// Persistent rolling ANSI cache for this session. public private(set) var scrollback: ScrollbackCache // MARK: - Private private let credential: SidecarCredential private let cursor: ResumeCursor private var client: WebSocketClient? private var cancellables = Set() private var keepAliveTask: Task? // MARK: - Init /// Creates a `SessionConnection` for `id` authenticated with `credential`. /// /// Does **not** open a WebSocket. Call `resume(from:)` to connect. /// /// - Parameter cursor: Injected `ResumeCursor` (defaults to the shared /// UserDefaults-backed instance; override in tests for isolation). init(id: String, credential: SidecarCredential, cursor: ResumeCursor = ResumeCursor()) { self.id = id self.credential = credential self.cursor = cursor self.scrollback = ScrollbackCache(sessionId: id) } // MARK: - Public API /// Opens (or re-opens) the WebSocket and sends a `resume` frame. /// /// - Parameter lastSeq: The last acknowledged sequence number, or `nil` /// to request replay from the beginning. public func resume(from lastSeq: UInt64?) async { // Tear down any existing connection cleanly before reconnecting. await suspend() // T-2.10: freeze stream if this is a reconnect (lastSeq != nil). isStreamFrozen = (lastSeq != nil) // T-2.10: start foreground-only keep-alive heartbeat. keepAliveTask = Task { while !Task.isCancelled { do { try await Task.sleep(nanoseconds: 30_000_000_000) } // 30 s catch { break } } } #if DEBUG // T-2.10: stub mode — drive states in-process without a real WebSocket. if stubMode { connectionState = .connecting Task { @MainActor [weak self] in // 800 ms: long enough for XCUI to detect "Reconnecting…", // short enough to recover within the 2 s test window. try? await Task.sleep(nanoseconds: 800_000_000) guard let self, self.keepAliveTask != nil else { return } // suspended? self.isStreamFrozen = false self.connectionState = .connected } return } #endif guard let url = streamURL else { #if DEBUG print("[SessionConnection] Could not construct stream URL for session \(id) — aborting resume.") #endif keepAliveTask?.cancel() keepAliveTask = nil return } let ws = WebSocketClient() client = ws // Mirror WebSocketClient's connection state into our @Published property. ws.connectionState .receive(on: DispatchQueue.main) .sink { [weak self] state in self?.connectionState = state } .store(in: &cancellables) // Binary frames → scrollback + cursor + downstream `stream` subject. ws.incomingBinary .sink { [weak self] frame in guard let self else { return } self.handleBinaryFrame(frame) } .store(in: &cancellables) // JSON frames → `stateEvents` + snapshot converter. ws.incomingJSON .sink { [weak self] frame in guard let self else { return } if case .snapshot(_, let base64) = frame { self.isStreamFrozen = false // T-2.10: snapshot clears freeze // Decode base64 → text, prepend clear+home, normalise line endings. if let raw = Data(base64Encoded: base64), let text = String(data: raw, encoding: .utf8) { let header = "\u{1B}[H\u{1B}[2J" // cursor home + clear screen let body = text.replacingOccurrences(of: "\n", with: "\r\n") self.snapshots.send(Data((header + body).utf8)) } } self.stateEvents.send(frame) } .store(in: &cancellables) // Once connected, send the appropriate opening frame. // // • lastSeq == nil → fresh attach: live output already flows; caller // will send resize then snapshotRequest. No frame sent here. // • lastSeq != nil → reconnect after gap: replay missed output. ws.connectionState .filter { $0 == .connected } .first() .sink { [weak self, weak ws, lastSeq] _ in guard let self, let ws else { return } if let seq = lastSeq { Task { @MainActor [self, ws, seq] in try? await ws.send(.resume(lastSeq: seq)) _ = self } } // fresh connect: caller drives resize → snapshotRequest } .store(in: &cancellables) ws.connect(url: url) } /// Sends a frame to the server. /// /// - Throws: `WebSocketClientError.notConnected` if there is no active /// socket, or `WebSocketClientError.encodingFailed` on serialisation /// failure. public func send(_ frame: ClientToServer) async throws { guard let client else { throw WebSocketClientError.notConnected } try await client.send(frame) } /// Closes the WebSocket but keeps local state (scrollback + cursor). public func suspend() async { // T-2.10: cancel keep-alive heartbeat and clear freeze flag keepAliveTask?.cancel() keepAliveTask = nil isStreamFrozen = false client?.disconnect() client = nil cancellables.removeAll() connectionState = .disconnected } // MARK: - Binary frame processing /// Central handler for incoming binary frames. /// /// Called from the `incomingBinary` sink (production) and from the /// `#if DEBUG` test helper below (unit tests). /// /// **CG-3 decision — informational, not blocking:** /// `isStreamFrozen` is cleared on the first binary frame and the frame IS /// forwarded to `stream`. In the IC-1 protocol the server only starts /// sending data after it has processed our `resume` frame, so there are no /// "stale" bytes that arrive while frozen — the first binary frame IS the /// first meaningful delta. Stream delivery is therefore never blocked; /// `isStreamFrozen` serves the status bar ("Reconnecting…") and the /// `isStreamFrozen` unit tests, not as a gate on `stream`. private func handleBinaryFrame(_ frame: BinaryFrame) { isStreamFrozen = false // T-2.10: first delta thaws freeze scrollback.append(frame.data) cursor.update(sessionId: id, seq: frame.seq) // B-1: persist seq stream.send(frame.data) } #if DEBUG /// Test-only: simulate a binary frame arriving via the full production /// code path (`handleBinaryFrame`), without needing a real WebSocket. /// /// This drives the same `cursor.update` + `stream.send` logic that the /// `incomingBinary` sink uses, so a regression in either is caught by /// both production and test paths. func _testOnly_receiveBinaryFrame(_ frame: BinaryFrame) { handleBinaryFrame(frame) } #endif // MARK: - URL construction /// Builds `ws://:/sessions//stream?token=`. /// /// Returns `nil` if `URLComponents` cannot produce a valid URL (should /// never happen in practice with well-formed credentials). /// /// Note: plain `ws://` is used for now; TLS + cert-pinning wired in /// the T-2.5 follow-up task. private var streamURL: URL? { var components = URLComponents() components.scheme = "ws" components.host = credential.host components.port = credential.port components.path = "/sessions/\(id)/stream" components.queryItems = [ URLQueryItem(name: "token", value: credential.bearerToken) ] return components.url } }