206 lines
5.6 KiB
TypeScript
206 lines
5.6 KiB
TypeScript
/**
 * S-02 binary stream + S-04 sequence cursor resume + S-05 snapshot route.
 *
 * WS endpoint: GET /sessions/:id/stream
 *
 * Protocol (IC-1):
 * - On connect: client sends { type: "resume"; lastSeq: number | null }
 * - Server replays buffer chunks after lastSeq (binary frames with 8-byte seq header)
 * - Then live output arrives as binary frames
 * - Client may send { type: "snapshot-request" } → server sends { type: "snapshot"; ... }
 * - State events pushed unsolicited: { type: "state"; ... }
 * - Session meta pushed on connect: { type: "session-meta"; ... }
 *
 * Binary frame format: [seq: 8 bytes BE uint64][data: N bytes]
 *
 * Owner: T-1.5
 */
|
|
|
|
import type { IncomingMessage } from "node:http";
|
|
import type { Socket } from "node:net";
|
|
import { readChunks } from "../../buffer/reader.js";
|
|
import type { StateEvent } from "../../pi/events.js";
|
|
import { SequenceCounter } from "../../sequence.js";
|
|
import { ControlClient } from "../../tmux/control.js";
|
|
import { capturePane } from "../../tmux/snapshot.js";
|
|
import type { WsClient, WsServer } from "../types.js";
|
|
|
|
/**
 * Dependencies and settings the stream route receives from the hosting server.
 */
export interface StreamRouteOptions {
  /** WebSocket server whose `handleUpgrade` completes the HTTP → WS handshake. */
  wss: WsServer;
  /** Predicate reporting whether an incoming upgrade request is authenticated. */
  isAuthenticated: (req: IncomingMessage) => boolean;
  /** Called to get the current agent state for new connections. */
  getCurrentState?: () => StateEvent | null;
  /** When set, forwarded to the buffer reader as `cfg.stateDir` during replay. */
  stateDir?: string;
}
|
|
|
|
// Per-session registry of active ControlClients and their sequence counters,
// keyed by session id. Entries are added by getOrCreateSession() and removed
// by stopSession() or when the ControlClient reports that it has closed.
const _clients = new Map<
  string,
  { control: ControlClient; seq: SequenceCounter }
>();
|
|
|
|
/**
 * Get or create a ControlClient + SequenceCounter for a session.
 *
 * The first call for a session id constructs and starts a ControlClient and
 * registers it in `_clients`; later calls return that same registered entry.
 *
 * @param sessionId - Session identifier, passed to the ControlClient as `session`.
 * @returns The registry entry holding the control client and sequence counter.
 */
function getOrCreateSession(sessionId: string): {
  control: ControlClient;
  seq: SequenceCounter;
} {
  const existing = _clients.get(sessionId);
  if (existing) return existing;

  const seq = new SequenceCounter();

  // Set once the entry is registered. Lets onClose distinguish "my entry" from
  // a newer entry that was created for the same session id in the meantime.
  let registered: { control: ControlClient; seq: SequenceCounter } | undefined;

  const control = new ControlClient({
    session: sessionId,
    onClose: (reason) => {
      console.error(
        `[stream] control client closed for ${sessionId}: ${reason}`,
      );
      // Evict only our own entry: a late close from an old client (e.g. after
      // stopSession() followed by a reconnect) must not remove its successor.
      if (registered !== undefined && _clients.get(sessionId) === registered) {
        _clients.delete(sessionId);
      }
    },
  });
  control.start();

  registered = { control, seq };
  _clients.set(sessionId, registered);
  return registered;
}
|
|
|
|
/**
 * Stop and remove a session's ControlClient (call on session kill).
 *
 * Does nothing when no client is registered for `sessionId`.
 *
 * @param sessionId - Session whose control client should be stopped.
 */
export function stopSession(sessionId: string): void {
  const entry = _clients.get(sessionId);
  if (!entry) return;

  // Unregister before stopping so that a throwing stop() cannot leave a dead
  // client in the registry for later connections to pick up.
  _clients.delete(sessionId);
  entry.control.stop();
}
|
|
|
|
/**
 * Handle a WebSocket upgrade for /sessions/:id/stream.
 *
 * Requests for which `opts.isAuthenticated` returns false are answered with
 * `401 Unauthorized` and the socket is destroyed. Otherwise the handshake is
 * completed through `opts.wss` and the resulting WebSocket is handed to the
 * stream connection handler.
 *
 * @param sessionId - Session id taken from the request path.
 * @param request - The HTTP upgrade request.
 * @param socket - Raw network socket of the request.
 * @param head - First packet of the upgraded stream, forwarded to `handleUpgrade`.
 * @param opts - Route dependencies (WebSocket server, auth predicate, ...).
 */
export function handleStreamUpgrade(
  sessionId: string,
  request: IncomingMessage,
  socket: Socket,
  head: Buffer,
  opts: StreamRouteOptions,
): void {
  // Refuse before the handshake so unauthenticated peers never get a
  // WebSocket (and never cause a ControlClient to be created).
  if (!opts.isAuthenticated(request)) {
    socket.write("HTTP/1.1 401 Unauthorized\r\nConnection: close\r\n\r\n");
    socket.destroy();
    return;
  }

  opts.wss.handleUpgrade(request, socket, head, (ws: WsClient) => {
    handleStreamConnection(sessionId, ws, opts);
  });
}
|
|
|
|
/**
 * Wire a freshly upgraded WebSocket to a session's output stream.
 *
 * On connect the server pushes a `session-meta` message and, when
 * `opts.getCurrentState` yields a value, a `state` message. Live output from
 * the session's ControlClient is forwarded as binary frames. Client messages:
 *  - `{ type: "resume", lastSeq }` — honoured once per connection; replays
 *    buffered chunks after `lastSeq` (0 when `lastSeq` is not a number).
 *  - `{ type: "snapshot-request" }` — captures the pane and replies with a
 *    `snapshot` message carrying the text as base64.
 * Unparseable messages get an `error` reply with code `bad_message`.
 *
 * @param sessionId - Session whose output is streamed.
 * @param ws - The connected WebSocket client.
 * @param opts - Route options (`getCurrentState`, `stateDir`).
 */
