feat(T-2.5): SessionConnection (IC-2.1) + ScrollbackCache (5MB ring)

This commit is contained in:
jay 2026-05-15 18:54:56 +02:00
parent a5c937ad75
commit 048036d6a7
2 changed files with 296 additions and 0 deletions

View File

@ -0,0 +1,133 @@
// ScrollbackCache.swift
// Rolling on-disk cache of raw ANSI bytes per session.
//
// Design:
// Storage: <caches>/pi-remote/scrollback/<sessionId>.bin
// Cap: 5 MB. When an append would exceed the cap the oldest bytes are
// dropped from the front (slice off the head) so the file stays below cap.
// Thread safety: a serial DispatchQueue guards all reads and writes.
// No async/await this is called from both main and background contexts.
import Foundation
/// Rolling on-disk ANSI scrollback cache for a single session.
///
/// - `append(_:)` is O(n) on the file size only when the 5 MB cap is hit
/// (the head-trim path). In the common case (data fits) it is a simple
/// `FileHandle.seekToEnd` + `write`.
/// - All public methods are safe to call from any thread or queue.
public final class ScrollbackCache: @unchecked Sendable {
// MARK: - Constants
private static let maxBytes = 5 * 1024 * 1024 // 5 MB
// MARK: - State
private let fileURL: URL
private let queue = DispatchQueue(label: "pi.scrollback", qos: .utility)
// Cached file size tracked in memory to avoid repeated stat() calls.
private var _sizeBytes: Int = 0
// MARK: - Init
/// Creates (or reopens) a cache for `sessionId` stored at
/// `<caches>/pi-remote/scrollback/<sessionId>.bin`.
public init(sessionId: String) {
let cachesDir = FileManager.default.urls(
for: .cachesDirectory, in: .userDomainMask
).first ?? URL(fileURLWithPath: NSTemporaryDirectory())
let dir = cachesDir
.appendingPathComponent("pi-remote", isDirectory: true)
.appendingPathComponent("scrollback", isDirectory: true)
// Best-effort directory creation ignore errors, writes will surface
// any real problem.
try? FileManager.default.createDirectory(
at: dir, withIntermediateDirectories: true
)
fileURL = dir.appendingPathComponent("\(sessionId).bin")
// Seed the in-memory size counter from the existing file (if any).
_sizeBytes = (try? fileURL.resourceValues(forKeys: [.fileSizeKey]))
.flatMap { $0.fileSize } ?? 0
}
// MARK: - Public API
/// Appends `data` to the cache, trimming the head when the 5 MB cap
/// would be exceeded.
public func append(_ data: Data) {
guard !data.isEmpty else { return }
queue.sync {
_append(data)
}
}
/// Returns the full current cache contents (may be empty).
public func read() -> Data {
queue.sync {
(try? Data(contentsOf: fileURL)) ?? Data()
}
}
/// Deletes the cache file and resets the in-memory size counter.
public func clear() {
queue.sync {
try? FileManager.default.removeItem(at: fileURL)
_sizeBytes = 0
}
}
/// Current cache size in bytes (in-memory approximation, always accurate
/// after any `append` / `clear` call).
public var sizeBytes: Int {
queue.sync { _sizeBytes }
}
// MARK: - Private (always called on `queue`)
private func _append(_ data: Data) {
let newSize = _sizeBytes + data.count
if newSize <= Self.maxBytes {
// Fast path: just append.
_write(data)
} else {
// Slow path: we need to drop bytes from the head.
// Read the existing content, combine with new data, then keep
// only the last `maxBytes` bytes so the result fits within cap.
let existing = (try? Data(contentsOf: fileURL)) ?? Data()
var combined = existing
combined.append(data)
let excess = combined.count - Self.maxBytes
if excess > 0 {
combined = combined.dropFirst(excess).withUnsafeBytes { Data($0) }
}
// Overwrite the file with the trimmed data.
try? combined.write(to: fileURL, options: .atomic)
_sizeBytes = combined.count
}
}
private func _write(_ data: Data) {
if FileManager.default.fileExists(atPath: fileURL.path) {
// Append to existing file via FileHandle.
if let handle = try? FileHandle(forWritingTo: fileURL) {
defer { try? handle.close() }
handle.seekToEndOfFile()
handle.write(data)
_sizeBytes += data.count
}
} else {
// Create new file.
try? data.write(to: fileURL, options: .atomic)
_sizeBytes = data.count
}
}
}

View File

