pi-remote-control/extensions/remote-control/server/routes/stream.ts

262 lines
7.4 KiB
TypeScript

/**
* S-02 binary stream + S-04 sequence cursor resume + S-05 snapshot route.
*
* WS endpoint: GET /sessions/:id/stream
*
* Protocol (IC-1):
* - On connect: client sends { type: "resume"; lastSeq: number | null }
* - Server replays buffer chunks after lastSeq (binary frames with 8-byte seq header)
* - Then live output arrives as binary frames
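* - Client may send { type: "snapshot-request" } → server sends { type: "snapshot"; ... }
* - Client may send { type: "resize"; cols: number; rows: number } to resize the tmux window
* - Client may send { type: "keys"; data: string }, { type: "key"; name: string } or
*   { type: "paste"; data: string } to forward input to the session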
* - State events pushed unsolicited: { type: "state"; ... }
* - Session meta pushed on connect: { type: "session-meta"; ... }
*
* Binary frame format: [seq: 8 bytes BE uint64][data: N bytes]
*
* Owner: T-1.5
*/
import type { IncomingMessage } from "node:http";
import type { Socket } from "node:net";
import { readChunks } from "../../buffer/reader.js";
import type { StateEvent } from "../../pi/events.js";
import { SequenceCounter } from "../../sequence.js";
import { ControlClient } from "../../tmux/control.js";
import { sendKey, sendKeys, sendPaste } from "../../tmux/input.js";
import { resizeSession } from "../../tmux/manager.js";
import { capturePane } from "../../tmux/snapshot.js";
import type { WsClient, WsServer } from "../types.js";
/**
 * Dependencies the stream route receives from the hosting server.
 */
export interface StreamRouteOptions {
  /** WebSocket server whose `handleUpgrade` completes the handshake for this route. */
  wss: WsServer;

  /** Predicate over the incoming HTTP request; returns whether it is authenticated. */
  isAuthenticated: (req: IncomingMessage) => boolean;

  /** Called to get the current agent state for new connections */
  getCurrentState?: () => StateEvent | null;

  /** Optional state directory; when set it is forwarded to the buffer reader as `cfg.stateDir`. */
  stateDir?: string;
}
// Registry of live sessions, keyed by session id. Each entry pairs the
// session's ControlClient with the SequenceCounter used to stamp its frames.
const _clients: Map<
  string,
  { control: ControlClient; seq: SequenceCounter }
> = new Map();
/**
 * Get or create a ControlClient + SequenceCounter for a session.
 *
 * When no entry is registered for `sessionId`, a new ControlClient is
 * constructed (with `session: sessionId`), started, paired with a fresh
 * SequenceCounter, and stored in the registry before being returned.
 *
 * @param sessionId - Registry key; also passed to the ControlClient as `session`.
 * @returns The `{ control, seq }` entry shared by every connection to the session.
 */
function getOrCreateSession(sessionId: string): {
  control: ControlClient;
  seq: SequenceCounter;
} {
  const existing = _clients.get(sessionId);
  if (existing) return existing;

  const seq = new SequenceCounter();
  // Declared before the client so the onClose closure can always read it,
  // even if the callback fires before registration has happened.
  let entry: { control: ControlClient; seq: SequenceCounter } | undefined;

  const control = new ControlClient({
    session: sessionId,
    onClose: (reason) => {
      console.error(
        `[stream] control client closed for ${sessionId}: ${reason}`,
      );
      // Evict only the entry this client belongs to. A close notification that
      // arrives after the session was stopped and re-created must not remove
      // the newer entry, or its ControlClient would be orphaned and a
      // duplicate created on the next lookup.
      if (entry !== undefined && _clients.get(sessionId) === entry) {
        _clients.delete(sessionId);
      }
    },
  });
  control.start();

  entry = { control, seq };
  _clients.set(sessionId, entry);
  return entry;
}
/**
 * Tear down a session's ControlClient and drop it from the registry.
 * Intended to be called when the session is killed; a session id with no
 * registered entry is a no-op.
 *
 * @param sessionId - Registry key of the session to stop.
 */
export function stopSession(sessionId: string): void {
  const registered = _clients.get(sessionId);
  if (!registered) return;

  // Stop first, then forget the entry.
  registered.control.stop();
  _clients.delete(sessionId);
}
/**
 * Handle a WebSocket upgrade for /sessions/:id/stream.
 *
 * The request is first checked with `opts.isAuthenticated`. This route lets a
 * client send keystrokes and pastes into the session, so an unauthenticated
 * request is answered with a bare `401 Unauthorized` response and its socket
 * is destroyed without ever reaching the WebSocket server. Authenticated
 * requests are handed to `opts.wss.handleUpgrade`, and the resulting client is
 * wired to the session stream.
 *
 * @param sessionId - Session whose stream the client wants to attach to.
 * @param request - The HTTP upgrade request.
 * @param socket - Raw network socket of the upgrade request.
 * @param head - First packet of the upgraded stream, forwarded to `handleUpgrade`.
 * @param opts - Route dependencies (WebSocket server, auth predicate, ...).
 */
export function handleStreamUpgrade(
  sessionId: string,
  request: IncomingMessage,
  socket: Socket,
  head: Buffer,
  opts: StreamRouteOptions,
): void {
  if (!opts.isAuthenticated(request)) {
    // No WebSocket exists yet, so the rejection is a plain HTTP response.
    socket.write("HTTP/1.1 401 Unauthorized\r\n\r\n");
    socket.destroy();
    return;
  }

  opts.wss.handleUpgrade(request, socket, head, (ws: WsClient) => {
    handleStreamConnection(sessionId, ws, opts);
  });
}
/** Terminal width applied when a resize message carries no usable `cols`. */
const DEFAULT_TERMINAL_COLS = 80;
/** Terminal height applied when a resize message carries no usable `rows`. */
const DEFAULT_TERMINAL_ROWS = 24;

/**
 * Normalize a client-supplied terminal dimension.
 *
 * The value comes straight out of `JSON.parse`, so it can be absent, of the
 * wrong type, fractional, zero, negative, or `Infinity` (JSON `1e999`).
 * Fractions are floored; anything that is not a safe integer >= 1 after
 * flooring is replaced by `fallback`.
 */
function toTerminalDimension(value: unknown, fallback: number): number {
  if (typeof value !== "number") return fallback;
  const whole = Math.floor(value);
  return Number.isSafeInteger(whole) && whole >= 1 ? whole : fallback;
}

/**
 * Wire one accepted WebSocket client to a session's terminal stream.
 *
 * On connect the client is sent a `session-meta` JSON message, then a `state`
 * message when `opts.getCurrentState` yields one, and is subscribed to the
 * session's live output (binary frames stamped from the session's shared
 * SequenceCounter). Afterwards the following client messages are handled:
 *
 * - `resume`: honoured once per connection; replays buffered chunks after
 *   `lastSeq` (a non-numeric `lastSeq` is treated as 0).
 * - `resize`: resizes the session to `cols` x `rows` (sanitized, best-effort).
 * - `keys` / `key` / `paste`: forwards input to the session.
 * - `snapshot-request`: captures the pane and replies with a `snapshot` message.
 *
 * Unparseable JSON is answered with a `bad_message` error; JSON that is not an
 * object, and unknown message types, are ignored.
 *
 * NOTE(review): `seq.next()` runs once per chunk for every subscribed socket,
 * while the counter is shared per session. With more than one client attached,
 * each client will see a different number for the same chunk. Confirm this is
 * compatible with the sequence numbers stored in the replay buffer.
 *
 * NOTE(review): the live subscription is installed before any `resume` is
 * processed, so live frames can reach the client ahead of replayed ones.
 *
 * @param sessionId - Session to attach to.
 * @param ws - The upgraded WebSocket client.
 * @param opts - Route dependencies; `getCurrentState` and `stateDir` are used here.
 */
function handleStreamConnection(
  sessionId: string,
  ws: WsClient,
  opts: StreamRouteOptions,
): void {
  const { control, seq } = getOrCreateSession(sessionId);
  let resumed = false;

  // Push session-meta immediately. `createdAt` is the time of this connection.
  sendJson(ws, {
    type: "session-meta",
    name: sessionId,
    createdAt: new Date().toISOString(),
  });

  // Push current state if available
  const currentState = opts.getCurrentState?.();
  if (currentState) {
    sendJson(ws, {
      type: "state",
      value: currentState.value,
      tool: currentState.tool,
      ts: currentState.ts,
    });
  }

  // Subscribe to live output. A sequence number is only consumed when the
  // socket is open at the moment the chunk arrives.
  const unsubscribe = control.subscribe((chunk: Buffer) => {
    if (ws.readyState !== 1 /* OPEN */) return;
    const seqNum = seq.next();
    sendBinary(ws, buildBinaryFrame(seqNum, chunk));
  });

  /**
   * Shared path for the `keys`, `key` and `paste` messages: validate that the
   * payload field is a string, forward it, and report a failed send to the
   * client as a `bad_input` error.
   */
  const forwardInput = (
    kind: "keys" | "key" | "paste",
    field: "data" | "name",
    value: unknown,
    send: (text: string) => Promise<unknown>,
  ): void => {
    if (typeof value !== "string") {
      sendJson(ws, {
        type: "error",
        code: "bad_input",
        message: `${kind}.${field} must be a string`,
      });
      return;
    }
    send(value).catch((err: unknown) => {
      sendJson(ws, {
        type: "error",
        code: "bad_input",
        message: `Failed to send ${kind}: ${String(err)}`,
      });
    });
  };

  // Handle client messages
  ws.on("message", (data: Buffer) => {
    let msg: unknown;
    try {
      msg = JSON.parse(data.toString());
    } catch {
      sendJson(ws, {
        type: "error",
        code: "bad_message",
        message: "Invalid JSON",
      });
      return;
    }
    if (!msg || typeof msg !== "object") return;
    const m = msg as Record<string, unknown>;

    switch (m.type) {
      case "resume": {
        // Only the first resume on a connection triggers a replay.
        if (resumed) return;
        resumed = true;
        const lastSeq = typeof m.lastSeq === "number" ? m.lastSeq : 0;
        // Replay buffered chunks after lastSeq
        const chunks = readChunks(sessionId, {
          afterSeq: lastSeq,
          cfg: opts.stateDir ? { stateDir: opts.stateDir } : undefined,
        });
        for (const chunk of chunks) {
          // Stop replaying as soon as the client goes away.
          if (ws.readyState !== 1 /* OPEN */) break;
          sendBinary(ws, buildBinaryFrame(chunk.seq, chunk.data));
        }
        return;
      }

      case "resize": {
        // IC-1 extension: client reports its actual terminal dimensions.
        // Resize the tmux window so line-wrapping matches what the client sees.
        // The dimensions are untrusted input, so they are normalized to
        // positive integers before being passed on.
        const cols = toTerminalDimension(m.cols, DEFAULT_TERMINAL_COLS);
        const rows = toTerminalDimension(m.rows, DEFAULT_TERMINAL_ROWS);
        // Best-effort: a failed resize is deliberately not reported.
        resizeSession(sessionId, cols, rows).catch(() => {});
        return;
      }

      case "keys":
        forwardInput("keys", "data", m.data, (text) =>
          sendKeys(sessionId, text),
        );
        return;

      case "key":
        forwardInput("key", "name", m.name, (text) =>
          sendKey(sessionId, text),
        );
        return;

      case "paste":
        forwardInput("paste", "data", m.data, (text) =>
          sendPaste(sessionId, text),
        );
        return;

      case "snapshot-request":
        capturePane({ session: sessionId, escapes: true })
          .then((text) => {
            const data = Buffer.from(text).toString("base64");
            // The snapshot takes the next number from the session counter.
            const s = seq.next();
            sendJson(ws, { type: "snapshot", seq: s, data });
          })
          .catch(() => {
            sendJson(ws, {
              type: "error",
              code: "snapshot_failed",
              message: "Failed to capture pane",
            });
          });
        return;

      default:
        // Unknown message types are ignored.
        return;
    }
  });

  // Stop receiving live output once the socket closes or errors.
  ws.on("close", () => {
    unsubscribe();
  });
  ws.on("error", () => {
    unsubscribe();
  });
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
 * Assemble one wire frame: an 8-byte big-endian unsigned sequence number
 * followed by the payload bytes.
 *
 * @param seqNum - Sequence number written into the header; must be a
 *   non-negative integer, otherwise the BigInt conversion or write throws.
 * @param data - Payload appended after the header.
 * @returns A new buffer of length `8 + data.length`.
 */
function buildBinaryFrame(seqNum: number, data: Buffer): Buffer {
  const headerBytes = 8;
  const frame = Buffer.allocUnsafe(headerBytes + data.length);
  frame.writeBigUInt64BE(BigInt(seqNum), 0);
  // Every byte after the header is overwritten by the payload.
  frame.set(data, headerBytes);
  return frame;
}
/**
 * Serialize `msg` with `JSON.stringify` and send it as a text message.
 * Nothing is sent unless the socket's `readyState` is 1 (OPEN).
 */
function sendJson(ws: WsClient, msg: object): void {
  const socketIsOpen = ws.readyState === 1; /* OPEN */
  if (!socketIsOpen) return;

  const payload = JSON.stringify(msg);
  ws.send(payload);
}
/**
 * Send a raw buffer as a binary message.
 * Nothing is sent unless the socket's `readyState` is 1 (OPEN).
 */
function sendBinary(ws: WsClient, data: Buffer): void {
  const socketIsOpen = ws.readyState === 1; /* OPEN */
  if (!socketIsOpen) return;

  // View the client through a `send` signature that accepts a Buffer.
  const binarySocket = ws as unknown as { send(data: Buffer): void };
  binarySocket.send(data);
}