/**
 * T-1.8 Integration smoke: stream attach, send-keys, drop + reconnect, delta replay.
 *
 * Requires a running pi process with remote-control (spawned by the before() hook).
 * Re-uses the helpers from helpers.mjs.
 *
 * Tests:
 *  1. POST /sessions → 201, creates a session and returns { id, name }
 *  2. WS /sessions/:id/stream → attaches, sends { type: "resume", lastSeq: null },
 *     receives at least one binary frame
 *  3. POST /sessions/:id/input → sends { type: "keys", data } and then
 *     { type: "key", name: "enter" } as two separate requests, and observes the
 *     marker in the stream within the timeout
 *  4. Same as 3, but the keys / key messages are sent over the WebSocket itself
 *  5. Notes the highest seq, disconnects, produces more output, reconnects with
 *     { type: "resume", lastSeq } → receives only frames with seq > lastSeq
 *  6. GET /sessions/:id/thumbnail → text/plain, non-empty
 *  7. DELETE /sessions/:id → 204
 */
import assert from "node:assert/strict";
import path from "node:path";
import { after, before, describe, it } from "node:test";
import { fileURLToPath } from "node:url";
import {
  baseUrl,
  closeWebSocket,
  createSmokeHome,
  fetchText,
  killPi,
  openWebSocket,
  removeSmokeHome,
  spawnPi,
  waitForPort,
} from "./helpers.mjs";

const __dirname = path.dirname(fileURLToPath(import.meta.url));
const EXTENSION_PATH = path.resolve(
  __dirname,
  "../../extensions/remote-control",
);
// Different from smoke.mjs (19876) so both suites can run side by side.
const SMOKE_PORT = Number(process.env.SMOKE_PORT_STREAM ?? 19877);
const SMOKE_TOKEN = "stream-test-token-deterministic-567";
const BASE = baseUrl(SMOKE_PORT);
const WS_BASE = `ws://127.0.0.1:${SMOKE_PORT}`;
const AUTH = `?token=${SMOKE_TOKEN}`;

// Binary stream frames are laid out as [seq: 8 bytes, big-endian][payload].
const FRAME_HEADER_BYTES = 8;

let piProc = null;
let tmpHome = null;
let sessionId = null;

/**
 * Decode one WebSocket message as a binary stream frame.
 *
 * NOTE(review): any Buffer of at least 8 bytes is treated as a frame, exactly
 * as the tests always did. If the server ever sends text/control messages that
 * the client library also delivers as Buffers, they would be misparsed here —
 * confirm against the server protocol.
 *
 * @param {unknown} data - Raw payload handed to the "message" listener.
 * @returns {{ seq: number, payload: Buffer } | null} The parsed frame, or null
 *   when `data` is not a Buffer or is shorter than the 8-byte header.
 */
function parseFrame(data) {
  if (!Buffer.isBuffer(data) || data.length < FRAME_HEADER_BYTES) {
    return null;
  }
  return {
    // Number() loses precision above 2^53; acceptable for a short-lived smoke session.
    seq: Number(data.readBigUInt64BE(0)),
    payload: data.subarray(FRAME_HEADER_BYTES),
  };
}

/**
 * Open a stream WebSocket for the current session and send the resume message.
 *
 * @param {number | null} [lastSeq=null] - Value sent as `lastSeq` in the resume
 *   message; null requests a resume from scratch.
 * @returns {Promise<object>} The socket returned by `openWebSocket`.
 */
async function attachStream(lastSeq = null) {
  const ws = await openWebSocket(
    `${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
  );
  ws.send(JSON.stringify({ type: "resume", lastSeq }));
  return ws;
}

/**
 * POST one input message to the current session's HTTP input endpoint.
 *
 * @param {object} payload - JSON-serialisable input message.
 * @returns {Promise<{ res: Response, body: string }>} Whatever `fetchText` yields.
 */
function postInput(payload) {
  return fetchText(`${BASE}/sessions/${sessionId}/input${AUTH}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  });
}

/**
 * Record the seq of every frame received on `ws`.
 *
 * The listener is attached synchronously, so no frame delivered after this
 * call is missed. Collection ends `settleMs` after the first frame arrives, or
 * after `firstFrameTimeoutMs` if no frame arrives at all.
 *
 * @param {object} ws - Socket returned by `openWebSocket`.
 * @param {{ firstFrameTimeoutMs: number, settleMs: number }} options
 * @returns {Promise<number[]>} Seqs in arrival order (empty on timeout).
 */
