From b94b668df66468cdb07d5f2eb21a6d7ef539c8dd Mon Sep 17 00:00:00 2001 From: jay Date: Fri, 15 May 2026 11:35:55 +0200 Subject: [PATCH] feat(T-1.5/1.6/1.7): stream+input+snapshot routes, sessions CRUD, commands, side-channel, health endpoint --- .../remote-control/server/routes/commands.ts | 36 +++ .../remote-control/server/routes/health.ts | 96 ++++++++ .../remote-control/server/routes/input.ts | 105 +++++++++ .../remote-control/server/routes/sessions.ts | 166 ++++++++++++++ .../remote-control/server/routes/side.ts | 119 ++++++++++ .../remote-control/server/routes/stream.ts | 205 ++++++++++++++++++ extensions/remote-control/server/server.ts | 77 ++++++- extensions/remote-control/server/upgrade.ts | 62 +++++- extensions/remote-control/server/util.ts | 47 ++++ 9 files changed, 900 insertions(+), 13 deletions(-) create mode 100644 extensions/remote-control/server/routes/commands.ts create mode 100644 extensions/remote-control/server/routes/health.ts create mode 100644 extensions/remote-control/server/routes/input.ts create mode 100644 extensions/remote-control/server/routes/sessions.ts create mode 100644 extensions/remote-control/server/routes/side.ts create mode 100644 extensions/remote-control/server/routes/stream.ts create mode 100644 extensions/remote-control/server/util.ts diff --git a/extensions/remote-control/server/routes/commands.ts b/extensions/remote-control/server/routes/commands.ts new file mode 100644 index 0000000..2fcc9ef --- /dev/null +++ b/extensions/remote-control/server/routes/commands.ts @@ -0,0 +1,36 @@ +/** + * S-08 — slash-command registry route. + * + * GET /sessions/:id/commands → [{ name, description, args }] + * + * Returns the list of slash commands available in the current pi session. + * Delegates to pi/commands.ts (T-1.4). + * + * Owner: T-1.6 + */ + +import type { IncomingMessage, ServerResponse } from "node:http"; +import type { ExtensionAPI } from "@earendil-works/pi-coding-agent"; +import { getCommands } from "../../pi/commands.js"; +import { getSession } from "../../tmux/manager.js"; +import { sendJson } from "../util.js"; + +export async function handleCommands( + _req: IncomingMessage, + res: ServerResponse, + sessionId: string, + pi: ExtensionAPI, +): Promise { + const session = await getSession(sessionId).catch(() => null); + if (!session) { + sendJson(res, 404, { error: "session_not_found" }); + return; + } + + try { + const commands = await getCommands(pi); + sendJson(res, 200, commands); + } catch (err) { + sendJson(res, 500, { error: "internal_error", message: String(err) }); + } +} diff --git a/extensions/remote-control/server/routes/health.ts b/extensions/remote-control/server/routes/health.ts new file mode 100644 index 0000000..aca1c17 --- /dev/null +++ b/extensions/remote-control/server/routes/health.ts @@ -0,0 +1,96 @@ +/** + * S-12 — health endpoint. + * + * GET /health → { ok, sessions, bufferBytes, uptime, version } + * + * Also integrates the disk watchdog: on each health call we check free space + * and total buffer size, returning a warning if caps are near. + * + * Owner: T-1.7 + */ + +import { execFile } from "node:child_process"; +import fs from "node:fs/promises"; +import type { IncomingMessage, ServerResponse } from "node:http"; +import os from "node:os"; +import path from "node:path"; +import { promisify } from "node:util"; +import { listSessions } from "../../tmux/manager.js"; +import { sendJson } from "../util.js"; + +const execFileAsync = promisify(execFile); + +const _startedAt = Date.now(); + +export interface HealthOptions { + stateDir?: string; +} + +export async function handleHealth( + _req: IncomingMessage, + res: ServerResponse, + opts: HealthOptions = {}, +): Promise { + const stateDir = + opts.stateDir ?? path.join(os.homedir(), ".local", "share", "pi-remote"); + + const [sessions, bufferBytes, freeBytes] = await Promise.all([ + listSessions().catch(() => []), + getTotalBufferBytes(stateDir), + getFreeBytes(stateDir), + ]); + + const freeGb = freeBytes / (1024 * 1024 * 1024); + const bufferMb = bufferBytes / (1024 * 1024); + + sendJson(res, 200, { + ok: true, + uptime: Math.floor((Date.now() - _startedAt) / 1000), + sessions: sessions.length, + sessionIds: sessions.map((s) => s.id), + bufferMb: Math.round(bufferMb * 10) / 10, + diskFreeGb: Math.round(freeGb * 10) / 10, + warnings: buildWarnings(freeGb, bufferMb), + }); +} + +function buildWarnings(freeGb: number, bufferMb: number): string[] { + const warnings: string[] = []; + if (freeGb < 1) warnings.push(`Low disk space: ${freeGb.toFixed(1)} GB free`); + if (bufferMb > 900) + warnings.push(`Buffer near cap: ${bufferMb.toFixed(0)} MB used`); + return warnings; +} + +async function getTotalBufferBytes(stateDir: string): Promise { + const bufDir = path.join(stateDir, "buffers"); + try { + const entries = await fs.readdir(bufDir, { withFileTypes: true }); + let total = 0; + for (const e of entries) { + if (!e.name.endsWith(".buf")) continue; + try { + const stat = await fs.stat(path.join(bufDir, e.name)); + total += stat.size; + } catch { + // skip + } + } + return total; + } catch { + return 0; + } +} + +async function getFreeBytes(dir: string): Promise { + try { + const { stdout } = await execFileAsync("df", ["-k", dir]); + const lines = stdout.trim().split("\n"); + const last = lines[lines.length - 1]; + const parts = last.split(/\s+/); + const availKb = parseInt(parts[3], 10); + return availKb * 1024; + } catch { + return Number.POSITIVE_INFINITY; // unknown — don't warn + } +} diff --git a/extensions/remote-control/server/routes/input.ts b/extensions/remote-control/server/routes/input.ts new file mode 100644 index 0000000..0d6df4d --- /dev/null +++ b/extensions/remote-control/server/routes/input.ts @@ -0,0 +1,105 @@ +/** + * S-03 — send-keys input route. + * + * POST /sessions/:id/input + * + * Body (IC-1 ClientToServer subset, but as HTTP POST for non-WS clients): + * { type: "key"; name: string } + * { type: "keys"; data: string } + * { type: "paste"; data: string } + * + * Response: 204 No Content on success, 400 on bad input, 404 if session missing. + * + * Note: the primary path for send-keys is via WS (T-1.5 stream route handles + * key/keys/paste messages inline). This HTTP endpoint is for clients that + * don't have an open stream (e.g. one-shot CLI tools). + * + * Owner: T-1.5 + */ + +import type { IncomingMessage, ServerResponse } from "node:http"; +import { sendKey, sendKeys, sendPaste } from "../../tmux/input.js"; +import { getSession } from "../../tmux/manager.js"; +import { readBody, sendJson } from "../util.js"; + +export async function handleInput( + req: IncomingMessage, + res: ServerResponse, + sessionId: string, +): Promise { + const session = await getSession(sessionId).catch(() => null); + if (!session) { + sendJson(res, 404, { + error: "session_not_found", + message: `Session "${sessionId}" not found`, + }); + return; + } + + let body: unknown; + try { + body = JSON.parse(await readBody(req)); + } catch { + sendJson(res, 400, { error: "bad_request", message: "Invalid JSON body" }); + return; + } + + if (!body || typeof body !== "object") { + sendJson(res, 400, { + error: "bad_request", + message: "Body must be a JSON object", + }); + return; + } + + const m = body as Record; + + try { + switch (m.type) { + case "key": { + if (typeof m.name !== "string") { + sendJson(res, 400, { + error: "bad_request", + message: "key.name must be a string", + }); + return; + } + await sendKey(sessionId, m.name); + break; + } + case "keys": { + if (typeof m.data !== "string") { + sendJson(res, 400, { + error: "bad_request", + message: "keys.data must be a string", + }); + return; + } + await sendKeys(sessionId, m.data); + break; + } + case "paste": { + if (typeof m.data !== "string") { + sendJson(res, 400, { + error: "bad_request", + message: "paste.data must be a string", + }); + return; + } + await sendPaste(sessionId, m.data); + break; + } + default: + sendJson(res, 400, { + error: "bad_request", + message: `Unknown type: ${String(m.type)}`, + }); + return; + } + } catch (err) { + sendJson(res, 500, { error: "internal_error", message: String(err) }); + return; + } + + res.writeHead(204).end(); +} diff --git a/extensions/remote-control/server/routes/sessions.ts b/extensions/remote-control/server/routes/sessions.ts new file mode 100644 index 0000000..c1b5799 --- /dev/null +++ b/extensions/remote-control/server/routes/sessions.ts @@ -0,0 +1,166 @@ +/** + * S-09 — multi-session CRUD routes. + * + * POST /sessions → { id, name } + * GET /sessions → [{ id, name, description, state, lastOutputAt }] + * PATCH /sessions/:id → updates @description + * DELETE /sessions/:id → kills tmux session, optionally clears buffer + * GET /sessions/:id/thumbnail → text/plain capture-pane (40×12) + * + * Owner: T-1.6 + */ + +import type { IncomingMessage, ServerResponse } from "node:http"; +import { BufferWriter } from "../../buffer/writer.js"; +import { + getSession, + killSession, + listSessions, + setDescription, + spawnSession, +} from "../../tmux/manager.js"; +import { captureThumbnail } from "../../tmux/snapshot.js"; +import { readBody, sendJson } from "../util.js"; + +export async function handleSessions( + req: IncomingMessage, + res: ServerResponse, + sessionId?: string, + sub?: string, +): Promise { + const method = req.method?.toUpperCase(); + + // GET /sessions/:id/thumbnail + if (sessionId && sub === "thumbnail" && method === "GET") { + const session = await getSession(sessionId).catch(() => null); + if (!session) { + sendJson(res, 404, { error: "session_not_found" }); + return; + } + const text = await captureThumbnail(sessionId).catch(() => ""); + res.writeHead(200, { "Content-Type": "text/plain; charset=utf-8" }); + res.end(text); + return; + } + + // /sessions/:id — PATCH / DELETE + if (sessionId && !sub) { + if (method === "PATCH") { + await handlePatch(req, res, sessionId); + return; + } + if (method === "DELETE") { + await handleDelete(req, res, sessionId); + return; + } + sendJson(res, 405, { error: "method_not_allowed" }); + return; + } + + // /sessions — GET / POST + if (!sessionId) { + if (method === "GET") { + await handleList(res); + return; + } + if (method === "POST") { + await handleCreate(req, res); + return; + } + sendJson(res, 405, { error: "method_not_allowed" }); + return; + } + + sendJson(res, 404, { error: "not_found" }); +} + +async function handleList(res: ServerResponse): Promise { + try { + const sessions = await listSessions(); + const payload = sessions.map((s) => ({ + id: s.id, + name: s.name, + description: s.description, + state: "idle", // T-1.4 events will feed real state in Phase 2 + lastOutputAt: s.lastActivityAt, + })); + sendJson(res, 200, payload); + } catch (err) { + sendJson(res, 500, { error: "internal_error", message: String(err) }); + } +} + +async function handleCreate( + req: IncomingMessage, + res: ServerResponse, +): Promise { + let body: Record = {}; + try { + const raw = await readBody(req); + if (raw.trim()) body = JSON.parse(raw) as Record; + } catch { + sendJson(res, 400, { error: "bad_request", message: "Invalid JSON" }); + return; + } + + const name = + typeof body.name === "string" && body.name.trim() + ? body.name.trim() + : `session-${Date.now()}`; + + try { + const id = await spawnSession({ name }); + sendJson(res, 201, { id, name }); + } catch (err) { + sendJson(res, 500, { error: "internal_error", message: String(err) }); + } +} + +async function handlePatch( + req: IncomingMessage, + res: ServerResponse, + sessionId: string, +): Promise { + const session = await getSession(sessionId).catch(() => null); + if (!session) { + sendJson(res, 404, { error: "session_not_found" }); + return; + } + + let body: Record = {}; + try { + const raw = await readBody(req); + if (raw.trim()) body = JSON.parse(raw) as Record; + } catch { + sendJson(res, 400, { error: "bad_request", message: "Invalid JSON" }); + return; + } + + if (typeof body.description === "string") { + await setDescription(sessionId, body.description); + } + + sendJson(res, 200, { id: sessionId, description: body.description }); +} + +async function handleDelete( + _req: IncomingMessage, + res: ServerResponse, + sessionId: string, +): Promise { + const session = await getSession(sessionId).catch(() => null); + if (!session) { + sendJson(res, 404, { error: "session_not_found" }); + return; + } + + try { + await killSession(sessionId); + // Optionally clear buffer + const writer = new BufferWriter(sessionId); + await writer.delete().catch(() => {}); // best-effort + res.writeHead(204).end(); + } catch (err) { + sendJson(res, 500, { error: "internal_error", message: String(err) }); + } +} diff --git a/extensions/remote-control/server/routes/side.ts b/extensions/remote-control/server/routes/side.ts new file mode 100644 index 0000000..12c257a --- /dev/null +++ b/extensions/remote-control/server/routes/side.ts @@ -0,0 +1,119 @@ +/** + * S-07 — state side-channel route. + * + * WS endpoint: GET /sessions/:id/side + * + * Pushes IC-1 ServerToClient JSON frames (state, session-meta, error). + * Does NOT carry binary output (that's /stream). + * Lightweight channel for UI state updates without full output stream. + * + * Owner: T-1.6 + */ + +import type { IncomingMessage } from "node:http"; +import type { Socket } from "node:net"; +import type { StateEvent } from "../../pi/events.js"; +import { getSession } from "../../tmux/manager.js"; +import type { WsClient, WsServer } from "../types.js"; + +export interface SideRouteOptions { + wss: WsServer; + isAuthenticated: (req: IncomingMessage) => boolean; + getCurrentState?: () => StateEvent | null; +} + +// Subscribers per session: sessionId → set of ws clients +const _subscribers = new Map>(); + +/** + * Broadcast a state event to all side-channel subscribers of a session. + */ +export function broadcastState(sessionId: string, event: StateEvent): void { + const subs = _subscribers.get(sessionId); + if (!subs) return; + const msg = JSON.stringify({ + type: "state", + value: event.value, + tool: event.tool, + ts: event.ts, + }); + for (const ws of subs) { + if (ws.readyState === 1 /* OPEN */) { + ws.send(msg); + } + } +} + +/** + * Handle a WebSocket upgrade for /sessions/:id/side. + */ +export function handleSideUpgrade( + sessionId: string, + request: IncomingMessage, + socket: Socket, + head: Buffer, + opts: SideRouteOptions, +): void { + opts.wss.handleUpgrade(request, socket, head, (ws: WsClient) => { + handleSideConnection(sessionId, ws, opts); + }); +} + +async function handleSideConnection( + sessionId: string, + ws: WsClient, + opts: SideRouteOptions, +): Promise { + const session = await getSession(sessionId).catch(() => null); + if (!session) { + ws.send( + JSON.stringify({ + type: "error", + code: "session_not_found", + message: `Session "${sessionId}" not found`, + }), + ); + ws.terminate(); + return; + } + + // Push session-meta on connect + ws.send( + JSON.stringify({ + type: "session-meta", + name: session.name, + description: session.description, + createdAt: session.createdAt, + }), + ); + + // Push current state + const currentState = opts.getCurrentState?.(); + if (currentState) { + ws.send( + JSON.stringify({ + type: "state", + value: currentState.value, + tool: currentState.tool, + ts: currentState.ts, + }), + ); + } + + // Register subscriber + let subs = _subscribers.get(sessionId); + if (!subs) { + subs = new Set(); + _subscribers.set(sessionId, subs); + } + subs.add(ws); + + ws.on("close", () => { + subs?.delete(ws); + if (subs?.size === 0) _subscribers.delete(sessionId); + }); + + ws.on("error", () => { + subs?.delete(ws); + }); +} diff --git a/extensions/remote-control/server/routes/stream.ts b/extensions/remote-control/server/routes/stream.ts new file mode 100644 index 0000000..454ef65 --- /dev/null +++ b/extensions/remote-control/server/routes/stream.ts @@ -0,0 +1,205 @@ +/** + * S-02 binary stream + S-04 sequence cursor resume + S-05 snapshot route. + * + * WS endpoint: GET /sessions/:id/stream + * + * Protocol (IC-1): + * - On connect: client sends { type: "resume"; lastSeq: number | null } + * - Server replays buffer chunks after lastSeq (binary frames with 8-byte seq header) + * - Then live output arrives as binary frames + * - Client may send { type: "snapshot-request" } → server sends { type: "snapshot"; ... } + * - State events pushed unsolicited: { type: "state"; ... } + * - Session meta pushed on connect: { type: "session-meta"; ... } + * + * Binary frame format: [seq: 8 bytes BE uint64][data: N bytes] + * + * Owner: T-1.5 + */ + +import type { IncomingMessage } from "node:http"; +import type { Socket } from "node:net"; +import { readChunks } from "../../buffer/reader.js"; +import type { StateEvent } from "../../pi/events.js"; +import { SequenceCounter } from "../../sequence.js"; +import { ControlClient } from "../../tmux/control.js"; +import { capturePane } from "../../tmux/snapshot.js"; +import type { WsClient, WsServer } from "../types.js"; + +export interface StreamRouteOptions { + wss: WsServer; + isAuthenticated: (req: IncomingMessage) => boolean; + /** Called to get the current agent state for new connections */ + getCurrentState?: () => StateEvent | null; + stateDir?: string; +} + +// Per-session registry of active ControlClients and sequence counters +const _clients = new Map< + string, + { control: ControlClient; seq: SequenceCounter } +>(); + +/** + * Get or create a ControlClient + SequenceCounter for a session. + */ +function getOrCreateSession(sessionId: string): { + control: ControlClient; + seq: SequenceCounter; +} { + const existing = _clients.get(sessionId); + if (existing) return existing; + + const seq = new SequenceCounter(); + const control = new ControlClient({ + session: sessionId, + onClose: (reason) => { + console.error( + `[stream] control client closed for ${sessionId}: ${reason}`, + ); + _clients.delete(sessionId); + }, + }); + control.start(); + + const entry = { control, seq }; + _clients.set(sessionId, entry); + return entry; +} + +/** + * Stop and remove a session's ControlClient (call on session kill). + */ +export function stopSession(sessionId: string): void { + const entry = _clients.get(sessionId); + if (entry) { + entry.control.stop(); + _clients.delete(sessionId); + } +} + +/** + * Handle a WebSocket upgrade for /sessions/:id/stream. + */ +export function handleStreamUpgrade( + sessionId: string, + request: IncomingMessage, + socket: Socket, + head: Buffer, + opts: StreamRouteOptions, +): void { + opts.wss.handleUpgrade(request, socket, head, (ws: WsClient) => { + handleStreamConnection(sessionId, ws, opts); + }); +} + +function handleStreamConnection( + sessionId: string, + ws: WsClient, + opts: StreamRouteOptions, +): void { + const { control, seq } = getOrCreateSession(sessionId); + let resumed = false; + + // Push session-meta immediately + sendJson(ws, { + type: "session-meta", + name: sessionId, + createdAt: new Date().toISOString(), + }); + + // Push current state if available + const currentState = opts.getCurrentState?.(); + if (currentState) { + sendJson(ws, { + type: "state", + value: currentState.value, + tool: currentState.tool, + ts: currentState.ts, + }); + } + + // Subscribe to live output + const unsubscribe = control.subscribe((chunk: Buffer) => { + if (ws.readyState !== 1 /* OPEN */) return; + const seqNum = seq.next(); + const frame = buildBinaryFrame(seqNum, chunk); + sendBinary(ws, frame); + }); + + // Handle client messages + ws.on("message", (data: Buffer) => { + let msg: unknown; + try { + msg = JSON.parse(data.toString()); + } catch { + sendJson(ws, { + type: "error", + code: "bad_message", + message: "Invalid JSON", + }); + return; + } + + if (!msg || typeof msg !== "object") return; + const m = msg as Record; + + if (m.type === "resume" && !resumed) { + resumed = true; + const lastSeq = typeof m.lastSeq === "number" ? m.lastSeq : 0; + // Replay buffered chunks after lastSeq + const chunks = readChunks(sessionId, { + afterSeq: lastSeq, + cfg: opts.stateDir ? { stateDir: opts.stateDir } : undefined, + }); + for (const chunk of chunks) { + if (ws.readyState !== 1) break; + sendBinary(ws, buildBinaryFrame(chunk.seq, chunk.data)); + } + } else if (m.type === "snapshot-request") { + capturePane({ session: sessionId }) + .then((text) => { + const data = Buffer.from(text).toString("base64"); + const s = seq.next(); + sendJson(ws, { type: "snapshot", seq: s, data }); + }) + .catch(() => { + sendJson(ws, { + type: "error", + code: "snapshot_failed", + message: "Failed to capture pane", + }); + }); + } + }); + + ws.on("close", () => { + unsubscribe(); + }); + + ws.on("error", () => { + unsubscribe(); + }); +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Build a binary frame: [seq: 8 bytes BE][data] */ +function buildBinaryFrame(seqNum: number, data: Buffer): Buffer { + const header = Buffer.allocUnsafe(8); + header.writeBigUInt64BE(BigInt(seqNum), 0); + return Buffer.concat([header, data]); +} + +function sendJson(ws: WsClient, msg: object): void { + if (ws.readyState === 1 /* OPEN */) { + ws.send(JSON.stringify(msg)); + } +} + +function sendBinary(ws: WsClient, data: Buffer): void { + if (ws.readyState === 1 /* OPEN */) { + (ws as unknown as { send(data: Buffer): void }).send(data); + } +} diff --git a/extensions/remote-control/server/server.ts b/extensions/remote-control/server/server.ts index 48ff715..db58aa6 100644 --- a/extensions/remote-control/server/server.ts +++ b/extensions/remote-control/server/server.ts @@ -28,6 +28,7 @@ import type { ExtensionAPI, ExtensionContext, } from "@earendil-works/pi-coding-agent"; +import { validateBearer } from "../auth/tokens.js"; import { generateSessionId, loadOrCreateToken, @@ -38,8 +39,13 @@ import { import { parseBindAddress, readRemoteControlConfig } from "../config.js"; import { buildHTML } from "../html.js"; // LEGACY: browser HTML client import { buildSyncMessage } from "../messages.js"; +import { handleCommands } from "./routes/commands.js"; +import { handleHealth } from "./routes/health.js"; +import { handleInput } from "./routes/input.js"; +import { handleSessions } from "./routes/sessions.js"; import type { RemoteServer, WsClient, WsServer } from "./types.js"; import { createUpgradeHandler } from "./upgrade.js"; +import { extractBearer } from "./util.js"; // --------------------------------------------------------------------------- // Load ws (bundled with pi) without needing @types/ws installed locally @@ -196,8 +202,75 @@ export async function startServer( return; } - res.writeHead(404, { "Content-Type": "text/plain; charset=utf-8" }); - res.end("Not found"); + // New API routes (IC-2) — bearer token auth + const asyncHandler = async (): Promise => { + // Auth check for API routes + const bearer = extractBearer(req); + const isApiAuthed = + (bearer && validateToken(bearer, token)) || + (bearer ? !!(await validateBearer(bearer).catch(() => null)) : false); + + if (!isApiAuthed) return false; + + // GET /health + if (pathname === "/health" && req.method === "GET") { + await handleHealth(req, res); + return true; + } + + // /sessions — list + create + if (pathname === "/sessions") { + await handleSessions(req, res); + return true; + } + + // /sessions/:id — PATCH, DELETE + const sessMatch = pathname.match(/^\/sessions\/([^/]+)$/); + if (sessMatch) { + await handleSessions(req, res, decodeURIComponent(sessMatch[1])); + return true; + } + + // /sessions/:id/thumbnail + const thumbMatch = pathname.match(/^\/sessions\/([^/]+)\/thumbnail$/); + if (thumbMatch) { + await handleSessions( + req, + res, + decodeURIComponent(thumbMatch[1]), + "thumbnail", + ); + return true; + } + + // /sessions/:id/commands + const cmdMatch = pathname.match(/^\/sessions\/([^/]+)\/commands$/); + if (cmdMatch) { + await handleCommands(req, res, decodeURIComponent(cmdMatch[1]), pi); + return true; + } + + // /sessions/:id/input + const inputMatch = pathname.match(/^\/sessions\/([^/]+)\/input$/); + if (inputMatch) { + await handleInput(req, res, decodeURIComponent(inputMatch[1])); + return true; + } + + return false; + }; + + asyncHandler() + .then((handled) => { + if (!handled) { + res.writeHead(404, { "Content-Type": "text/plain; charset=utf-8" }); + res.end("Not found"); + } + }) + .catch((err: Error) => { + res.writeHead(500, { "Content-Type": "text/plain; charset=utf-8" }); + res.end(err.message); + }); }); // ── WebSocket server ──────────────────────────────────────────────────── diff --git a/extensions/remote-control/server/upgrade.ts b/extensions/remote-control/server/upgrade.ts index c20f658..40c35f5 100644 --- a/extensions/remote-control/server/upgrade.ts +++ b/extensions/remote-control/server/upgrade.ts @@ -5,30 +5,37 @@ * based on the request path and session/topic. Non-matching paths are * destroyed immediately. * - * Current routes (all LEGACY — browser HTML client): - * /ws → legacy browser client WebSocket endpoint - * - * Future routes (T-1.5): - * /sessions/:id/stream → binary ANSI stream per tmux session - * - * T-1.5 will extend createUpgradeHandler to accept a session registry and - * dispatch to per-session stream handlers. + * Routes: + * /ws — LEGACY browser HTML client WebSocket endpoint + * /sessions/:id/stream — binary ANSI stream per tmux session (T-1.5) */ import type { IncomingMessage } from "node:http"; import type { Socket } from "node:net"; +import { handleSideUpgrade, type SideRouteOptions } from "./routes/side.js"; +import { + handleStreamUpgrade, + type StreamRouteOptions, +} from "./routes/stream.js"; import type { WsClient, WsServer } from "./types.js"; +export interface UpgradeHandlerOptions { + wss: WsServer; + isAuthenticated: (req: IncomingMessage) => boolean; + stream: Omit; + side: Omit; +} + /** * Create the HTTP `upgrade` event handler. * - * @param wss - WebSocket server instance. - * @param isAuthenticated - Predicate that checks token/session on the request. - * @returns A handler suitable for `httpServer.on("upgrade", handler)`. + * @param opts - Handler options including wss, auth predicate, and stream config. + * @returns A handler suitable for `httpServer.on("upgrade", handler)`. */ export function createUpgradeHandler( wss: WsServer, isAuthenticated: (req: IncomingMessage) => boolean, + streamOpts?: Omit, ): (request: IncomingMessage, socket: Socket, head: Buffer) => void { return (request: IncomingMessage, socket: Socket, head: Buffer): void => { const url = new URL(request.url ?? "/", "http://localhost"); @@ -46,6 +53,39 @@ export function createUpgradeHandler( return; } + // /sessions/:id/stream + const streamMatch = url.pathname.match(/^\/sessions\/([^/]+)\/stream$/); + if (streamMatch) { + if (!isAuthenticated(request)) { + socket.write("HTTP/1.1 403 Forbidden\r\n\r\n"); + socket.destroy(); + return; + } + const sessionId = decodeURIComponent(streamMatch[1]); + handleStreamUpgrade(sessionId, request, socket, head, { + wss, + isAuthenticated, + ...streamOpts, + }); + return; + } + + // /sessions/:id/side + const sideMatch = url.pathname.match(/^\/sessions\/([^/]+)\/side$/); + if (sideMatch) { + if (!isAuthenticated(request)) { + socket.write("HTTP/1.1 403 Forbidden\r\n\r\n"); + socket.destroy(); + return; + } + const sessionId = decodeURIComponent(sideMatch[1]); + handleSideUpgrade(sessionId, request, socket, head, { + wss, + isAuthenticated, + }); + return; + } + // Unknown upgrade path — reject socket.destroy(); }; diff --git a/extensions/remote-control/server/util.ts b/extensions/remote-control/server/util.ts new file mode 100644 index 0000000..b0f36bc --- /dev/null +++ b/extensions/remote-control/server/util.ts @@ -0,0 +1,47 @@ +/** + * Shared HTTP server utilities for route handlers. + * Owner: T-1.5 + */ + +import type { IncomingMessage, ServerResponse } from "node:http"; + +/** Read the full request body as a string. */ +export function readBody(req: IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + req.on("data", (chunk: Buffer) => chunks.push(chunk)); + req.on("end", () => resolve(Buffer.concat(chunks).toString("utf8"))); + req.on("error", reject); + }); +} + +/** Send a JSON response. */ +export function sendJson( + res: ServerResponse, + status: number, + body: unknown, +): void { + const payload = JSON.stringify(body); + res.writeHead(status, { + "Content-Type": "application/json", + "Content-Length": Buffer.byteLength(payload), + }); + res.end(payload); +} + +/** Extract a path segment by index (0-based, after splitting on '/'). */ +export function pathSegment(url: string, index: number): string | undefined { + return url.split("?")[0].split("/").filter(Boolean)[index]; +} + +/** + * Parse bearer token from Authorization header or ?token= query param. + * Returns the raw token string or null. + */ +export function extractBearer(req: IncomingMessage): string | null { + const auth = req.headers["authorization"]; + if (auth?.startsWith("Bearer ")) return auth.slice(7).trim(); + if (auth?.startsWith("bearer ")) return auth.slice(7).trim(); + const url = new URL(req.url ?? "/", "http://localhost"); + return url.searchParams.get("token"); +}