262 lines
8.6 KiB
JavaScript
262 lines
8.6 KiB
JavaScript
/**
|
|
* T-1.8 Integration smoke: stream attach, send-keys, drop + reconnect, delta replay.
|
|
*
|
|
* Requires a running pi process with remote-control (spawned by the before() hook).
|
|
* Re-uses the helpers from helpers.mjs.
|
|
*
|
|
* Tests:
|
|
* 1. POST /sessions → creates a tmux session
|
|
* 2. WS /sessions/:id/stream → attaches, sends { type:"resume"; lastSeq:null }
|
|
* 3. Receives at least some binary frames (output from the session)
|
|
* 4. Sends { type:"keys"; data:"echo smoke-marker\n" } via HTTP POST /sessions/:id/input
|
|
* 5. Observes "smoke-marker" in the stream within timeout
|
|
* 6. Notes lastSeq, disconnects
|
|
* 7. Reconnects with { type:"resume"; lastSeq } → receives only delta frames
|
|
* 8. GET /sessions/:id/thumbnail → text/plain, non-empty
|
|
* 9. DELETE /sessions/:id → 204
|
|
*/
|
|
|
|
import assert from "node:assert/strict";
|
|
import path from "node:path";
|
|
import { after, before, describe, it } from "node:test";
|
|
import { fileURLToPath } from "node:url";
|
|
import {
|
|
baseUrl,
|
|
closeWebSocket,
|
|
createSmokeHome,
|
|
fetchText,
|
|
killPi,
|
|
openWebSocket,
|
|
removeSmokeHome,
|
|
spawnPi,
|
|
waitForPort,
|
|
} from "./helpers.mjs";
|
|
|
|
const __dirname = path.dirname(fileURLToPath(import.meta.url));
|
|
const EXTENSION_PATH = path.resolve(
|
|
__dirname,
|
|
"../../extensions/remote-control",
|
|
);
|
|
|
|
const SMOKE_PORT = Number(process.env.SMOKE_PORT_STREAM ?? 19877); // different from smoke.mjs (19876)
|
|
const SMOKE_TOKEN = "stream-test-token-deterministic-567";
|
|
const BASE = baseUrl(SMOKE_PORT);
|
|
const WS_BASE = `ws://127.0.0.1:${SMOKE_PORT}`;
|
|
const AUTH = `?token=${SMOKE_TOKEN}`;
|
|
|
|
let piProc = null;
|
|
let tmpHome = null;
|
|
let sessionId = null;
|
|
|
|
describe("T-1.8 stream integration", () => {
|
|
before(async () => {
|
|
tmpHome = await createSmokeHome({ port: SMOKE_PORT, token: SMOKE_TOKEN });
|
|
const { proc } = spawnPi({
|
|
extensionPath: EXTENSION_PATH,
|
|
fakeHome: tmpHome,
|
|
});
|
|
piProc = proc;
|
|
await waitForPort({ port: SMOKE_PORT });
|
|
});
|
|
|
|
after(async () => {
|
|
if (sessionId) {
|
|
await fetchText(`${BASE}/sessions/${sessionId}${AUTH}`, {
|
|
method: "DELETE",
|
|
}).catch(() => {});
|
|
}
|
|
if (piProc) await killPi(piProc);
|
|
if (tmpHome) await removeSmokeHome(tmpHome);
|
|
});
|
|
|
|
it("POST /sessions → 201 with id + name", async () => {
|
|
const { res, body } = await fetchText(`${BASE}/sessions${AUTH}`, {
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify({ name: "smoke-stream" }),
|
|
});
|
|
assert.equal(res.status, 201, `Expected 201, got ${res.status}: ${body}`);
|
|
const json = JSON.parse(body);
|
|
assert.ok(json.id, "Response should have id");
|
|
assert.equal(json.name, "smoke-stream");
|
|
sessionId = json.id;
|
|
});
|
|
|
|
it("WS stream: attach, receive frames, send keys, observe output", async () => {
|
|
const ws = await openWebSocket(
|
|
`${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
|
|
);
|
|
|
|
// Send resume from scratch
|
|
ws.send(JSON.stringify({ type: "resume", lastSeq: null }));
|
|
|
|
// Collect frames for up to 5 seconds
|
|
const frames = [];
|
|
let lastSeq = 0;
|
|
const receivedMarker = await new Promise((resolve, reject) => {
|
|
const timeout = setTimeout(
|
|
() => reject(new Error("Timed out waiting for output frames")),
|
|
5000,
|
|
);
|
|
|
|
ws.on("message", (data) => {
|
|
if (Buffer.isBuffer(data) && data.length >= 8) {
|
|
// Binary frame: [seq: 8 bytes][data]
|
|
const seq = Number(data.readBigUInt64BE(0));
|
|
lastSeq = Math.max(lastSeq, seq);
|
|
frames.push({ seq, data: data.slice(8) });
|
|
clearTimeout(timeout);
|
|
resolve(true);
|
|
}
|
|
});
|
|
});
|
|
|
|
assert.ok(receivedMarker, "Should receive at least one binary frame");
|
|
assert.ok(frames.length > 0, "frames array should be non-empty");
|
|
assert.ok(lastSeq > 0, "lastSeq should be > 0");
|
|
|
|
await closeWebSocket(ws);
|
|
});
|
|
|
|
it("POST /sessions/:id/input → send keys, observe in stream", async () => {
|
|
const ws = await openWebSocket(
|
|
`${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
|
|
);
|
|
ws.send(JSON.stringify({ type: "resume", lastSeq: null }));
|
|
|
|
// Send a distinctive command via HTTP input endpoint
|
|
// Send text then Enter as two separate requests (avoids \n in execFile args)
|
|
const marker = `smoke-${Date.now()}`;
|
|
const { res: r1, body: b1 } = await fetchText(
|
|
`${BASE}/sessions/${sessionId}/input${AUTH}`,
|
|
{
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify({ type: "keys", data: `echo ${marker}` }),
|
|
},
|
|
);
|
|
assert.equal(
|
|
r1.status,
|
|
204,
|
|
`keys POST should return 204, got ${r1.status}. Body: ${b1}`,
|
|
);
|
|
const { res: r2, body: b2 } = await fetchText(
|
|
`${BASE}/sessions/${sessionId}/input${AUTH}`,
|
|
{
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify({ type: "key", name: "enter" }),
|
|
},
|
|
);
|
|
assert.equal(
|
|
r2.status,
|
|
204,
|
|
`enter POST should return 204, got ${r2.status}: ${b2}`,
|
|
);
|
|
|
|
// Wait for marker in stream output
|
|
const found = await new Promise((resolve) => {
|
|
const timeout = setTimeout(() => resolve(false), 6000);
|
|
let accumulated = "";
|
|
ws.on("message", (data) => {
|
|
if (Buffer.isBuffer(data) && data.length > 8) {
|
|
accumulated += data.slice(8).toString("utf8", 0, 200);
|
|
if (accumulated.includes(marker)) {
|
|
clearTimeout(timeout);
|
|
resolve(true);
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
await closeWebSocket(ws);
|
|
assert.ok(found, `Should observe marker "${marker}" in stream output`);
|
|
});
|
|
|
|
it("WS stream: reconnect with lastSeq → receives only delta", async () => {
|
|
// First pass: collect frames and note highest seq
|
|
const ws1 = await openWebSocket(
|
|
`${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
|
|
);
|
|
ws1.send(JSON.stringify({ type: "resume", lastSeq: null }));
|
|
|
|
let highestSeq = 0;
|
|
await new Promise((resolve) => {
|
|
const timeout = setTimeout(resolve, 2000);
|
|
ws1.on("message", (data) => {
|
|
if (Buffer.isBuffer(data) && data.length >= 8) {
|
|
const seq = Number(data.readBigUInt64BE(0));
|
|
highestSeq = Math.max(highestSeq, seq);
|
|
clearTimeout(timeout);
|
|
setTimeout(resolve, 500); // wait a bit for more frames
|
|
}
|
|
});
|
|
});
|
|
await closeWebSocket(ws1);
|
|
|
|
// Send more output (text + enter separately)
|
|
await fetchText(`${BASE}/sessions/${sessionId}/input${AUTH}`, {
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify({ type: "keys", data: "echo delta-check" }),
|
|
});
|
|
await fetchText(`${BASE}/sessions/${sessionId}/input${AUTH}`, {
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify({ type: "key", name: "enter" }),
|
|
});
|
|
|
|
// Small delay to let output be buffered
|
|
await new Promise((r) => setTimeout(r, 500));
|
|
|
|
// Reconnect with lastSeq = highestSeq → should only get frames after that
|
|
const ws2 = await openWebSocket(
|
|
`${WS_BASE}/sessions/${sessionId}/stream${AUTH}`,
|
|
);
|
|
ws2.send(JSON.stringify({ type: "resume", lastSeq: highestSeq }));
|
|
|
|
const deltaFrames = [];
|
|
await new Promise((resolve) => {
|
|
const timeout = setTimeout(resolve, 3000);
|
|
ws2.on("message", (data) => {
|
|
if (Buffer.isBuffer(data) && data.length >= 8) {
|
|
const seq = Number(data.readBigUInt64BE(0));
|
|
if (seq > highestSeq) deltaFrames.push(seq);
|
|
clearTimeout(timeout);
|
|
setTimeout(resolve, 500);
|
|
}
|
|
});
|
|
});
|
|
await closeWebSocket(ws2);
|
|
|
|
// Delta frames should all have seq > highestSeq
|
|
for (const seq of deltaFrames) {
|
|
assert.ok(
|
|
seq > highestSeq,
|
|
`Delta frame seq ${seq} should be > ${highestSeq}`,
|
|
);
|
|
}
|
|
});
|
|
|
|
it("GET /sessions/:id/thumbnail → text/plain, non-empty", async () => {
|
|
const { res, body } = await fetchText(
|
|
`${BASE}/sessions/${sessionId}/thumbnail${AUTH}`,
|
|
);
|
|
assert.equal(res.status, 200);
|
|
assert.ok(
|
|
res.headers.get("content-type")?.includes("text/plain"),
|
|
"Should be text/plain",
|
|
);
|
|
// capture-pane may return whitespace-only for a fresh shell — check length not trim
|
|
assert.ok(body.length > 0, "Thumbnail response should have content");
|
|
});
|
|
|
|
it("DELETE /sessions/:id → 204", async () => {
|
|
const { res } = await fetchText(`${BASE}/sessions/${sessionId}${AUTH}`, {
|
|
method: "DELETE",
|
|
});
|
|
assert.equal(res.status, 204);
|
|
sessionId = null; // prevent after() from double-deleting
|
|
});
|
|
});
|