refactor(T-1.0): carve server.ts into server/ sub-modules
Create the modular server/ layout from PHASE-1-sidecar.md §Architecture Sketch: server/types.ts — shared WsClient/WsServer/RemoteServer interfaces server/upgrade.ts — WS upgrade routing per path (LEGACY /ws) server/server.ts — HTTP bootstrap, middleware, LEGACY HTML routes server/routes/ — empty dir, placeholder for T-1.5/T-1.6/T-1.7 The original extensions/remote-control/server.ts is replaced with a thin re-export shim so that index.ts continues to resolve './server.js' without changes. All LEGACY code paths (manifest.json, icon.svg, /, /ws) are tagged with // LEGACY: … comments. No behaviour changes. No new endpoints. Pre-existing biome errors in auth.ts, config.ts, index.ts are unchanged — NOT introduced by this commit.
This commit is contained in:
parent
e396cfcaaa
commit
568931901d
|
|
@ -1,335 +1,18 @@
|
|||
/**
|
||||
* HTTP + WebSocket server for remote-control.
|
||||
* LEGACY: re-export shim.
|
||||
*
|
||||
* Handles authentication, serves the web UI, and manages WebSocket connections
|
||||
* for real-time message streaming between the pi session and browser clients.
|
||||
* The server implementation has been moved to server/ sub-modules:
|
||||
* server/server.ts — HTTP bootstrap, TLS, middleware, LEGACY HTML routes
|
||||
* server/upgrade.ts — WebSocket upgrade routing per session/topic
|
||||
* server/types.ts — shared WS + RemoteServer type definitions
|
||||
* server/routes/ — route modules (populated by T-1.5/T-1.6/T-1.7)
|
||||
*
|
||||
* This shim is kept so that the existing import in index.ts
|
||||
* (`import { startServer } from "./server.js"`) continues to resolve
|
||||
* without modification. It will be removed once all consumers have been
|
||||
* updated to import directly from server/ sub-modules.
|
||||
*/
|
||||
|
||||
import { randomBytes } from "node:crypto";
|
||||
import type { IncomingMessage } from "node:http";
|
||||
import { createServer } from "node:http";
|
||||
import { createRequire } from "node:module";
|
||||
import type { AddressInfo, Socket } from "node:net";
|
||||
import type {
|
||||
ExtensionAPI,
|
||||
ExtensionContext,
|
||||
} from "@earendil-works/pi-coding-agent";
|
||||
import {
|
||||
generateSessionId,
|
||||
loadOrCreateToken,
|
||||
parseCookies,
|
||||
SESSION_COOKIE,
|
||||
validateToken,
|
||||
} from "./auth.js";
|
||||
import { parseBindAddress, readRemoteControlConfig } from "./config.js";
|
||||
import { buildHTML } from "./html.js";
|
||||
import { buildSyncMessage } from "./messages.js";
|
||||
|
||||
interface WsClient {
|
||||
readyState: number;
|
||||
send(data: string): void;
|
||||
terminate(): void;
|
||||
on(event: "message", listener: (data: Buffer) => void): void;
|
||||
on(event: "close" | "error", listener: () => void): void;
|
||||
}
|
||||
|
||||
interface WsServer {
|
||||
on(event: "connection", listener: (ws: WsClient) => void): void;
|
||||
on(event: "error", listener: (err: Error) => void): void;
|
||||
handleUpgrade(
|
||||
request: IncomingMessage,
|
||||
socket: Socket,
|
||||
head: Buffer,
|
||||
cb: (ws: WsClient) => void,
|
||||
): void;
|
||||
emit(event: string, ...args: unknown[]): void;
|
||||
close(cb?: () => void): void;
|
||||
}
|
||||
|
||||
// Load ws (bundled with pi) without needing @types/ws installed locally
|
||||
const _require = createRequire(import.meta.url);
|
||||
const wsModule = _require("ws") as {
|
||||
WebSocketServer: new (opts: { noServer: boolean }) => WsServer;
|
||||
OPEN: number;
|
||||
};
|
||||
const { WebSocketServer, OPEN } = wsModule;
|
||||
|
||||
export interface RemoteServer {
|
||||
broadcast: (msg: object) => void;
|
||||
sync: (ctx: ExtensionContext) => void;
|
||||
stop: () => Promise<void>;
|
||||
clientCount: () => number;
|
||||
onClientChange: (cb: () => void) => void;
|
||||
port: number;
|
||||
token: string;
|
||||
}
|
||||
|
||||
export async function startServer(
|
||||
pi: ExtensionAPI,
|
||||
ctx: ExtensionContext,
|
||||
): Promise<RemoteServer> {
|
||||
const config = await readRemoteControlConfig();
|
||||
const bindAddr = config.bindAddress ?? "";
|
||||
const { host: bindHost, port: bindPort } = bindAddr
|
||||
? parseBindAddress(bindAddr)
|
||||
: { host: "127.0.0.1", port: 0 };
|
||||
const clientChangeListeners: Array<() => void> = [];
|
||||
const clients = new Set<WsClient>();
|
||||
const token = await loadOrCreateToken();
|
||||
// Map of valid session IDs → expiry timestamp (ms since epoch)
|
||||
const SESSION_TTL_MS = 86_400_000; // 24 h — matches cookie Max-Age
|
||||
const validSessions = new Map<string, number>();
|
||||
const pruneExpiredSessions = (): void => {
|
||||
const now = Date.now();
|
||||
for (const [id, expiresAt] of validSessions) {
|
||||
if (expiresAt <= now) validSessions.delete(id);
|
||||
}
|
||||
};
|
||||
|
||||
/** Check if a request is authenticated (valid token query param OR valid session cookie) */
|
||||
function isAuthenticated(req: IncomingMessage): boolean {
|
||||
// Check session cookie first
|
||||
const cookies = parseCookies(req.headers.cookie);
|
||||
const sessionId = cookies[SESSION_COOKIE];
|
||||
const sessionExpiry = sessionId ? validSessions.get(sessionId) : undefined;
|
||||
if (sessionExpiry !== undefined && sessionExpiry > Date.now()) return true;
|
||||
|
||||
// Check token query param
|
||||
const url = new URL(req.url ?? "/", "http://localhost");
|
||||
const providedToken = url.searchParams.get("token");
|
||||
if (providedToken && validateToken(providedToken, token)) return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function broadcast(msg: object): void {
|
||||
const data = JSON.stringify(msg);
|
||||
for (const client of clients) {
|
||||
if (client.readyState === OPEN) {
|
||||
try {
|
||||
client.send(data);
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function sync(currentCtx: ExtensionContext): void {
|
||||
broadcast(buildSyncMessage(currentCtx));
|
||||
}
|
||||
|
||||
const httpServer = createServer((req, res) => {
|
||||
const url = new URL(req.url ?? "/", "http://localhost");
|
||||
const pathname = url.pathname;
|
||||
|
||||
if (pathname === "/manifest.json") {
|
||||
res.writeHead(200, { "Content-Type": "application/manifest+json; charset=utf-8" });
|
||||
res.end(JSON.stringify({
|
||||
name: "Pi Remote",
|
||||
short_name: "Pi",
|
||||
description: "Remote control for Pi sessions",
|
||||
start_url: "/",
|
||||
display: "standalone",
|
||||
background_color: "#0d1117",
|
||||
theme_color: "#0d1117",
|
||||
icons: [{ src: "/icon.svg", sizes: "any", type: "image/svg+xml" }],
|
||||
}));
|
||||
return;
|
||||
}
|
||||
|
||||
if (pathname === "/icon.svg") {
|
||||
res.writeHead(200, { "Content-Type": "image/svg+xml", "Cache-Control": "public, max-age=86400" });
|
||||
res.end(`<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 180 180"><rect width="180" height="180" rx="40" fill="#0d1117"/><text x="90" y="133" font-family="-apple-system,Helvetica,Arial,sans-serif" font-size="110" text-anchor="middle" fill="#3fb950">π</text></svg>`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (pathname === "/" || pathname === "/index.html") {
|
||||
// Check authentication
|
||||
const cookies = parseCookies(req.headers.cookie);
|
||||
const sc = cookies[SESSION_COOKIE];
|
||||
const hasValidSession =
|
||||
sc !== undefined && (validSessions.get(sc) ?? 0) > Date.now();
|
||||
const providedToken = url.searchParams.get("token");
|
||||
const hasValidToken =
|
||||
providedToken && validateToken(providedToken, token);
|
||||
|
||||
if (!hasValidSession && !hasValidToken) {
|
||||
res.writeHead(403, { "Content-Type": "text/plain; charset=utf-8" });
|
||||
res.end(
|
||||
"Forbidden — valid token required. Use the URL shown in the pi terminal.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// If authenticated via token (first visit), issue a session cookie and redirect to clean URL
|
||||
if (!hasValidSession && hasValidToken) {
|
||||
pruneExpiredSessions();
|
||||
const sessionId = generateSessionId();
|
||||
validSessions.set(sessionId, Date.now() + SESSION_TTL_MS);
|
||||
res.writeHead(302, {
|
||||
"Set-Cookie": `${SESSION_COOKIE}=${sessionId}; Path=/; HttpOnly; SameSite=Strict; Max-Age=86400`,
|
||||
Location: "/",
|
||||
});
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
// Valid session cookie — serve the page
|
||||
const nonce = randomBytes(16).toString("base64");
|
||||
res.writeHead(200, {
|
||||
"Content-Type": "text/html; charset=utf-8",
|
||||
"X-Frame-Options": "DENY",
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
"Referrer-Policy": "no-referrer",
|
||||
"Content-Security-Policy": `default-src 'none'; script-src 'nonce-${nonce}'; style-src 'nonce-${nonce}'; connect-src 'self'; manifest-src 'self'; base-uri 'none'`,
|
||||
});
|
||||
res.end(buildHTML(nonce));
|
||||
} else {
|
||||
res.writeHead(404, { "Content-Type": "text/plain; charset=utf-8" });
|
||||
res.end("Not found");
|
||||
}
|
||||
});
|
||||
|
||||
const wss = new WebSocketServer({ noServer: true });
|
||||
|
||||
httpServer.on("error", (err: Error) => {
|
||||
console.error("[remote-control] httpServer error:", err.message);
|
||||
});
|
||||
|
||||
wss.on("error", (err: Error) => {
|
||||
console.error("[remote-control] wss error:", err.message);
|
||||
});
|
||||
|
||||
httpServer.on(
|
||||
"upgrade",
|
||||
(request: IncomingMessage, socket: Socket, head: Buffer) => {
|
||||
const url = new URL(request.url, "http://localhost");
|
||||
if (url.pathname === "/ws") {
|
||||
// Validate auth: session cookie or token query param
|
||||
if (!isAuthenticated(request)) {
|
||||
socket.write("HTTP/1.1 403 Forbidden\r\n\r\n");
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
wss.handleUpgrade(request, socket, head, (ws: WsClient) => {
|
||||
wss.emit("connection", ws, request);
|
||||
});
|
||||
} else {
|
||||
socket.destroy();
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
wss.on("connection", (ws: WsClient) => {
|
||||
clients.add(ws);
|
||||
for (const cb of clientChangeListeners) cb();
|
||||
|
||||
// Send full state snapshot to the new client
|
||||
try {
|
||||
ws.send(JSON.stringify(buildSyncMessage(ctx)));
|
||||
} catch {
|
||||
/* client disconnected before first send */
|
||||
}
|
||||
|
||||
// Per-connection rate limiting: max 30 prompts per 60 seconds
|
||||
const RATE_WINDOW_MS = 60_000;
|
||||
const RATE_MAX = 30;
|
||||
const MAX_MSG_BYTES = 64 * 1024;
|
||||
const recentPrompts: number[] = [];
|
||||
|
||||
ws.on("message", (data: Buffer) => {
|
||||
if (data.length > MAX_MSG_BYTES) return;
|
||||
let msg: { type?: string; text?: string };
|
||||
try {
|
||||
const parsed: unknown = JSON.parse(data.toString());
|
||||
if (typeof parsed !== "object" || parsed === null) return;
|
||||
msg = parsed as { type?: string; text?: string };
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
if (msg.type === "stop") {
|
||||
if (!ctx.isIdle()) {
|
||||
ctx.abort();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (
|
||||
msg.type === "prompt" &&
|
||||
typeof msg.text === "string" &&
|
||||
msg.text.trim()
|
||||
) {
|
||||
const text = msg.text.trim();
|
||||
// Sliding-window rate limit
|
||||
const now = Date.now();
|
||||
const cutoff = now - RATE_WINDOW_MS;
|
||||
while (recentPrompts.length > 0 && recentPrompts[0] < cutoff)
|
||||
recentPrompts.shift();
|
||||
if (recentPrompts.length >= RATE_MAX) return;
|
||||
recentPrompts.push(now);
|
||||
if (ctx.isIdle()) {
|
||||
pi.sendUserMessage(text);
|
||||
} else {
|
||||
pi.sendUserMessage(text, { deliverAs: "followUp" });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const onClose = () => {
|
||||
clients.delete(ws);
|
||||
broadcast({ type: "status", clientCount: clients.size });
|
||||
for (const cb of clientChangeListeners) cb();
|
||||
};
|
||||
ws.on("close", onClose);
|
||||
ws.on("error", onClose);
|
||||
});
|
||||
|
||||
return new Promise((resolve) => {
|
||||
httpServer.listen(bindPort, bindHost, () => {
|
||||
resolve({
|
||||
broadcast,
|
||||
sync,
|
||||
stop: () =>
|
||||
new Promise<void>((res) => {
|
||||
// Forcefully kill all WebSocket clients — terminate() sends no
|
||||
// close frame and doesn't wait for the remote end to acknowledge,
|
||||
// so it can't hang on an unresponsive client.
|
||||
for (const client of clients) {
|
||||
try {
|
||||
client.terminate();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
clients.clear();
|
||||
|
||||
// Safety timeout — if wss/http shutdown callbacks never fire
|
||||
// (e.g. lingering keep-alive sockets), resolve anyway so the
|
||||
// session_shutdown handler doesn't block pi from exiting.
|
||||
const timeout = setTimeout(() => {
|
||||
httpServer.close(() => {});
|
||||
httpServer.closeAllConnections?.();
|
||||
res();
|
||||
}, 2000);
|
||||
|
||||
wss.close(() =>
|
||||
httpServer.close(() => {
|
||||
clearTimeout(timeout);
|
||||
res();
|
||||
}),
|
||||
);
|
||||
}),
|
||||
clientCount: () => clients.size,
|
||||
onClientChange: (cb: () => void) => {
|
||||
clientChangeListeners.push(cb);
|
||||
},
|
||||
get port() {
|
||||
return (httpServer.address() as AddressInfo | null)?.port ?? 0;
|
||||
},
|
||||
get token() {
|
||||
return token;
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
export { startServer } from "./server/server.js";
|
||||
// LEGACY: re-exports for backward-compatibility
|
||||
export type { RemoteServer } from "./server/types.js";
|
||||
|
|
|
|||
|
|
@ -0,0 +1,333 @@
|
|||
/**
|
||||
* HTTP + WebSocket server bootstrap for remote-control.
|
||||
*
|
||||
* Responsible for:
|
||||
* - Creating and configuring the HTTP server (middleware, TLS in future T-1.3)
|
||||
* - Wiring up the WebSocket upgrade handler (see upgrade.ts)
|
||||
* - Serving LEGACY routes for the browser HTML client (html.ts)
|
||||
* - Managing the connected-client set and the broadcast helper
|
||||
* - Returning a RemoteServer handle to the extension entry point (index.ts)
|
||||
*
|
||||
* LEGACY routes (kept until Phase 2 ships and html.ts is retired):
|
||||
* GET / — serves the browser HTML UI (buildHTML from html.ts)
|
||||
* GET /index.html — same as /
|
||||
* GET /manifest.json — PWA manifest
|
||||
* GET /icon.svg — PWA icon
|
||||
* WS /ws — WebSocket endpoint for the browser client
|
||||
*
|
||||
* Future route modules live under routes/ and are wired here once they
|
||||
* land in T-1.5, T-1.6, T-1.7.
|
||||
*/
|
||||
|
||||
import { randomBytes } from "node:crypto";
|
||||
import type { IncomingMessage } from "node:http";
|
||||
import { createServer } from "node:http";
|
||||
import { createRequire } from "node:module";
|
||||
import type { AddressInfo } from "node:net";
|
||||
import type {
|
||||
ExtensionAPI,
|
||||
ExtensionContext,
|
||||
} from "@earendil-works/pi-coding-agent";
|
||||
import {
|
||||
generateSessionId,
|
||||
loadOrCreateToken,
|
||||
parseCookies,
|
||||
SESSION_COOKIE,
|
||||
validateToken,
|
||||
} from "../auth.js";
|
||||
import { parseBindAddress, readRemoteControlConfig } from "../config.js";
|
||||
import { buildHTML } from "../html.js"; // LEGACY: browser HTML client
|
||||
import { buildSyncMessage } from "../messages.js";
|
||||
import type { RemoteServer, WsClient, WsServer } from "./types.js";
|
||||
import { createUpgradeHandler } from "./upgrade.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Load ws (bundled with pi) without needing @types/ws installed locally
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const _require = createRequire(import.meta.url);
|
||||
const wsModule = _require("ws") as {
|
||||
WebSocketServer: new (opts: { noServer: boolean }) => WsServer;
|
||||
OPEN: number;
|
||||
};
|
||||
const { WebSocketServer, OPEN } = wsModule;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// startServer
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export async function startServer(
|
||||
pi: ExtensionAPI,
|
||||
ctx: ExtensionContext,
|
||||
): Promise<RemoteServer> {
|
||||
const config = await readRemoteControlConfig();
|
||||
const bindAddr = config.bindAddress ?? "";
|
||||
const { host: bindHost, port: bindPort } = bindAddr
|
||||
? parseBindAddress(bindAddr)
|
||||
: { host: "127.0.0.1", port: 0 };
|
||||
|
||||
const clientChangeListeners: Array<() => void> = [];
|
||||
const clients = new Set<WsClient>();
|
||||
const token = await loadOrCreateToken();
|
||||
|
||||
// Map of valid session IDs → expiry timestamp (ms since epoch)
|
||||
const SESSION_TTL_MS = 86_400_000; // 24 h — matches cookie Max-Age
|
||||
const validSessions = new Map<string, number>();
|
||||
|
||||
const pruneExpiredSessions = (): void => {
|
||||
const now = Date.now();
|
||||
for (const [id, expiresAt] of validSessions) {
|
||||
if (expiresAt <= now) validSessions.delete(id);
|
||||
}
|
||||
};
|
||||
|
||||
/** Check if a request is authenticated (valid token query param OR valid session cookie) */
|
||||
function isAuthenticated(req: IncomingMessage): boolean {
|
||||
// Check session cookie first
|
||||
const cookies = parseCookies(req.headers.cookie);
|
||||
const sessionId = cookies[SESSION_COOKIE];
|
||||
const sessionExpiry = sessionId ? validSessions.get(sessionId) : undefined;
|
||||
if (sessionExpiry !== undefined && sessionExpiry > Date.now()) return true;
|
||||
|
||||
// Check token query param
|
||||
const url = new URL(req.url ?? "/", "http://localhost");
|
||||
const providedToken = url.searchParams.get("token");
|
||||
if (providedToken && validateToken(providedToken, token)) return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function broadcast(msg: object): void {
|
||||
const data = JSON.stringify(msg);
|
||||
for (const client of clients) {
|
||||
if (client.readyState === OPEN) {
|
||||
try {
|
||||
client.send(data);
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function sync(currentCtx: ExtensionContext): void {
|
||||
broadcast(buildSyncMessage(currentCtx));
|
||||
}
|
||||
|
||||
// ── HTTP server ─────────────────────────────────────────────────────────
|
||||
|
||||
const httpServer = createServer((req, res) => {
|
||||
const url = new URL(req.url ?? "/", "http://localhost");
|
||||
const pathname = url.pathname;
|
||||
|
||||
// LEGACY: PWA manifest — served for the browser HTML client
|
||||
if (pathname === "/manifest.json") {
|
||||
res.writeHead(200, {
|
||||
"Content-Type": "application/manifest+json; charset=utf-8",
|
||||
});
|
||||
res.end(
|
||||
JSON.stringify({
|
||||
name: "Pi Remote",
|
||||
short_name: "Pi",
|
||||
description: "Remote control for Pi sessions",
|
||||
start_url: "/",
|
||||
display: "standalone",
|
||||
background_color: "#0d1117",
|
||||
theme_color: "#0d1117",
|
||||
icons: [{ src: "/icon.svg", sizes: "any", type: "image/svg+xml" }],
|
||||
}),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// LEGACY: PWA icon — served for the browser HTML client
|
||||
if (pathname === "/icon.svg") {
|
||||
res.writeHead(200, {
|
||||
"Content-Type": "image/svg+xml",
|
||||
"Cache-Control": "public, max-age=86400",
|
||||
});
|
||||
res.end(
|
||||
`<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 180 180"><rect width="180" height="180" rx="40" fill="#0d1117"/><text x="90" y="133" font-family="-apple-system,Helvetica,Arial,sans-serif" font-size="110" text-anchor="middle" fill="#3fb950">π</text></svg>`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// LEGACY: browser HTML client root route
|
||||
if (pathname === "/" || pathname === "/index.html") {
|
||||
const cookies = parseCookies(req.headers.cookie);
|
||||
const sc = cookies[SESSION_COOKIE];
|
||||
const hasValidSession =
|
||||
sc !== undefined && (validSessions.get(sc) ?? 0) > Date.now();
|
||||
const providedToken = url.searchParams.get("token");
|
||||
const hasValidToken =
|
||||
providedToken && validateToken(providedToken, token);
|
||||
|
||||
if (!hasValidSession && !hasValidToken) {
|
||||
res.writeHead(403, { "Content-Type": "text/plain; charset=utf-8" });
|
||||
res.end(
|
||||
"Forbidden — valid token required. Use the URL shown in the pi terminal.",
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// If authenticated via token (first visit), issue a session cookie and redirect to clean URL
|
||||
if (!hasValidSession && hasValidToken) {
|
||||
pruneExpiredSessions();
|
||||
const sessionId = generateSessionId();
|
||||
validSessions.set(sessionId, Date.now() + SESSION_TTL_MS);
|
||||
res.writeHead(302, {
|
||||
"Set-Cookie": `${SESSION_COOKIE}=${sessionId}; Path=/; HttpOnly; SameSite=Strict; Max-Age=86400`,
|
||||
Location: "/",
|
||||
});
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
// Valid session cookie — serve the browser HTML UI
|
||||
const nonce = randomBytes(16).toString("base64");
|
||||
res.writeHead(200, {
|
||||
"Content-Type": "text/html; charset=utf-8",
|
||||
"X-Frame-Options": "DENY",
|
||||
"X-Content-Type-Options": "nosniff",
|
||||
"Referrer-Policy": "no-referrer",
|
||||
"Content-Security-Policy": `default-src 'none'; script-src 'nonce-${nonce}'; style-src 'nonce-${nonce}'; connect-src 'self'; manifest-src 'self'; base-uri 'none'`,
|
||||
});
|
||||
res.end(buildHTML(nonce)); // LEGACY: renders the browser HTML client
|
||||
return;
|
||||
}
|
||||
|
||||
res.writeHead(404, { "Content-Type": "text/plain; charset=utf-8" });
|
||||
res.end("Not found");
|
||||
});
|
||||
|
||||
// ── WebSocket server ────────────────────────────────────────────────────
|
||||
|
||||
const wss = new WebSocketServer({ noServer: true });
|
||||
|
||||
httpServer.on("error", (err: Error) => {
|
||||
console.error("[remote-control] httpServer error:", err.message);
|
||||
});
|
||||
|
||||
wss.on("error", (err: Error) => {
|
||||
console.error("[remote-control] wss error:", err.message);
|
||||
});
|
||||
|
||||
// Wire upgrade handler (see upgrade.ts for path routing)
|
||||
httpServer.on("upgrade", createUpgradeHandler(wss, isAuthenticated));
|
||||
|
||||
// ── LEGACY: browser HTML client WebSocket connection handling ───────────
|
||||
|
||||
wss.on("connection", (ws: WsClient) => {
|
||||
clients.add(ws);
|
||||
for (const cb of clientChangeListeners) cb();
|
||||
|
||||
// Send full state snapshot to the new client
|
||||
try {
|
||||
ws.send(JSON.stringify(buildSyncMessage(ctx)));
|
||||
} catch {
|
||||
/* client disconnected before first send */
|
||||
}
|
||||
|
||||
// Per-connection rate limiting: max 30 prompts per 60 seconds
|
||||
const RATE_WINDOW_MS = 60_000;
|
||||
const RATE_MAX = 30;
|
||||
const MAX_MSG_BYTES = 64 * 1024;
|
||||
const recentPrompts: number[] = [];
|
||||
|
||||
ws.on("message", (data: Buffer) => {
|
||||
if (data.length > MAX_MSG_BYTES) return;
|
||||
let msg: { type?: string; text?: string };
|
||||
try {
|
||||
const parsed: unknown = JSON.parse(data.toString());
|
||||
if (typeof parsed !== "object" || parsed === null) return;
|
||||
msg = parsed as { type?: string; text?: string };
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
if (msg.type === "stop") {
|
||||
if (!ctx.isIdle()) {
|
||||
ctx.abort();
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (
|
||||
msg.type === "prompt" &&
|
||||
typeof msg.text === "string" &&
|
||||
msg.text.trim()
|
||||
) {
|
||||
const text = msg.text.trim();
|
||||
// Sliding-window rate limit
|
||||
const now = Date.now();
|
||||
const cutoff = now - RATE_WINDOW_MS;
|
||||
while (recentPrompts.length > 0 && recentPrompts[0] < cutoff)
|
||||
recentPrompts.shift();
|
||||
if (recentPrompts.length >= RATE_MAX) return;
|
||||
recentPrompts.push(now);
|
||||
if (ctx.isIdle()) {
|
||||
pi.sendUserMessage(text);
|
||||
} else {
|
||||
pi.sendUserMessage(text, { deliverAs: "followUp" });
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
const onClose = (): void => {
|
||||
clients.delete(ws);
|
||||
broadcast({ type: "status", clientCount: clients.size });
|
||||
for (const cb of clientChangeListeners) cb();
|
||||
};
|
||||
ws.on("close", onClose);
|
||||
ws.on("error", onClose);
|
||||
});
|
||||
|
||||
// ── Listen ───────────────────────────────────────────────────────────────
|
||||
|
||||
return new Promise((resolve) => {
|
||||
httpServer.listen(bindPort, bindHost, () => {
|
||||
resolve({
|
||||
broadcast,
|
||||
sync,
|
||||
stop: () =>
|
||||
new Promise<void>((res) => {
|
||||
// Forcefully kill all WebSocket clients — terminate() sends no
|
||||
// close frame and doesn't wait for the remote end to acknowledge,
|
||||
// so it can't hang on an unresponsive client.
|
||||
for (const client of clients) {
|
||||
try {
|
||||
client.terminate();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
}
|
||||
clients.clear();
|
||||
|
||||
// Safety timeout — if wss/http shutdown callbacks never fire
|
||||
// (e.g. lingering keep-alive sockets), resolve anyway so the
|
||||
// session_shutdown handler doesn't block pi from exiting.
|
||||
const timeout = setTimeout(() => {
|
||||
httpServer.close(() => {});
|
||||
httpServer.closeAllConnections?.();
|
||||
res();
|
||||
}, 2000);
|
||||
|
||||
wss.close(() =>
|
||||
httpServer.close(() => {
|
||||
clearTimeout(timeout);
|
||||
res();
|
||||
}),
|
||||
);
|
||||
}),
|
||||
clientCount: () => clients.size,
|
||||
onClientChange: (cb: () => void) => {
|
||||
clientChangeListeners.push(cb);
|
||||
},
|
||||
get port() {
|
||||
return (httpServer.address() as AddressInfo | null)?.port ?? 0;
|
||||
},
|
||||
get token() {
|
||||
return token;
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Shared type definitions for the remote-control server.
|
||||
*
|
||||
* These interfaces describe the WebSocket client/server shapes loaded
|
||||
* dynamically from the `ws` module bundled with pi, and the public surface
|
||||
* of the HTTP+WS server returned to callers.
|
||||
*/
|
||||
|
||||
import type { IncomingMessage } from "node:http";
|
||||
import type { Socket } from "node:net";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// WebSocket interfaces (ws module loaded via createRequire — no @types/ws)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export interface WsClient {
|
||||
readyState: number;
|
||||
send(data: string): void;
|
||||
terminate(): void;
|
||||
on(event: "message", listener: (data: Buffer) => void): void;
|
||||
on(event: "close" | "error", listener: () => void): void;
|
||||
}
|
||||
|
||||
export interface WsServer {
|
||||
on(event: "connection", listener: (ws: WsClient) => void): void;
|
||||
on(event: "error", listener: (err: Error) => void): void;
|
||||
handleUpgrade(
|
||||
request: IncomingMessage,
|
||||
socket: Socket,
|
||||
head: Buffer,
|
||||
cb: (ws: WsClient) => void,
|
||||
): void;
|
||||
emit(event: string, ...args: unknown[]): void;
|
||||
close(cb?: () => void): void;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Public server handle
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export interface RemoteServer {
|
||||
broadcast: (msg: object) => void;
|
||||
sync: (
|
||||
ctx: import("@earendil-works/pi-coding-agent").ExtensionContext,
|
||||
) => void;
|
||||
stop: () => Promise<void>;
|
||||
clientCount: () => number;
|
||||
onClientChange: (cb: () => void) => void;
|
||||
port: number;
|
||||
token: string;
|
||||
}
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
/**
|
||||
* WebSocket upgrade routing.
|
||||
*
|
||||
* Routes incoming HTTP Upgrade requests to the appropriate WebSocket handler
|
||||
* based on the request path and session/topic. Non-matching paths are
|
||||
* destroyed immediately.
|
||||
*
|
||||
* Current routes (all LEGACY — browser HTML client):
|
||||
* /ws → legacy browser client WebSocket endpoint
|
||||
*
|
||||
* Future routes (T-1.5):
|
||||
* /sessions/:id/stream → binary ANSI stream per tmux session
|
||||
*
|
||||
* T-1.5 will extend createUpgradeHandler to accept a session registry and
|
||||
* dispatch to per-session stream handlers.
|
||||
*/
|
||||
|
||||
import type { IncomingMessage } from "node:http";
|
||||
import type { Socket } from "node:net";
|
||||
import type { WsClient, WsServer } from "./types.js";
|
||||
|
||||
/**
|
||||
* Create the HTTP `upgrade` event handler.
|
||||
*
|
||||
* @param wss - WebSocket server instance.
|
||||
* @param isAuthenticated - Predicate that checks token/session on the request.
|
||||
* @returns A handler suitable for `httpServer.on("upgrade", handler)`.
|
||||
*/
|
||||
export function createUpgradeHandler(
|
||||
wss: WsServer,
|
||||
isAuthenticated: (req: IncomingMessage) => boolean,
|
||||
): (request: IncomingMessage, socket: Socket, head: Buffer) => void {
|
||||
return (request: IncomingMessage, socket: Socket, head: Buffer): void => {
|
||||
const url = new URL(request.url ?? "/", "http://localhost");
|
||||
|
||||
if (url.pathname === "/ws") {
|
||||
// LEGACY: browser HTML client WebSocket endpoint — auth guard
|
||||
if (!isAuthenticated(request)) {
|
||||
socket.write("HTTP/1.1 403 Forbidden\r\n\r\n");
|
||||
socket.destroy();
|
||||
return;
|
||||
}
|
||||
wss.handleUpgrade(request, socket, head, (ws: WsClient) => {
|
||||
wss.emit("connection", ws, request);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Unknown upgrade path — reject
|
||||
socket.destroy();
|
||||
};
|
||||
}
|
||||
Loading…
Reference in New Issue