pi-remote-ios/Sources/Core/Sessions/SessionConnection.swift

275 lines
10 KiB
Swift

// SessionConnection.swift
// IC-2.1 — one WebSocket connection per session.
//
// Lifecycle:
// init → resume(from:) → [stream data] → suspend() → resume(from:)
//
// URL pattern: ws://<host>:<port>/sessions/<id>/stream?token=<bearerToken>
// (TLS pinning is wired in a follow-up task; plain `ws://` for now.)
import Combine
import Foundation
// MARK: - SessionConnection
/// Manages a single IC-1 WebSocket session stream.
///
/// Conforms to `ObservableObject` so SwiftUI views can react to
/// `connectionState` changes without manually subscribing to Combine.
///
/// All mutable state is main-actor-isolated. Callers on background contexts
/// must dispatch accordingly (normal for `@MainActor` types).
///
/// Lifecycle: `init` → `resume(from:)` → [stream data] → `suspend()` →
/// `resume(from:)`. The initialiser never opens a socket.
@MainActor
public final class SessionConnection: ObservableObject {

    // MARK: - Identity

    /// The session identifier used in the URL path and scrollback file name.
    public let id: String

    // MARK: - Combine publishers

    /// Emits raw ANSI bytes in arrival order (binary frames, header stripped).
    public let stream = PassthroughSubject<Data, Never>()

    /// Emits every JSON frame received from the server.
    public let stateEvents = PassthroughSubject<ServerToClient, Never>()

    /// Emits snapshot content ready to feed directly into SwiftTerm:
    /// ESC[H + ESC[2J (clear+home) followed by the pane's current content.
    public let snapshots = PassthroughSubject<Data, Never>()

    /// Tracks the WebSocket lifecycle.
    @Published public private(set) var connectionState: ConnectionState = .disconnected

    // MARK: - T-2.10 lifecycle flags

    /// True during a reconnect window — i.e. after `resume(from: nonNil)`
    /// until the first delta or snapshot lands, or until the connection is
    /// suspended or the resume attempt is aborted.
    public private(set) var isStreamFrozen = false

    /// True when the foreground keep-alive heartbeat task is running.
    public var isKeepAliveActive: Bool { keepAliveTask != nil }

    // MARK: - Internal test/UI-test hook

    #if DEBUG
    /// Set to `true` in UI-test stub mode to bypass the real WebSocket and
    /// drive connection-state transitions in-process.
    internal var stubMode = false
    #endif

    // MARK: - Scrollback

    /// Persistent rolling ANSI cache for this session.
    public private(set) var scrollback: ScrollbackCache

    // MARK: - Private

    /// Interval between keep-alive loop iterations, in nanoseconds (30 s).
    private static let keepAliveInterval: UInt64 = 30_000_000_000

    private let credential: SidecarCredential
    private let cursor: ResumeCursor
    private var client: WebSocketClient?
    private var cancellables = Set<AnyCancellable>()
    private var keepAliveTask: Task<Void, Never>?

    // MARK: - Init

    /// Creates a `SessionConnection` for `id` authenticated with `credential`.
    ///
    /// Does **not** open a WebSocket. Call `resume(from:)` to connect.
    ///
    /// - Parameters:
    ///   - id: Session identifier; also used to key the scrollback cache.
    ///   - credential: Host, port and bearer token used to build the stream URL.
    ///   - cursor: Injected `ResumeCursor` (defaults to the shared
    ///     UserDefaults-backed instance; override in tests for isolation).
    init(id: String, credential: SidecarCredential, cursor: ResumeCursor = ResumeCursor()) {
        self.id = id
        self.credential = credential
        self.cursor = cursor
        self.scrollback = ScrollbackCache(sessionId: id)
    }

    // MARK: - Public API

