diff --git a/Sources/Core/Persistence/ScrollbackCache.swift b/Sources/Core/Persistence/ScrollbackCache.swift new file mode 100644 index 0000000..47e800f --- /dev/null +++ b/Sources/Core/Persistence/ScrollbackCache.swift @@ -0,0 +1,133 @@ +// ScrollbackCache.swift +// Rolling on-disk cache of raw ANSI bytes per session. +// +// Design: +// • Storage: /pi-remote/scrollback/.bin +// • Cap: 5 MB. When an append would exceed the cap the oldest bytes are +// dropped from the front (slice off the head) so the file stays below cap. +// • Thread safety: a serial DispatchQueue guards all reads and writes. +// No async/await — this is called from both main and background contexts. + +import Foundation + +/// Rolling on-disk ANSI scrollback cache for a single session. +/// +/// - `append(_:)` is O(n) on the file size only when the 5 MB cap is hit +/// (the head-trim path). In the common case (data fits) it is a simple +/// `FileHandle.seekToEnd` + `write`. +/// - All public methods are safe to call from any thread or queue. +public final class ScrollbackCache: @unchecked Sendable { + + // MARK: - Constants + + private static let maxBytes = 5 * 1024 * 1024 // 5 MB + + // MARK: - State + + private let fileURL: URL + private let queue = DispatchQueue(label: "pi.scrollback", qos: .utility) + + // Cached file size tracked in memory to avoid repeated stat() calls. + private var _sizeBytes: Int = 0 + + // MARK: - Init + + /// Creates (or reopens) a cache for `sessionId` stored at + /// `/pi-remote/scrollback/.bin`. + public init(sessionId: String) { + let cachesDir = FileManager.default.urls( + for: .cachesDirectory, in: .userDomainMask + ).first ?? URL(fileURLWithPath: NSTemporaryDirectory()) + + let dir = cachesDir + .appendingPathComponent("pi-remote", isDirectory: true) + .appendingPathComponent("scrollback", isDirectory: true) + + // Best-effort directory creation — ignore errors, writes will surface + // any real problem. + try? FileManager.default.createDirectory( + at: dir, withIntermediateDirectories: true + ) + + fileURL = dir.appendingPathComponent("\(sessionId).bin") + + // Seed the in-memory size counter from the existing file (if any). + _sizeBytes = (try? fileURL.resourceValues(forKeys: [.fileSizeKey])) + .flatMap { $0.fileSize } ?? 0 + } + + // MARK: - Public API + + /// Appends `data` to the cache, trimming the head when the 5 MB cap + /// would be exceeded. + public func append(_ data: Data) { + guard !data.isEmpty else { return } + queue.sync { + _append(data) + } + } + + /// Returns the full current cache contents (may be empty). + public func read() -> Data { + queue.sync { + (try? Data(contentsOf: fileURL)) ?? Data() + } + } + + /// Deletes the cache file and resets the in-memory size counter. + public func clear() { + queue.sync { + try? FileManager.default.removeItem(at: fileURL) + _sizeBytes = 0 + } + } + + /// Current cache size in bytes (in-memory approximation, always accurate + /// after any `append` / `clear` call). + public var sizeBytes: Int { + queue.sync { _sizeBytes } + } + + // MARK: - Private (always called on `queue`) + + private func _append(_ data: Data) { + let newSize = _sizeBytes + data.count + + if newSize <= Self.maxBytes { + // Fast path: just append. + _write(data) + } else { + // Slow path: we need to drop bytes from the head. + // Read the existing content, combine with new data, then keep + // only the last `maxBytes` bytes so the result fits within cap. + let existing = (try? Data(contentsOf: fileURL)) ?? Data() + var combined = existing + combined.append(data) + + let excess = combined.count - Self.maxBytes + if excess > 0 { + combined = combined.dropFirst(excess).withUnsafeBytes { Data($0) } + } + + // Overwrite the file with the trimmed data. + try? combined.write(to: fileURL, options: .atomic) + _sizeBytes = combined.count + } + } + + private func _write(_ data: Data) { + if FileManager.default.fileExists(atPath: fileURL.path) { + // Append to existing file via FileHandle. + if let handle = try? FileHandle(forWritingTo: fileURL) { + defer { try? handle.close() } + handle.seekToEndOfFile() + handle.write(data) + _sizeBytes += data.count + } + } else { + // Create new file. + try? data.write(to: fileURL, options: .atomic) + _sizeBytes = data.count + } + } +} diff --git a/Sources/Core/Sessions/SessionConnection.swift b/Sources/Core/Sessions/SessionConnection.swift new file mode 100644 index 0000000..e504f31 --- /dev/null +++ b/Sources/Core/Sessions/SessionConnection.swift @@ -0,0 +1,163 @@ +// SessionConnection.swift +// IC-2.1 — one WebSocket connection per session. +// +// Lifecycle: +// init → resume(from:) → [stream data] → suspend() → resume(from:) → … +// +// URL pattern: ws://:/sessions//stream?token= +// (TLS pinning is wired in a follow-up task; plain `ws://` for now.) + +import Combine +import Foundation + +// MARK: - SessionConnection + +/// Manages a single IC-1 WebSocket session stream. +/// +/// Conforms to `ObservableObject` so SwiftUI views can react to +/// `connectionState` changes without manually subscribing to Combine. +/// +/// All mutable state is main-actor-isolated. Callers on background contexts +/// must dispatch accordingly (normal for `@MainActor` types). +@MainActor +public final class SessionConnection: ObservableObject { + + // MARK: - Identity + + /// The session identifier used in the URL path and scrollback file name. + public let id: String + + // MARK: - Combine publishers + + /// Emits raw ANSI bytes in arrival order (binary frames, header stripped). + public let stream = PassthroughSubject() + + /// Emits every JSON frame received from the server. + public let stateEvents = PassthroughSubject() + + /// Tracks the WebSocket lifecycle. + @Published public private(set) var connectionState: WebSocketClient.ConnectionState = .disconnected + + // MARK: - Scrollback + + /// Persistent rolling ANSI cache for this session. + public private(set) var scrollback: ScrollbackCache + + // MARK: - Private + + private let credential: SidecarCredential + private var client: WebSocketClient? + private var cancellables = Set() + + // MARK: - Init + + /// Creates a `SessionConnection` for `id` authenticated with `credential`. + /// + /// Does **not** open a WebSocket. Call `resume(from:)` to connect. + public init(id: String, credential: SidecarCredential) { + self.id = id + self.credential = credential + self.scrollback = ScrollbackCache(sessionId: id) + } + + // MARK: - Public API + + /// Opens (or re-opens) the WebSocket and sends a `resume` frame. + /// + /// - Parameter lastSeq: The last acknowledged sequence number, or `nil` + /// to request replay from the beginning. + public func resume(from lastSeq: UInt64?) async { + guard let url = streamURL else { + #if DEBUG + print("[SessionConnection] Could not construct stream URL for session \(id) — aborting resume.") + #endif + return + } + + // Tear down any existing connection cleanly before reconnecting. + await suspend() + + let ws = WebSocketClient() + client = ws + + // Mirror WebSocketClient's connection state into our @Published property. + ws.connectionState + .receive(on: DispatchQueue.main) + .sink { [weak self] state in + self?.connectionState = state + } + .store(in: &cancellables) + + // Binary frames → scrollback + downstream `stream` subject. + ws.incomingBinary + .sink { [weak self] frame in + guard let self else { return } + self.scrollback.append(frame.data) + self.stream.send(frame.data) + } + .store(in: &cancellables) + + // JSON frames → `stateEvents` subject. + ws.incomingJSON + .sink { [weak self] frame in + self?.stateEvents.send(frame) + } + .store(in: &cancellables) + + // Once connected, send the resume frame. + ws.connectionState + .filter { $0 == .connected } + .first() + .sink { [weak self, weak ws, lastSeq] _ in + guard let self, let ws else { return } + Task { @MainActor [self, ws, lastSeq] in + try? await ws.send(.resume(lastSeq: lastSeq)) + _ = self // silence unused-capture warning + } + } + .store(in: &cancellables) + + ws.connect(url: url) + } + + /// Sends a frame to the server. + /// + /// - Throws: `WebSocketClientError.notConnected` if there is no active + /// socket, or `WebSocketClientError.encodingFailed` on serialisation + /// failure. + public func send(_ frame: ClientToServer) async throws { + guard let client else { + throw WebSocketClientError.notConnected + } + try await client.send(frame) + } + + /// Closes the WebSocket but keeps local state (scrollback + cursor). + public func suspend() async { + client?.disconnect() + client = nil + cancellables.removeAll() + connectionState = .disconnected + } + + // MARK: - URL construction + + /// Builds `ws://:/sessions//stream?token=`. + /// + /// Returns `nil` if `URLComponents` cannot produce a valid URL (should + /// never happen in practice with well-formed credentials). + /// + /// Note: plain `ws://` is used for now; TLS + cert-pinning wired in + /// the T-2.5 follow-up task. + private var streamURL: URL? { + var components = URLComponents() + components.scheme = "ws" + components.host = credential.host + components.port = credential.port + components.path = "/sessions/\(id)/stream" + components.queryItems = [ + URLQueryItem(name: "token", value: credential.bearerToken) + ] + return components.url + } +}