pi-remote-control/extensions/remote-control/spike.ts

209 lines
5.5 KiB
TypeScript

/**
* spike.ts — Phase 0 Spike: tmux Stream PoC
*
* Spawns a tmux session running pi, pipes the output via pipe-pane to a FIFO,
* and streams it over WebSocket.
*
* This is throwaway PoC code to verify the foundational assumption:
* - pi runs cleanly in tmux
* - pipe-pane captures ANSI output accurately
* - WebSocket streaming has acceptable latency
* - SSH attach and WS stream stay in sync
*/
import * as fs from "node:fs";
import * as path from "node:path";
import * as os from "node:os";
import { spawn, execSync } from "node:child_process";
import { WebSocketServer } from "ws";
import type { ExtensionAPI, ExtensionContext } from "@earendil-works/pi-coding-agent";
/** Name of the tmux session the spike creates or reuses. */
const SPIKE_SESSION = "pi-spike";

/** TCP port the spike's WebSocket server binds to. */
const WS_PORT = 7799;

/** Path of the FIFO that carries pane output, located in the OS temp directory. */
const FIFO_PATH = path.join(os.tmpdir(), [SPIKE_SESSION, "fifo"].join("."));
/**
 * Report whether a tmux session with the given name exists.
 *
 * Runs `tmux has-session` and maps its exit status to a boolean. Every failure
 * of the command (non-zero exit, tmux not runnable) is reported as `false`.
 *
 * @param sessionName - Name of the tmux session to probe.
 * @returns `true` when the command exits successfully, otherwise `false`.
 */