function collectSeqs(ws, { firstFrameTimeoutMs, settleMs }) {
  return new Promise((resolve) => {
    const seqs = [];
    let isSettling = false;
    let isDone = false;
    const finish = () => {
      isDone = true;
      resolve(seqs);
    };
    const idleTimer = setTimeout(finish, firstFrameTimeoutMs);
    ws.on("message", (data) => {
      if (isDone) return;
      const frame = parseFrame(data);
      if (frame === null) return;
      seqs.push(frame.seq);
      if (!isSettling) {
        // One settle timer started by the first frame, rather than one per frame.
        isSettling = true;
        clearTimeout(idleTimer);
        setTimeout(finish, settleMs);
      }
    });
  });
}

/**
 * Watch the stream for `marker` in the concatenated frame payloads.
 *
 * The listener is attached synchronously so callers can start watching BEFORE
 * they trigger the output; frames that arrive while the caller is still
 * awaiting an HTTP request are therefore not lost.
 *
 * @param {object} ws - Socket returned by `openWebSocket`.
 * @param {string} marker - ASCII text to look for (ASCII, so decoding each
 *   frame separately cannot split it mid-character).
 * @param {number} timeoutMs - Give up and resolve `false` after this long.
 * @returns {{ found: Promise<boolean>, stop: () => void }} `found` resolves
 *   true once the marker is seen, false on timeout or after `stop()`;
 *   `stop()` cancels the pending timeout.
 */
function watchForMarker(ws, marker, timeoutMs) {
  let stop = () => {};
  const found = new Promise((resolve) => {
    let accumulated = "";
    let isDone = false;
    let timer = null;
    const finish = (result) => {
      if (isDone) return;
      isDone = true;
      clearTimeout(timer);
      resolve(result);
    };
    timer = setTimeout(() => finish(false), timeoutMs);
    ws.on("message", (data) => {
      if (isDone) return;
      const frame = parseFrame(data);
      if (frame === null) return;
      // Decode the whole payload: truncating it could cut the marker off.
      accumulated += frame.payload.toString("utf8");
      if (accumulated.includes(marker)) finish(true);
    });
    stop = () => finish(false);
  });
  return { found, stop };
}

