pi-remote-control/scripts/smoke/stream.test.mjs

262 lines
8.6 KiB
JavaScript

/**
* T-1.8 Integration smoke: stream attach, send-keys, drop + reconnect, delta replay.
*
* Requires a running pi process with remote-control (spawned by the before() hook).
* Re-uses the helpers from helpers.mjs.
*
* Tests:
* 1. POST /sessions → creates a tmux session
* 2. WS /sessions/:id/stream → attaches, sends { type:"resume"; lastSeq:null }
* 3. Receives at least some binary frames (output from the session)
* 4. Sends { type:"keys"; data:"echo smoke-marker\n" } via HTTP POST /sessions/:id/input
* 5. Observes "smoke-marker" in the stream within timeout
* 6. Notes lastSeq, disconnects
* 7. Reconnects with { type:"resume"; lastSeq } → receives only delta frames
* 8. GET /sessions/:id/thumbnail → text/plain, non-empty
* 9. DELETE /sessions/:id → 204
*/
import assert from "node:assert/strict";
import path from "node:path";
import { after, before, describe, it } from "node:test";
import { fileURLToPath } from "node:url";
import {
baseUrl,
closeWebSocket,
createSmokeHome,
fetchText,
killPi,
openWebSocket,
removeSmokeHome,
spawnPi,
waitForPort,
} from "./helpers.mjs";
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const EXTENSION_PATH = path.resolve(
__dirname,
"../../extensions/remote-control",
);
const SMOKE_PORT = Number(process.env.SMOKE_PORT_STREAM ?? 19877); // different from smoke.mjs (19876)
const SMOKE_TOKEN = "stream-test-token-deterministic-567";
const BASE = baseUrl(SMOKE_PORT);
const WS_BASE = `ws://127.0.0.1:${SMOKE_PORT}`;
const AUTH = `?token=${SMOKE_TOKEN}`;
let piProc = null;
let tmpHome = null;
let sessionId = null;
describe("T-1.8 stream integration", () => {
before(async () => {
tmpHome = await createSmokeHome({ port: SMOKE_PORT, token: SMOKE_TOKEN });
const { proc } = spawnPi({
extensionPath: EXTENSION_PATH,
fakeHome: tmpHome,
});
piProc = proc;
await waitForPort({ port: SMOKE_PORT });
});
after(async () => {
if (sessionId) {
await fetchText(`${BASE}/sessions/${sessionId}${AUTH}`, {
method: "DELETE",
}).catch(() => {});
}
if (piProc) await killPi(piProc);
if (tmpHome) await removeSmokeHome(tmpHome);
});
it("POST /sessions → 201 with id + name", async () => {
const { res, body } = await fetchText(`${BASE}/sessions${AUTH}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ name: "smoke-stream" }),
});
assert.equal(res.status, 201, `Expected 201, got ${res.status}: ${body}`);
const json = JSON.parse(body);
assert.ok(json.id, "Response should have id");
assert.equal(json.name, "smoke-stream");
sessionId = json.id;
});
it("WS stream: attach, receive frames, send keys, observe output", async () => {
const ws = await openWebSocket(
`${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
);
// Send resume from scratch
ws.send(JSON.stringify({ type: "resume", lastSeq: null }));
// Collect frames for up to 5 seconds
const frames = [];
let lastSeq = 0;
const receivedMarker = await new Promise((resolve, reject) => {
const timeout = setTimeout(
() => reject(new Error("Timed out waiting for output frames")),
5000,
);
ws.on("message", (data) => {
if (Buffer.isBuffer(data) && data.length >= 8) {
// Binary frame: [seq: 8 bytes][data]
const seq = Number(data.readBigUInt64BE(0));
lastSeq = Math.max(lastSeq, seq);
frames.push({ seq, data: data.slice(8) });
clearTimeout(timeout);
resolve(true);
}
});
});
assert.ok(receivedMarker, "Should receive at least one binary frame");
assert.ok(frames.length > 0, "frames array should be non-empty");
assert.ok(lastSeq > 0, "lastSeq should be > 0");
await closeWebSocket(ws);
});
it("POST /sessions/:id/input → send keys, observe in stream", async () => {
const ws = await openWebSocket(
`${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
);
ws.send(JSON.stringify({ type: "resume", lastSeq: null }));
// Send a distinctive command via HTTP input endpoint
// Send text then Enter as two separate requests (avoids \n in execFile args)
const marker = `smoke-${Date.now()}`;
const { res: r1, body: b1 } = await fetchText(
`${BASE}/sessions/${sessionId}/input${AUTH}`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ type: "keys", data: `echo ${marker}` }),
},
);
assert.equal(
r1.status,
204,
`keys POST should return 204, got ${r1.status}. Body: ${b1}`,
);
const { res: r2, body: b2 } = await fetchText(
`${BASE}/sessions/${sessionId}/input${AUTH}`,
{
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ type: "key", name: "enter" }),
},
);
assert.equal(
r2.status,
204,
`enter POST should return 204, got ${r2.status}: ${b2}`,
);
// Wait for marker in stream output
const found = await new Promise((resolve) => {
const timeout = setTimeout(() => resolve(false), 6000);
let accumulated = "";
ws.on("message", (data) => {
if (Buffer.isBuffer(data) && data.length > 8) {
accumulated += data.slice(8).toString("utf8", 0, 200);
if (accumulated.includes(marker)) {
clearTimeout(timeout);
resolve(true);
}
}
});
});
await closeWebSocket(ws);
assert.ok(found, `Should observe marker "${marker}" in stream output`);
});
it("WS stream: reconnect with lastSeq → receives only delta", async () => {
// First pass: collect frames and note highest seq
const ws1 = await openWebSocket(
`${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
);
ws1.send(JSON.stringify({ type: "resume", lastSeq: null }));
let highestSeq = 0;
await new Promise((resolve) => {
const timeout = setTimeout(resolve, 2000);
ws1.on("message", (data) => {
if (Buffer.isBuffer(data) && data.length >= 8) {
const seq = Number(data.readBigUInt64BE(0));
highestSeq = Math.max(highestSeq, seq);
clearTimeout(timeout);
setTimeout(resolve, 500); // wait a bit for more frames
}
});
});
await closeWebSocket(ws1);
// Send more output (text + enter separately)
await fetchText(`${BASE}/sessions/${sessionId}/input${AUTH}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ type: "keys", data: "echo delta-check" }),
});
await fetchText(`${BASE}/sessions/${sessionId}/input${AUTH}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ type: "key", name: "enter" }),
});
// Small delay to let output be buffered
await new Promise((r) => setTimeout(r, 500));
// Reconnect with lastSeq = highestSeq → should only get frames after that
const ws2 = await openWebSocket(
`${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
);
ws2.send(JSON.stringify({ type: "resume", lastSeq: highestSeq }));
const deltaFrames = [];
await new Promise((resolve) => {
const timeout = setTimeout(resolve, 3000);
ws2.on("message", (data) => {
if (Buffer.isBuffer(data) && data.length >= 8) {
const seq = Number(data.readBigUInt64BE(0));
if (seq > highestSeq) deltaFrames.push(seq);
clearTimeout(timeout);
setTimeout(resolve, 500);
}
});
});
await closeWebSocket(ws2);
// Delta frames should all have seq > highestSeq
for (const seq of deltaFrames) {
assert.ok(
seq > highestSeq,
`Delta frame seq ${seq} should be > ${highestSeq}`,
);
}
});
it("GET /sessions/:id/thumbnail → text/plain, non-empty", async () => {
const { res, body } = await fetchText(
`${BASE}/sessions/${sessionId}/thumbnail${AUTH}`,
);
assert.equal(res.status, 200);
assert.ok(
res.headers.get("content-type")?.includes("text/plain"),
"Should be text/plain",
);
// capture-pane may return whitespace-only for a fresh shell — check length not trim
assert.ok(body.length > 0, "Thumbnail response should have content");
});
it("DELETE /sessions/:id → 204", async () => {
const { res } = await fetchText(`${BASE}/sessions/${sessionId}${AUTH}`, {
method: "DELETE",
});
assert.equal(res.status, 204);
sessionId = null; // prevent after() from double-deleting
});
});