@ -0,0 +1,163 @@
// SessionConnection.swift
// IC-2.1 one WebSocket connection per session.
//
// Lifecycle:
// init resume(from:) [stream data] suspend() resume(from:)
//
// URL pattern: ws://<host>:<port>/sessions/<id>/stream?token=<bearerToken>
// (TLS pinning is wired in a follow-up task; plain `ws://` for now.)
import Combine
import Foundation
// MARK: - SessionConnection
/// Manages a single IC-1 WebSocket session stream.
///
/// Conforms to `ObservableObject` so SwiftUI views can react to
/// `connectionState` changes without manually subscribing to Combine.
///
/// All mutable state is main-actor-isolated. Callers on background contexts
/// must dispatch accordingly (normal for `@MainActor` types).
@MainActor
public final class SessionConnection: ObservableObject {
// MARK: - Identity
/// The session identifier used in the URL path and scrollback file name.
public let id: String
// MARK: - Combine publishers
/// Emits raw ANSI bytes in arrival order (binary frames, header stripped).
public let stream = PassthroughSubject<Data, Never>()
/// Emits every JSON frame received from the server.
public let stateEvents = PassthroughSubject<ServerToClient, Never>()
/// Tracks the WebSocket lifecycle.
@Published public private(set) var connectionState: WebSocketClient.ConnectionState = .disconnected
// MARK: - Scrollback
/// Persistent rolling ANSI cache for this session.
public private(set) var scrollback: ScrollbackCache
// MARK: - Private
private let credential: SidecarCredential
private var client: WebSocketClient?
private var cancellables = Set<AnyCancellable>()
// MARK: - Init
/// Creates a `SessionConnection` for `id` authenticated with `credential`.
///
/// Does **not** open a WebSocket. Call `resume(from:)` to connect.
public init(id: String, credential: SidecarCredential) {
self.id = id
self.credential = credential
self.scrollback = ScrollbackCache(sessionId: id)
}
// MARK: - Public API
/// Opens (or re-opens) the WebSocket and sends a `resume` frame.
///
/// - Parameter lastSeq: The last acknowledged sequence number, or `nil`
/// to request replay from the beginning.
public func resume(from lastSeq: UInt64?) async {
guard let url = streamURL else {
#if DEBUG
print("[SessionConnection] Could not construct stream URL for session \(id) — aborting resume.")
#endif
return
}
// Tear down any existing connection cleanly before reconnecting.
await suspend()
let ws = WebSocketClient()
client = ws
// Mirror WebSocketClient's connection state into our @Published property.
ws.connectionState
.receive(on: DispatchQueue.main)
.sink { [weak self] state in
self?.connectionState = state
}
.store(in: &cancellables)
// Binary frames scrollback + downstream `stream` subject.
ws.incomingBinary
.sink { [weak self] frame in
guard let self else { return }
self.scrollback.append(frame.data)
self.stream.send(frame.data)
}
.store(in: &cancellables)
// JSON frames `stateEvents` subject.
ws.incomingJSON
.sink { [weak self] frame in
self?.stateEvents.send(frame)
}
.store(in: &cancellables)
// Once connected, send the resume frame.
ws.connectionState
.filter { $0 == .connected }
.first()
.sink { [weak self, weak ws, lastSeq] _ in
guard let self, let ws else { return }
Task { @MainActor [self, ws, lastSeq] in
try? await ws.send(.resume(lastSeq: lastSeq))
_ = self // silence unused-capture warning
}
}
.store(in: &cancellables)
ws.connect(url: url)
}
/// Sends a frame to the server.
///
/// - Throws: `WebSocketClientError.notConnected` if there is no active
/// socket, or `WebSocketClientError.encodingFailed` on serialisation
/// failure.
public func send(_ frame: ClientToServer) async throws {
guard let client else {
throw WebSocketClientError.notConnected
}
try await client.send(frame)
}
/// Closes the WebSocket but keeps local state (scrollback + cursor).
public func suspend() async {
client?.disconnect()
client = nil
cancellables.removeAll()
connectionState = .disconnected
}
// MARK: - URL construction
/// Builds `ws://<host>:<port>/sessions/<id>/stream?token=<bearerToken>`.
///
/// Returns `nil` if `URLComponents` cannot produce a valid URL (should
/// never happen in practice with well-formed credentials).
///
/// Note: plain `ws://` is used for now; TLS + cert-pinning wired in
/// the T-2.5 follow-up task.
private var streamURL: URL? {
var components = URLComponents()
components.scheme = "ws"
components.host = credential.host
components.port = credential.port
components.path = "/sessions/\(id)/stream"
components.queryItems = [
URLQueryItem(name: "token", value: credential.bearerToken)
]
return components.url
}
}