/**
 * S-02 binary stream + S-04 sequence cursor resume + S-05 snapshot route.
 *
 * WS endpoint: GET /sessions/:id/stream
 *
 * Protocol (IC-1):
 *   - On connect: client sends { type: "resume"; lastSeq: number | null }
 *   - Server replays buffer chunks after lastSeq (binary frames with 8-byte seq header)
 *   - Then live output arrives as binary frames
 *   - Client may send { type: "snapshot-request" } → server sends { type: "snapshot"; ... }
 *   - Client may send { type: "resize"; cols: number; rows: number } (IC-1 extension)
 *   - State events pushed unsolicited: { type: "state"; ... }
 *   - Session meta pushed on connect: { type: "session-meta"; ... }
 *
 * Binary frame format: [seq: 8 bytes BE uint64][data: N bytes]
 *
 * Owner: T-1.5
 */
import type { IncomingMessage } from "node:http";
import type { Socket } from "node:net";
import { readChunks } from "../../buffer/reader.js";
import type { StateEvent } from "../../pi/events.js";
import { SequenceCounter } from "../../sequence.js";
import { ControlClient } from "../../tmux/control.js";
import { resizeSession } from "../../tmux/manager.js";
import { capturePane } from "../../tmux/snapshot.js";
import type { WsClient, WsServer } from "../types.js";

/** Dependencies injected by the server that mounts the stream route. */
export interface StreamRouteOptions {
  /** WebSocket server used to complete the HTTP upgrade. */
  wss: WsServer;
  /** Authentication predicate for incoming requests (not invoked in this module). */
  isAuthenticated: (req: IncomingMessage) => boolean;
  /** Called to get the current agent state for new connections */
  getCurrentState?: () => StateEvent | null;
  /** Forwarded to `readChunks` as `cfg.stateDir` when set. */
  stateDir?: string;
}

/** Numeric value of the WebSocket OPEN ready state. */
const WS_OPEN = 1;

/** Terminal size used when a resize message omits a dimension. */
const DEFAULT_COLS = 80;
const DEFAULT_ROWS = 24;

/** Receives an already-framed live chunk for one WebSocket connection. */
type FrameListener = (frame: Buffer) => void;

/** Everything shared by all connections that view the same session. */
interface SessionEntry {
  control: ControlClient;
  seq: SequenceCounter;
  /** Connections currently receiving live output for this session. */
  listeners: Set<FrameListener>;
}

// Per-session registry of active ControlClients and sequence counters
const _clients = new Map<string, SessionEntry>();

/**
 * Get or create the shared ControlClient + SequenceCounter for a session.
 *
 * The control client is subscribed to exactly once per session. Each chunk is
 * stamped with a single sequence number and the resulting frame is fanned out
 * to every connected listener, so all viewers of a session observe the same
 * seq for the same chunk (previously every connection advanced the shared
 * counter on its own, producing diverging and gapped sequences as soon as a
 * second client connected).
 *
 * @param sessionId - Session name, used both as registry key and tmux target.
 * @returns The registry entry for the session.
 */
function getOrCreateSession(sessionId: string): SessionEntry {
  const existing = _clients.get(sessionId);
  if (existing) return existing;

  const seq = new SequenceCounter();
  const listeners = new Set<FrameListener>();
  // Assigned below; declared first so the onClose callback can compare against it.
  let entry: SessionEntry | undefined;

  const control = new ControlClient({
    session: sessionId,
    onClose: (reason) => {
      console.error(
        `[stream] control client closed for ${sessionId}: ${reason}`,
      );
      // Only evict our own registration: a late close from a client that was
      // already replaced must not remove the newer entry for the same id.
      if (_clients.get(sessionId) === entry) {
        _clients.delete(sessionId);
      }
    },
  });
  control.start();

  // Single shared subscription. No sequence number is consumed while nobody is
  // listening, matching the behaviour of having no subscriber at all.
  // NOTE(review): assumes subscribe() invokes the callback once per output chunk.
  control.subscribe((chunk: Buffer) => {
    if (listeners.size === 0) return;
    const frame = buildBinaryFrame(seq.next(), chunk);
    for (const deliver of listeners) {
      deliver(frame);
    }
  });

  entry = { control, seq, listeners };
  _clients.set(sessionId, entry);
  return entry;
}

/**
 * Stop and remove a session's ControlClient (call on session kill).
 *
 * No-op when the session has no registered client.
 */
export function stopSession(sessionId: string): void {
  const entry = _clients.get(sessionId);
  if (entry) {
    entry.control.stop();
    _clients.delete(sessionId);
  }
}

/**
 * Handle a WebSocket upgrade for /sessions/:id/stream.
 *
 * Completes the upgrade through `opts.wss` and hands the resulting socket to
 * the stream connection handler.
 *
 * @param sessionId - Session id taken from the request path.
 * @param request - The HTTP upgrade request.
 * @param socket - The underlying network socket.
 * @param head - First packet of the upgraded stream.
 * @param opts - Route dependencies.
 */
export function handleStreamUpgrade(
  sessionId: string,
  request: IncomingMessage,
  socket: Socket,
  head: Buffer,
  opts: StreamRouteOptions,
): void {
  opts.wss.handleUpgrade(request, socket, head, (ws: WsClient) => {
    handleStreamConnection(sessionId, ws, opts);
  });
}