function sessionExists(sessionName: string): boolean {
  // Single-quote the name so whitespace or shell metacharacters cannot split
  // the command line; unquoted, a name such as "x; true" would make the shell
  // exit 0 and be reported as an existing session.
  const quotedName = "'" + sessionName.replace(/'/g, "'\\''") + "'";
  try {
    // stdio "ignore" suppresses all output of the probe, including shell
    // diagnostics, instead of redirecting stderr inside the command string.
    execSync(`tmux has-session -t ${quotedName}`, { stdio: "ignore" });
    return true;
  } catch {
    return false;
  }
}
/**
 * Create a detached 120x40 tmux session that runs `pi`.
 *
 * @param sessionName - Name to give the new tmux session.
 * @throws If the `tmux new-session` command exits with a non-zero status
 *         (execSync throws on failure).
 */
function createSession(sessionName: string): void {
  console.log(`[spike] Creating tmux session: ${sessionName}`);
  // Single-quote the name so it reaches tmux as exactly one argument even if
  // it contains whitespace or shell metacharacters.
  const quotedName = "'" + sessionName.replace(/'/g, "'\\''") + "'";
  execSync(`tmux new-session -d -s ${quotedName} -x 120 -y 40 'pi'`);
}
/**
 * Recreate the FIFO and point the session's `pipe-pane` output at it.
 *
 * Any existing file at `fifoPath` is removed first so a fresh FIFO is created.
 * The pane output is then piped through `cat` into the FIFO.
 *
 * @param sessionName - tmux target whose pane output should be captured.
 * @param fifoPath - Filesystem path at which the FIFO is (re)created.
 * @throws If `mkfifo` or `tmux pipe-pane` exits with a non-zero status.
 */
function setupPipePane(sessionName: string, fifoPath: string): void {
  // POSIX single-quote escaping: safe for paths/names with spaces or metacharacters.
  const shellQuote = (value: string): string => "'" + value.replace(/'/g, "'\\''") + "'";

  // Remove existing FIFO if present
  if (fs.existsSync(fifoPath)) {
    fs.unlinkSync(fifoPath);
  }

  // Create the FIFO readable/writable by the owner only: it carries raw
  // terminal output and lives in a shared temp directory.
  execSync(`mkfifo -m 600 ${shellQuote(fifoPath)}`);
  console.log(`[spike] Created FIFO: ${fifoPath}`);

  // The pipe command is itself run through a shell by tmux, so the FIFO path
  // is quoted once for that inner shell and the whole command once more for
  // the outer shell started by execSync.
  const pipeCommand = `cat > ${shellQuote(fifoPath)}`;

  // Deliberately no `-o`: with `-o`, tmux closes an already-active pipe and
  // does NOT open a new one, which would leave a reused session unpiped.
  execSync(`tmux pipe-pane -t ${shellQuote(sessionName)} ${shellQuote(pipeCommand)}`);
  console.log(`[spike] Attached pipe-pane to session ${sessionName}`);
}
/**
 * Start the WebSocket server and broadcast everything read from the FIFO.
 *
 * A single read stream is opened on `fifoPath`; each chunk is sent as a binary
 * frame to every connected client whose socket is open.
 *
 * @param fifoPath - Path of the FIFO to read pane output from.
 * @returns The server instance and a `cleanup` callback that destroys the FIFO
 *          stream, drops connected clients and closes the server.
 */
function startWebSocketServer(fifoPath: string): { wss: WebSocketServer, cleanup: () => void } {
  const wss = new WebSocketServer({ port: WS_PORT, host: "127.0.0.1" });
  console.log(`[spike] WebSocket server listening on ws://127.0.0.1:${WS_PORT}/spike`);

  // Without a listener, a bind failure (e.g. port already in use) is raised
  // as an unhandled 'error' event and takes the whole process down.
  wss.on("error", (err) => {
    console.error(`[spike] WebSocket server error:`, err);
  });

  // Single FIFO reader that broadcasts to all clients. The server's own
  // `clients` set is used instead of a hand-maintained copy.
  const stream = fs.createReadStream(fifoPath);
  stream.on("data", (chunk: Buffer) => {
    for (const client of wss.clients) {
      if (client.readyState === client.OPEN) {
        client.send(chunk, { binary: true });
      }
    }
  });
  stream.on("error", (err) => {
    console.error(`[spike] FIFO stream error:`, err);
  });
  stream.on("end", () => {
    console.log("[spike] FIFO stream ended");
  });

  wss.on("connection", (ws, req) => {
    const clientAddr = req.socket.remoteAddress;
    console.log(`[spike] Client connected: ${clientAddr}`);
    ws.on("close", () => {
      console.log(`[spike] Client disconnected: ${clientAddr}`);
    });
    ws.on("error", (err) => {
      console.error(`[spike] WebSocket error:`, err);
    });
  });

  const cleanup = (): void => {
    stream.destroy();
    // close() only stops the server from accepting new connections;
    // established sockets have to be dropped explicitly.
    for (const client of wss.clients) {
      client.terminate();
    }
    wss.close();
  };

  return { wss, cleanup };
}
/**
 * Attach the current terminal to the tmux session.
 *
 * Prints usage hints, then spawns `tmux attach` with inherited stdio so it
 * takes over the terminal until the user detaches. Returns immediately; the
 * outcome is reported via log messages from the child's event handlers.
 *
 * @param sessionName - Name of the tmux session to attach to.
 */
function attachToSession(sessionName: string): void {
  console.log(`[spike] Attaching to tmux session: ${sessionName}`);
  console.log(`[spike] To detach: Ctrl+B, then D`);
  console.log(`[spike] WebSocket available at: ws://127.0.0.1:${WS_PORT}/spike`);
  console.log("");

  // Spawn tmux attach in the foreground.
  // This will take over the terminal until the user detaches.
  const attach = spawn("tmux", ["attach", "-t", sessionName], {
    stdio: "inherit",
  });

  // A failed spawn is reported through 'error'; without a listener it would
  // be thrown as an unhandled event.
  attach.on("error", (err) => {
    console.error(`[spike] Failed to attach to session ${sessionName}:`, err);
  });

  attach.on("exit", (code) => {
    console.log(`\n[spike] Detached from session (exit code: ${code})`);
  });
}
/**
 * Tear down the spike and terminate the process.
 *
 * Runs the optional server cleanup callback, removes the FIFO at `FIFO_PATH`
 * if it exists, then exits. This function never returns to its caller.
 *
 * @param cleanupFn - Callback releasing server resources, or `null` if the
 *                    server was never started.
 */
function cleanup(cleanupFn: (() => void) | null): void {
  console.log("\n[spike] Cleaning up...");
  let failed = false;

  if (cleanupFn) {
    // Guarded so a throwing callback cannot skip the FIFO removal below.
    try {
      cleanupFn();
    } catch (err) {
      failed = true;
      console.error("[spike] Cleanup callback failed:", err);
    }
  }

  // Remove FIFO
  if (fs.existsSync(FIFO_PATH)) {
    try {
      fs.unlinkSync(FIFO_PATH);
      console.log("[spike] Removed FIFO");
    } catch (err) {
      failed = true;
      console.error("[spike] Failed to remove FIFO:", err);
    }
  }

  console.log("[spike] Cleanup complete");
  // Exit non-zero when any cleanup step failed so the failure is visible.
  process.exit(failed ? 1 : 0);
}
/**
 * Main spike entry point.
 *
 * Creates or reuses the tmux session, pipes its pane output into the FIFO,
 * starts the WebSocket server, waits for it to come up and finally attaches
 * the terminal to the session. SIGINT/SIGTERM and any setup error trigger
 * `cleanup`, which terminates the process.
 *
 * @param _ctx - Extension context supplied by the command handler (unused).
 */
export async function runSpike(_ctx: ExtensionContext): Promise<void> {
  console.log("=== Phase 0 Spike: tmux Stream PoC ===\n");
  let cleanupFn: (() => void) | null = null;

  // Setup cleanup handlers; the closure reads cleanupFn at signal time, so it
  // picks up the server cleanup once that has been assigned.
  const onSignal = (): void => cleanup(cleanupFn);
  process.on("SIGINT", onSignal);
  process.on("SIGTERM", onSignal);

  try {
    // Step 1: Create or reuse tmux session
    if (sessionExists(SPIKE_SESSION)) {
      console.log(`[spike] Session ${SPIKE_SESSION} already exists, reusing it`);
    } else {
      createSession(SPIKE_SESSION);
    }

    // Step 2: Setup pipe-pane to FIFO
    setupPipePane(SPIKE_SESSION, FIFO_PATH);

    // Step 3: Start WebSocket server
    const server = startWebSocketServer(FIFO_PATH);
    cleanupFn = server.cleanup;

    // Wait for the server to report 'listening' rather than sleeping blindly.
    // A bind failure rejects here and is handled by the catch below. The
    // 500 ms timer stays as an upper bound so this step can never wait longer
    // than the fixed delay it replaces.
    const { wss } = server;
    await new Promise<void>((resolve, reject) => {
      const settle = (err?: Error): void => {
        clearTimeout(fallback);
        wss.off("listening", onListening);
        wss.off("error", onError);
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      };
      const onListening = (): void => settle();
      const onError = (err: Error): void => settle(err);
      const fallback = setTimeout(onListening, 500);
      wss.once("listening", onListening);
      wss.once("error", onError);
    });

    // Step 4: Attach to session
    attachToSession(SPIKE_SESSION);
  } catch (err) {
    console.error("[spike] Error:", err);
    // NOTE(review): cleanup() ends the process, so a setup failure here also
    // terminates the host — confirm that is intended outside the PoC.
    cleanup(cleanupFn);
  }
}
/**
 * Expose the spike to pi as the `spike` command.
 *
 * The command's handler forwards its context to `runSpike` and resolves once
 * `runSpike` has resolved; the command arguments are ignored.
 *
 * @param pi - Extension API on which the command is registered.
 */
export function registerSpikeCommand(pi: ExtensionAPI): void {
  const description = "Phase 0 Spike: Start tmux stream PoC (ws://127.0.0.1:7799/spike)";

  pi.registerCommand("spike", {
    description,
    async handler(_args, ctx) {
      await runSpike(ctx);
    },
  });
}