    /// Opens (or re-opens) the WebSocket and sends a `resume` frame.
    ///
    /// Any existing connection is suspended first. If the stream URL cannot
    /// be constructed the attempt is abandoned and the object is left in the
    /// same state `suspend()` produces.
    ///
    /// - Parameter lastSeq: The last acknowledged sequence number, or `nil`
    ///   for a fresh attach. No `resume` frame is sent when this is `nil`.
    public func resume(from lastSeq: UInt64?) async {
        // Tear down any existing connection cleanly before reconnecting.
        await suspend()

        // T-2.10: freeze stream if this is a reconnect (lastSeq != nil).
        isStreamFrozen = (lastSeq != nil)

        // T-2.10: start foreground-only keep-alive heartbeat.
        startKeepAlive()

        #if DEBUG
        // T-2.10: stub mode — drive states in-process without a real WebSocket.
        if stubMode {
            connectionState = .connecting
            Task { @MainActor [weak self] in
                // 800 ms: long enough for XCUI to detect "Reconnecting",
                // short enough to recover within the 2 s test window.
                try? await Task.sleep(nanoseconds: 800_000_000)
                // A nil keep-alive task means `suspend()` ran in the meantime.
                // NOTE(review): a suspend + resume inside the 800 ms window
                // leaves this older task able to flip the newer attempt to
                // `.connected` early — confirm whether UI tests rely on that.
                guard let self, self.keepAliveTask != nil else { return }
                self.isStreamFrozen = false
                self.connectionState = .connected
            }
            return
        }
        #endif

        guard let url = streamURL else {
            #if DEBUG
            print("[SessionConnection] Could not construct stream URL for session \(id) — aborting resume.")
            #endif
            stopKeepAlive()
            // Nothing is reconnecting any more, so do not leave the freeze
            // flag set; this matches the state `suspend()` leaves behind.
            isStreamFrozen = false
            return
        }

        let ws = WebSocketClient()
        client = ws
        wireSubscriptions(to: ws, resumingFrom: lastSeq)
        ws.connect(url: url)
    }

    /// Sends a frame to the server.
    ///
    /// - Throws: `WebSocketClientError.notConnected` if there is no active
    ///   socket, or `WebSocketClientError.encodingFailed` on serialisation
    ///   failure.
    public func send(_ frame: ClientToServer) async throws {
        guard let client else {
            throw WebSocketClientError.notConnected
        }
        try await client.send(frame)
    }

    /// Closes the WebSocket but keeps local state (scrollback + cursor).
    ///
    /// Cancels the keep-alive task, clears the freeze flag, drops every
    /// Combine subscription and sets `connectionState` to `.disconnected`.
    public func suspend() async {
        // T-2.10: cancel keep-alive heartbeat and clear freeze flag.
        stopKeepAlive()
        isStreamFrozen = false

        client?.disconnect()
        client = nil
        cancellables.removeAll()
        connectionState = .disconnected
    }

    // MARK: - Keep-alive

    /// Starts the keep-alive task whose presence backs `isKeepAliveActive`.
    ///
    /// The loop currently only sleeps between iterations and sends nothing;
    /// it exits as soon as the task is cancelled.
    private func startKeepAlive() {
        keepAliveTask = Task {
            while !Task.isCancelled {
                do {
                    try await Task.sleep(nanoseconds: SessionConnection.keepAliveInterval)
                } catch {
                    // Sleep throws on cancellation — leave the loop.
                    break
                }
            }
        }
    }

    /// Cancels and forgets the keep-alive task, if any.
    private func stopKeepAlive() {
        keepAliveTask?.cancel()
        keepAliveTask = nil
    }

    // MARK: - Subscription wiring

    /// Connects `ws`'s publishers to this object's state and subjects.
    ///
    /// Must be called before `ws.connect(url:)` so no event is missed.
    ///
    /// - Parameters:
    ///   - ws: The freshly created client for this connection attempt.
    ///   - lastSeq: Sequence number to replay from, or `nil` for a fresh attach.
    private func wireSubscriptions(to ws: WebSocketClient, resumingFrom lastSeq: UInt64?) {
        // Mirror WebSocketClient's connection state into our @Published property.
        ws.connectionState
            .receive(on: DispatchQueue.main)
            .sink { [weak self] state in
                self?.connectionState = state
            }
            .store(in: &cancellables)

        // Binary frames → scrollback + cursor + downstream `stream` subject.
        // NOTE(review): no `receive(on:)` hop here or on the JSON sink below;
        // assumes WebSocketClient publishes frames on the main thread — confirm.
        ws.incomingBinary
            .sink { [weak self] frame in
                self?.handleBinaryFrame(frame)
            }
            .store(in: &cancellables)

        // JSON frames → `stateEvents` + snapshot converter.
        ws.incomingJSON
            .sink { [weak self] frame in
                self?.handleJSONFrame(frame)
            }
            .store(in: &cancellables)

        // Once connected, send the appropriate opening frame.
        //
        // lastSeq == nil → fresh attach: live output already flows; caller
        //                  will send resize then snapshotRequest. No frame
        //                  is sent here, so no subscription is needed.
        // lastSeq != nil → reconnect after gap: replay missed output.
        if let seq = lastSeq {
            ws.connectionState
                .filter { $0 == .connected }
                .first()
                .sink { [weak self, weak ws] _ in
                    guard let self, let ws else { return }
                    self.sendResumeFrame(lastSeq: seq, over: ws)
                }
                .store(in: &cancellables)
        }
    }