describe("T-1.8 stream integration", () => {
  before(async () => {
    tmpHome = await createSmokeHome({ port: SMOKE_PORT, token: SMOKE_TOKEN });
    const { proc } = spawnPi({
      extensionPath: EXTENSION_PATH,
      fakeHome: tmpHome,
    });
    piProc = proc;
    await waitForPort({ port: SMOKE_PORT });
  });

  after(async () => {
    if (sessionId) {
      // Best-effort cleanup: the session may already be gone.
      await fetchText(`${BASE}/sessions/${sessionId}${AUTH}`, {
        method: "DELETE",
      }).catch(() => {});
    }
    try {
      if (piProc) await killPi(piProc);
    } finally {
      // Remove the temp home even if stopping the process failed.
      if (tmpHome) await removeSmokeHome(tmpHome);
    }
  });

  it("POST /sessions → 201 with id + name", async () => {
    const { res, body } = await fetchText(`${BASE}/sessions${AUTH}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ name: "smoke-stream" }),
    });
    assert.equal(res.status, 201, `Expected 201, got ${res.status}: ${body}`);
    const json = JSON.parse(body);
    assert.ok(json.id, "Response should have id");
    assert.equal(json.name, "smoke-stream");
    sessionId = json.id;
  });

  it("WS stream: attach, receive frames, send keys, observe output", async () => {
    const ws = await attachStream(null);
    let seqs;
    try {
      // Wait up to 5 seconds for the first frame; one frame is enough.
      seqs = await collectSeqs(ws, { firstFrameTimeoutMs: 5000, settleMs: 0 });
    } finally {
      // Close the socket even when collection fails, so it cannot leak.
      await closeWebSocket(ws);
    }
    assert.ok(seqs.length > 0, "Should receive at least one binary frame");
    const lastSeq = seqs.reduce((max, seq) => Math.max(max, seq), 0);
    assert.ok(lastSeq > 0, "lastSeq should be > 0");
  });

  it("POST /sessions/:id/input → send keys, observe in stream", async () => {
    const ws = await attachStream(null);
    const marker = `smoke-${Date.now()}`;
    // Start watching before sending input so early output is not missed.
    const watcher = watchForMarker(ws, marker, 6000);
    let found = false;
    try {
      // Send a distinctive command via the HTTP input endpoint.
      // Text then Enter as two separate requests (avoids \n in execFile args).
      const { res: r1, body: b1 } = await postInput({
        type: "keys",
        data: `echo ${marker}`,
      });
      assert.equal(
        r1.status,
        204,
        `keys POST should return 204, got ${r1.status}. Body: ${b1}`,
      );
      const { res: r2, body: b2 } = await postInput({
        type: "key",
        name: "enter",
      });
      assert.equal(
        r2.status,
        204,
        `enter POST should return 204, got ${r2.status}: ${b2}`,
      );
      found = await watcher.found;
    } finally {
      watcher.stop();
      await closeWebSocket(ws);
    }
    assert.ok(found, `Should observe marker "${marker}" in stream output`);
  });

  it("WS stream: send keys via WS, observe in stream output", async () => {
    const ws = await attachStream(null);
    const marker = `ws-smoke-${Date.now()}`;
    let found = false;
    let watcher = null;
    try {
      // Wait a tick to let resume complete, then send keys via WS.
      await new Promise((r) => setTimeout(r, 200));
      watcher = watchForMarker(ws, marker, 6000);
      ws.send(JSON.stringify({ type: "keys", data: `echo ${marker}` }));
      ws.send(JSON.stringify({ type: "key", name: "enter" }));
      found = await watcher.found;
    } finally {
      watcher?.stop();
      await closeWebSocket(ws);
    }
    assert.ok(
      found,
      `Should observe marker "${marker}" sent via WS keys message in stream output`,
    );
  });

  it("WS stream: reconnect with lastSeq → receives only delta", async () => {
    // First pass: collect frames and note the highest seq.
    const ws1 = await attachStream(null);
    let baselineSeqs;
    try {
      baselineSeqs = await collectSeqs(ws1, {
        firstFrameTimeoutMs: 2000,
        settleMs: 500, // wait a bit for more frames
      });
    } finally {
      await closeWebSocket(ws1);
    }
    const highestSeq = baselineSeqs.reduce(
      (max, seq) => Math.max(max, seq),
      0,
    );
    assert.ok(
      highestSeq > 0,
      "Baseline pass should observe at least one frame",
    );

    // Send more output (text + enter separately).
    const { res: keysRes, body: keysBody } = await postInput({
      type: "keys",
      data: "echo delta-check",
    });
    assert.equal(
      keysRes.status,
      204,
      `delta keys POST should return 204, got ${keysRes.status}: ${keysBody}`,
    );
    const { res: enterRes, body: enterBody } = await postInput({
      type: "key",
      name: "enter",
    });
    assert.equal(
      enterRes.status,
      204,
      `delta enter POST should return 204, got ${enterRes.status}: ${enterBody}`,
    );

    // Small delay to let output be buffered.
    await new Promise((r) => setTimeout(r, 500));

    // Reconnect with lastSeq = highestSeq → should only get frames after that.
    const ws2 = await attachStream(highestSeq);
    let replayedSeqs;
    try {
      replayedSeqs = await collectSeqs(ws2, {
        firstFrameTimeoutMs: 3000,
        settleMs: 500,
      });
    } finally {
      await closeWebSocket(ws2);
    }

    assert.ok(
      replayedSeqs.length > 0,
      "Reconnect should replay the frames produced after the disconnect",
    );
    // Check EVERY received frame: filtering first would make this vacuous.
    for (const seq of replayedSeqs) {
      assert.ok(
        seq > highestSeq,
        `Delta frame seq ${seq} should be > ${highestSeq}`,
      );
    }
  });

  it("GET /sessions/:id/thumbnail → text/plain, non-empty", async () => {
    const { res, body } = await fetchText(
      `${BASE}/sessions/${sessionId}/thumbnail${AUTH}`,
    );
    assert.equal(res.status, 200);
    assert.ok(
      res.headers.get("content-type")?.includes("text/plain"),
      "Should be text/plain",
    );
    // capture-pane may return whitespace-only for a fresh shell — check length not trim
    assert.ok(body.length > 0, "Thumbnail response should have content");
  });

  it("DELETE /sessions/:id → 204", async () => {
    const { res } = await fetchText(`${BASE}/sessions/${sessionId}${AUTH}`, {
      method: "DELETE",
    });
    assert.equal(res.status, 204);
    sessionId = null; // prevent after() from double-deleting
  });
});