/**
 * Drive one WebSocket connection: push session-meta and the current state,
 * forward live output, and serve `resume`, `resize` and `snapshot-request`
 * messages from the client.
 */
function handleStreamConnection(
  sessionId: string,
  ws: WsClient,
  opts: StreamRouteOptions,
): void {
  const { seq, listeners } = getOrCreateSession(sessionId);
  let resumed = false;

  // Push session-meta immediately
  // NOTE(review): createdAt is the time of this connection, not the time the
  // session was created — confirm this is what clients expect.
  sendJson(ws, {
    type: "session-meta",
    name: sessionId,
    createdAt: new Date().toISOString(),
  });

  // Push current state if available
  const currentState = opts.getCurrentState?.();
  if (currentState) {
    sendJson(ws, {
      type: "state",
      value: currentState.value,
      tool: currentState.tool,
      ts: currentState.ts,
    });
  }

  // Subscribe to live output (sendBinary drops frames once the socket is not open).
  // NOTE(review): live frames start flowing before the client's "resume" has
  // been processed, so they can precede replayed frames on the wire.
  const onFrame: FrameListener = (frame) => {
    sendBinary(ws, frame);
  };
  listeners.add(onFrame);

  // Set.delete is idempotent, so it is safe for both "error" and "close" to fire.
  const detach = (): void => {
    listeners.delete(onFrame);
  };

  // Handle client messages
  ws.on("message", (data: Buffer) => {
    let msg: unknown;
    try {
      msg = JSON.parse(data.toString());
    } catch {
      sendJson(ws, {
        type: "error",
        code: "bad_message",
        message: "Invalid JSON",
      });
      return;
    }

    if (!msg || typeof msg !== "object") return;
    const m = msg as Record<string, unknown>;

    if (m.type === "resume" && !resumed) {
      // Only the first resume per connection is honoured.
      resumed = true;
      const lastSeq = typeof m.lastSeq === "number" ? m.lastSeq : 0;

      // Replay buffered chunks after lastSeq. A failure while reading the
      // buffer must not escape the event handler, so report it to the client.
      try {
        const chunks = readChunks(sessionId, {
          afterSeq: lastSeq,
          cfg: opts.stateDir ? { stateDir: opts.stateDir } : undefined,
        });
        for (const chunk of chunks) {
          if (ws.readyState !== WS_OPEN) break;
          sendBinary(ws, buildBinaryFrame(chunk.seq, chunk.data));
        }
      } catch (err: unknown) {
        console.error(`[stream] replay failed for ${sessionId}:`, err);
        sendJson(ws, {
          type: "error",
          code: "replay_failed",
          message: "Failed to replay buffered output",
        });
      }
    } else if (m.type === "resize") {
      // IC-1 extension: client reports its actual terminal dimensions.
      // Resize the tmux window so line-wrapping matches what the client sees.
      const cols = parseDimension(m.cols, DEFAULT_COLS);
      const rows = parseDimension(m.rows, DEFAULT_ROWS);
      // Ignore the request when the client sent a number that cannot be a size.
      if (cols === null || rows === null) return;
      resizeSession(sessionId, cols, rows).catch((err: unknown) => {
        // Best-effort: a failed resize must not affect the stream.
        console.error(`[stream] resize failed for ${sessionId}:`, err);
      });
    } else if (m.type === "snapshot-request") {
      capturePane({ session: sessionId })
        .then((text) => {
          const data = Buffer.from(text).toString("base64");
          // NOTE(review): the snapshot consumes a stream sequence number.
          const s = seq.next();
          sendJson(ws, { type: "snapshot", seq: s, data });
        })
        .catch(() => {
          sendJson(ws, {
            type: "error",
            code: "snapshot_failed",
            message: "Failed to capture pane",
          });
        });
    }
  });

  ws.on("close", detach);
  ws.on("error", detach);
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/**
 * Interpret a client-supplied terminal dimension.
 *
 * @param value - Raw field from the parsed message.
 * @param fallback - Size to use when the field is not a number at all.
 * @returns `fallback` for non-numbers, the value truncated to an integer for
 *   finite numbers >= 1, and `null` for numbers that cannot be a size
 *   (zero, negative, or infinite).
 */
function parseDimension(value: unknown, fallback: number): number | null {
  if (typeof value !== "number") return fallback;
  if (!Number.isFinite(value) || value < 1) return null;
  return Math.floor(value);
}

/**
 * Build a binary frame: [seq: 8 bytes BE][data]
 *
 * @param seqNum - Sequence number written as an unsigned 64-bit big-endian header.
 * @param data - Payload appended after the header.
 */
function buildBinaryFrame(seqNum: number, data: Buffer): Buffer {
  // allocUnsafe is fine here: all 8 bytes are overwritten immediately below.
  const header = Buffer.allocUnsafe(8);
  header.writeBigUInt64BE(BigInt(seqNum), 0);
  return Buffer.concat([header, data]);
}

/** Send `msg` as a JSON text frame; silently dropped unless the socket is open. */
function sendJson(ws: WsClient, msg: object): void {
  if (ws.readyState === WS_OPEN) {
    ws.send(JSON.stringify(msg));
  }
}

/** Send `data` as a binary frame; silently dropped unless the socket is open. */
function sendBinary(ws: WsClient, data: Buffer): void {
  if (ws.readyState === WS_OPEN) {
    (ws as unknown as { send(data: Buffer): void }).send(data);
  }
}