function handleStreamConnection(
  sessionId: string,
  ws: WsClient,
  opts: StreamRouteOptions,
): void {
  const OPEN = 1; // WebSocket readyState value for an open connection
  const { control, seq } = getOrCreateSession(sessionId);
  let resumed = false;

  // Push session-meta immediately.
  // NOTE(review): createdAt is the time of this connection, not necessarily
  // the time the session itself was created — confirm what clients expect.
  sendJson(ws, {
    type: "session-meta",
    name: sessionId,
    createdAt: new Date().toISOString(),
  });

  // Push current state if available
  const currentState = opts.getCurrentState?.();
  if (currentState) {
    sendJson(ws, {
      type: "state",
      value: currentState.value,
      tool: currentState.tool,
      ts: currentState.ts,
    });
  }

  // Subscribe to live output.
  // NOTE(review): every connection draws its own number from the session's
  // shared counter here; if subscribe() delivers each chunk to all
  // subscribers, concurrent connections will see different sequence numbers
  // for the same chunk — confirm this is intended.
  const unsubscribe = control.subscribe((chunk: Buffer) => {
    if (ws.readyState !== OPEN) return;
    sendBinary(ws, buildBinaryFrame(seq.next(), chunk));
  });

  // "error" and "close" can both fire for one socket; unsubscribe only once.
  let released = false;
  const release = (): void => {
    if (released) return;
    released = true;
    unsubscribe();
  };

  // Handle client messages
  ws.on("message", (data: Buffer) => {
    let msg: unknown;
    try {
      msg = JSON.parse(data.toString());
    } catch {
      sendJson(ws, {
        type: "error",
        code: "bad_message",
        message: "Invalid JSON",
      });
      return;
    }

    if (!msg || typeof msg !== "object") return;
    const m = msg as Record<string, unknown>;

    if (m.type === "resume" && !resumed) {
      resumed = true;
      const lastSeq = typeof m.lastSeq === "number" ? m.lastSeq : 0;
      try {
        // Replay buffered chunks after lastSeq
        const chunks = readChunks(sessionId, {
          afterSeq: lastSeq,
          cfg: opts.stateDir ? { stateDir: opts.stateDir } : undefined,
        });
        for (const chunk of chunks) {
          if (ws.readyState !== OPEN) break;
          sendBinary(ws, buildBinaryFrame(chunk.seq, chunk.data));
        }
      } catch (err: unknown) {
        // An exception escaping an event listener would be uncaught and could
        // take the whole process down; report it to this client instead.
        console.error(`[stream] replay failed for ${sessionId}:`, err);
        sendJson(ws, {
          type: "error",
          code: "replay_failed",
          message: "Failed to replay buffered output",
        });
      }
    } else if (m.type === "snapshot-request") {
      capturePane({ session: sessionId })
        .then((text) => {
          const data = Buffer.from(text).toString("base64");
          const s = seq.next();
          sendJson(ws, { type: "snapshot", seq: s, data });
        })
        .catch((err: unknown) => {
          console.error(`[stream] snapshot failed for ${sessionId}:`, err);
          sendJson(ws, {
            type: "error",
            code: "snapshot_failed",
            message: "Failed to capture pane",
          });
        });
    }
  });

  ws.on("close", release);
  ws.on("error", release);
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/**
 * Build a binary frame: [seq: 8 bytes BE][data].
 *
 * @param seqNum - Sequence number, written as a big-endian unsigned 64-bit header.
 * @param data - Payload bytes placed directly after the header.
 * @returns A new buffer of `8 + data.length` bytes.
 * @throws RangeError if `seqNum` is not an integer or is negative.
 */
function buildBinaryFrame(seqNum: number, data: Buffer): Buffer {
  const HEADER_BYTES = 8;
  // One allocation for header + payload; every byte is overwritten below, so
  // the uninitialised memory from allocUnsafe never leaks into the frame.
  const frame = Buffer.allocUnsafe(HEADER_BYTES + data.length);
  frame.writeBigUInt64BE(BigInt(seqNum), 0);
  frame.set(data, HEADER_BYTES);
  return frame;
}
|
|
|
|
/**
 * Serialize `msg` with JSON.stringify and send it over `ws`.
 *
 * Does nothing unless `ws.readyState` is 1 (OPEN).
 *
 * @param ws - Target WebSocket client.
 * @param msg - Message object to serialize.
 */
function sendJson(ws: WsClient, msg: object): void {
  if (ws.readyState !== 1 /* OPEN */) return;
  ws.send(JSON.stringify(msg));
}
|
|
|
|
/**
 * Send `data` over `ws` as-is (no JSON encoding).
 *
 * Does nothing unless `ws.readyState` is 1 (OPEN).
 *
 * @param ws - Target WebSocket client.
 * @param data - Bytes to send.
 */
function sendBinary(ws: WsClient, data: Buffer): void {
  if (ws.readyState !== 1 /* OPEN */) return;
  // NOTE(review): the cast suggests WsClient's declared send() does not accept
  // a Buffer — confirm against ../types.js and widen the type there if so.
  const binarySocket = ws as unknown as { send(data: Buffer): void };
  binarySocket.send(data);
}
|