    /// Sends the `resume` frame for `seq` over `ws` from a main-actor task.
    ///
    /// A failure is logged in DEBUG builds and otherwise ignored.
    private func sendResumeFrame(lastSeq seq: UInt64, over ws: WebSocketClient) {
        Task { @MainActor in
            do {
                try await ws.send(.resume(lastSeq: seq))
            } catch {
                #if DEBUG
                print("[SessionConnection] Failed to send resume frame for session \(self.id): \(error)")
                #endif
            }
        }
    }

    // MARK: - JSON frame processing

    /// Handles one JSON frame: converts snapshots for the terminal, then
    /// forwards the frame itself on `stateEvents`.
    ///
    /// A snapshot whose payload is not valid base64 or not valid UTF-8 still
    /// clears the freeze flag and is still forwarded on `stateEvents`, but
    /// produces nothing on `snapshots`.
    private func handleJSONFrame(_ frame: ServerToClient) {
        if case .snapshot(_, let base64) = frame {
            isStreamFrozen = false  // T-2.10: snapshot clears freeze
            if let raw = Data(base64Encoded: base64),
               let text = String(data: raw, encoding: .utf8) {
                snapshots.send(SessionConnection.terminalSnapshot(from: text))
            }
        }
        stateEvents.send(frame)
    }

    /// Builds the bytes emitted on `snapshots` from decoded snapshot text.
    ///
    /// The result is cursor-home + clear-screen followed by `text` with every
    /// line ending rewritten as CRLF.
    private static func terminalSnapshot(from text: String) -> Data {
        let header = "\u{1B}[H\u{1B}[2J"  // cursor home + clear screen
        // Collapse CRLF first so content that already uses CRLF does not end
        // up with a doubled carriage return after the LF → CRLF pass.
        let body = text
            .replacingOccurrences(of: "\r\n", with: "\n")
            .replacingOccurrences(of: "\n", with: "\r\n")
        return Data((header + body).utf8)
    }

    // MARK: - Binary frame processing

    /// Central handler for incoming binary frames.
    ///
    /// Called from the `incomingBinary` sink (production) and from the
    /// `#if DEBUG` test helper below (unit tests).
    ///
    /// **CG-3 decision — informational, not blocking:**
    /// `isStreamFrozen` is cleared on the first binary frame and the frame IS
    /// forwarded to `stream`. In the IC-1 protocol the server only starts
    /// sending data after it has processed our `resume` frame, so there are no
    /// "stale" bytes that arrive while frozen — the first binary frame IS the
    /// first meaningful delta. Stream delivery is therefore never blocked;
    /// `isStreamFrozen` serves the status bar ("Reconnecting") and the
    /// `isStreamFrozen` unit tests, not as a gate on `stream`.
    private func handleBinaryFrame(_ frame: BinaryFrame) {
        isStreamFrozen = false                        // T-2.10: first delta thaws freeze
        scrollback.append(frame.data)
        cursor.update(sessionId: id, seq: frame.seq)  // B-1: persist seq
        stream.send(frame.data)
    }

    #if DEBUG
    /// Test-only: simulate a binary frame arriving via the full production
    /// code path (`handleBinaryFrame`), without needing a real WebSocket.
    ///
    /// This drives the same `cursor.update` + `stream.send` logic that the
    /// `incomingBinary` sink uses, so a regression in either is caught by
    /// both production and test paths.
    func _testOnly_receiveBinaryFrame(_ frame: BinaryFrame) {
        handleBinaryFrame(frame)
    }
    #endif

    // MARK: - URL construction

    /// Builds `ws://<host>:<port>/sessions/<id>/stream?token=<bearerToken>`.
    ///
    /// Returns `nil` if `URLComponents` cannot produce a valid URL (should
    /// never happen in practice with well-formed credentials).
    ///
    /// Note: plain `ws://` is used for now; TLS + cert-pinning wired in
    /// the T-2.5 follow-up task.
    private var streamURL: URL? {
        var components = URLComponents()
        components.scheme = "ws"
        components.host = credential.host
        components.port = credential.port
        components.path = "/sessions/\(id)/stream"
        components.queryItems = [
            URLQueryItem(name: "token", value: credential.bearerToken)
        ]
        // `URLComponents` leaves "+" unescaped in the query because it is a
        // legal query character, but servers that apply form decoding read it
        // as a space. Escape it explicitly so a token containing "+" (e.g.
        // base64) reaches the server unchanged.
        components.percentEncodedQuery = components.percentEncodedQuery?
            .replacingOccurrences(of: "+", with: "%2B")
        return components.url
    }
}