From 17c32e7e93cfe5554020ed9aba1a81b2a500054d Mon Sep 17 00:00:00 2001 From: jay Date: Fri, 15 May 2026 11:29:41 +0200 Subject: [PATCH] feat(T-1.2): sequence counter + disk ring-buffer writer/reader --- extensions/remote-control/buffer/reader.ts | 91 +++++++++ extensions/remote-control/buffer/writer.ts | 209 +++++++++++++++++++++ extensions/remote-control/sequence.ts | 35 ++++ 3 files changed, 335 insertions(+) create mode 100644 extensions/remote-control/buffer/reader.ts create mode 100644 extensions/remote-control/buffer/writer.ts create mode 100644 extensions/remote-control/sequence.ts diff --git a/extensions/remote-control/buffer/reader.ts b/extensions/remote-control/buffer/reader.ts new file mode 100644 index 0000000..b1453bf --- /dev/null +++ b/extensions/remote-control/buffer/reader.ts @@ -0,0 +1,91 @@ +/** + * Disk ring-buffer reader. + * + * Reads chunks from a session buffer file, optionally starting from a + * given seq number. Used by the stream route for reconnect replay (T-1.5). + * + * File format (mirrors writer.ts): + * Each record: [seq: 8 bytes BE uint64] [length: 4 bytes BE uint32] [data: N bytes] + * + * Owner: T-1.2 + */ + +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; + +export interface BufferChunk { + seq: number; + data: Buffer; +} + +export interface ReaderConfig { + stateDir?: string; +} + +function stateDir(cfg?: ReaderConfig): string { + return ( + cfg?.stateDir ?? path.join(os.homedir(), ".local", "share", "pi-remote") + ); +} + +function bufferPath(session: string, cfg?: ReaderConfig): string { + return path.join(stateDir(cfg), "buffers", `${session}.buf`); +} + +/** + * Read all chunks from a session buffer, optionally starting after `afterSeq`. + * + * Returns chunks in seq order. If the file doesn't exist, returns []. + * Stops at the first parse error (truncated file at end is tolerated). + */ +export function readChunks( + session: string, + opts: { afterSeq?: number; cfg?: ReaderConfig } = {}, +): BufferChunk[] { + const { afterSeq = 0, cfg } = opts; + const fp = bufferPath(session, cfg); + + let buf: Buffer; + try { + buf = fs.readFileSync(fp); + } catch { + return []; + } + + const chunks: BufferChunk[] = []; + let offset = 0; + + while (offset + 12 <= buf.length) { + const seqBig = buf.readBigUInt64BE(offset); + const seq = Number(seqBig); + const length = buf.readUInt32BE(offset + 8); + offset += 12; + + if (offset + length > buf.length) break; // truncated record at end + + if (seq > afterSeq) { + chunks.push({ seq, data: buf.slice(offset, offset + length) }); + } + offset += length; + } + + return chunks; +} + +/** + * Read chunks as an async generator (streaming, for large buffers). + * Yields one chunk at a time after `afterSeq`. + */ +export async function* streamChunks( + session: string, + opts: { afterSeq?: number; cfg?: ReaderConfig } = {}, +): AsyncGenerator { + // Simple implementation: read all and yield. For large files T-1.5 can + // switch to a streaming file read if needed. + const { afterSeq = 0, cfg } = opts; + const chunks = readChunks(session, { afterSeq, cfg }); + for (const chunk of chunks) { + yield chunk; + } +} diff --git a/extensions/remote-control/buffer/writer.ts b/extensions/remote-control/buffer/writer.ts new file mode 100644 index 0000000..2ed726d --- /dev/null +++ b/extensions/remote-control/buffer/writer.ts @@ -0,0 +1,209 @@ +/** + * Disk ring-buffer writer. + * + * Appends chunks to a per-session file, enforcing: + * - Per-session cap: 100 MB (configurable) + * - Global cap: 1 GB across all sessions (configurable) + * - Free-space watchdog: refuse writes if free disk < 1 GB + * - Idle cleanup: sessions inactive for > 30 days are deleted + * + * File format (binary, append-only): + * Each record: [seq: 8 bytes BE uint64] [length: 4 bytes BE uint32] [data: N bytes] + * + * Risk R1 mitigation: all writes serialised through a per-session async queue. + * Global cap protected by a module-level mutex (simple flag since JS is single-threaded). + * + * Owner: T-1.2 + */ + +import fs from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import type { SeqNum } from "../sequence.js"; + +// --------------------------------------------------------------------------- +// Config defaults (can be overridden; T-1.7 will wire these from config.toml) +// --------------------------------------------------------------------------- + +export interface BufferConfig { + stateDir: string; + perSessionMb: number; // default 100 + globalGb: number; // default 1 + freeMinGb: number; // default 1 + idleDays: number; // default 30 +} + +function defaultConfig(): BufferConfig { + return { + stateDir: path.join(os.homedir(), ".local", "share", "pi-remote"), + perSessionMb: 100, + globalGb: 1, + freeMinGb: 1, + idleDays: 30, + }; +} + +let _config: BufferConfig = defaultConfig(); + +export function configureBuffer(cfg: Partial): void { + _config = { ..._config, ...cfg }; +} + +// --------------------------------------------------------------------------- +// Global cap mutex (JS single-threaded so a flag suffices) +// --------------------------------------------------------------------------- + +let _globalBusy = false; +let _globalBytes = 0; // tracked in-memory; recalculated on startup + +// --------------------------------------------------------------------------- +// Per-session writer +// --------------------------------------------------------------------------- + +export class BufferWriter { + readonly session: string; + private filePath: string; + private queue: Promise = Promise.resolve(); + private sessionBytes = 0; + private lastWriteAt = Date.now(); + + constructor(session: string) { + this.session = session; + this.filePath = path.join(_config.stateDir, "buffers", `${session}.buf`); + } + + async open(): Promise { + await fs.mkdir(path.dirname(this.filePath), { recursive: true }); + // Load existing size for cap tracking + try { + const stat = await fs.stat(this.filePath); + this.sessionBytes = stat.size; + _globalBytes += stat.size; + } catch { + this.sessionBytes = 0; + } + } + + /** + * Enqueue a chunk write. Writes are serialised per session. + */ + write(seq: SeqNum, data: Buffer): void { + this.queue = this.queue.then(() => this._write(seq, data)); + } + + private async _write(seq: SeqNum, data: Buffer): Promise { + const perSessionCap = _config.perSessionMb * 1024 * 1024; + const globalCap = _config.globalGb * 1024 * 1024 * 1024; + + // Free-space watchdog + try { + const { available } = await checkFreeSpace(path.dirname(this.filePath)); + const freeMin = _config.freeMinGb * 1024 * 1024 * 1024; + if (available < freeMin) return; // silently drop; could emit a warning + } catch { + // If we can't check, don't block writes + } + + // Cap enforcement + const recordSize = 8 + 4 + data.length; + if ( + _globalBusy || + this.sessionBytes + recordSize > perSessionCap || + _globalBytes + recordSize > globalCap + ) { + return; // drop oldest strategy: just don't write (ring via truncation not implemented yet) + } + + _globalBusy = true; + try { + const header = Buffer.allocUnsafe(12); + header.writeBigUInt64BE(BigInt(seq), 0); + header.writeUInt32BE(data.length, 8); + + await fs.appendFile(this.filePath, Buffer.concat([header, data])); + this.sessionBytes += recordSize; + _globalBytes += recordSize; + this.lastWriteAt = Date.now(); + } finally { + _globalBusy = false; + } + } + + async close(): Promise { + await this.queue; // drain + } + + /** Delete the buffer file and reclaim global tracking bytes. */ + async delete(): Promise { + await this.queue; + try { + await fs.unlink(this.filePath); + _globalBytes = Math.max(0, _globalBytes - this.sessionBytes); + this.sessionBytes = 0; + } catch { + // already gone + } + } + + get idleMs(): number { + return Date.now() - this.lastWriteAt; + } +} + +// --------------------------------------------------------------------------- +// Idle cleanup +// --------------------------------------------------------------------------- + +/** + * Delete buffer files for sessions idle longer than idleDays. + * Safe to call periodically (e.g. on startup or daily timer). + */ +export async function cleanupIdleBuffers( + cfg: BufferConfig = _config, +): Promise { + const dir = path.join(cfg.stateDir, "buffers"); + const maxIdleMs = cfg.idleDays * 24 * 60 * 60 * 1000; + const deleted: string[] = []; + + let entries: fs.Dirent[] = []; + try { + entries = await fs.readdir(dir, { withFileTypes: true }); + } catch { + return deleted; + } + + for (const entry of entries) { + if (!entry.name.endsWith(".buf")) continue; + const fp = path.join(dir, entry.name); + try { + const stat = await fs.stat(fp); + if (Date.now() - stat.mtimeMs > maxIdleMs) { + await fs.unlink(fp); + deleted.push(entry.name.replace(/\.buf$/, "")); + } + } catch { + // skip + } + } + return deleted; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/** Approximate free disk space on the filesystem containing `dir`. */ +async function checkFreeSpace(dir: string): Promise<{ available: number }> { + // Node doesn't expose statvfs directly; use df -k as a fallback. + // If it fails, caller ignores the error. + const { execFile } = await import("node:child_process"); + const { promisify } = await import("node:util"); + const exec = promisify(execFile); + const { stdout } = await exec("df", ["-k", dir]); + const lines = stdout.trim().split("\n"); + const last = lines[lines.length - 1]; + const parts = last.split(/\s+/); + // df -k columns: Filesystem 1K-blocks Used Available Use% Mounted + const availKb = parseInt(parts[3], 10); + return { available: availKb * 1024 }; +} diff --git a/extensions/remote-control/sequence.ts b/extensions/remote-control/sequence.ts new file mode 100644 index 0000000..99e71a9 --- /dev/null +++ b/extensions/remote-control/sequence.ts @@ -0,0 +1,35 @@ +/** + * Monotonic sequence number generator — shared by stream + buffer. + * + * Each chunk of output gets a unique, monotonically increasing seq number. + * This lets clients resume a stream from a known position (IC-1 `lastSeq`). + * + * Owner: T-1.2 + */ + +export type SeqNum = number; // safe JS integer, starts at 1 + +/** + * Per-session sequence counter. + * Create one instance per session; share between the buffer writer and the + * WebSocket broadcaster. + */ +export class SequenceCounter { + private current: SeqNum = 0; + + /** Increment and return the next seq number. */ + next(): SeqNum { + this.current += 1; + return this.current; + } + + /** Current value without incrementing. */ + peek(): SeqNum { + return this.current; + } + + /** Reset (e.g. after session restart). */ + reset(): void { + this.current = 0; + } +}