feat(T-1.2): sequence counter + disk ring-buffer writer/reader

This commit is contained in:
Johannes Merz 2026-05-15 11:29:41 +02:00
parent bd990a07ab
commit 17c32e7e93
3 changed files with 335 additions and 0 deletions

View File

@ -0,0 +1,91 @@
/**
* Disk ring-buffer reader.
*
* Reads chunks from a session buffer file, optionally starting from a
* given seq number. Used by the stream route for reconnect replay (T-1.5).
*
* File format (mirrors writer.ts):
* Each record: [seq: 8 bytes BE uint64] [length: 4 bytes BE uint32] [data: N bytes]
*
* Owner: T-1.2
*/
import fs from "node:fs";
import os from "node:os";
import path from "node:path";
export interface BufferChunk {
seq: number;
data: Buffer;
}
export interface ReaderConfig {
stateDir?: string;
}
function stateDir(cfg?: ReaderConfig): string {
return (
cfg?.stateDir ?? path.join(os.homedir(), ".local", "share", "pi-remote")
);
}
function bufferPath(session: string, cfg?: ReaderConfig): string {
return path.join(stateDir(cfg), "buffers", `${session}.buf`);
}
/**
* Read all chunks from a session buffer, optionally starting after `afterSeq`.
*
* Returns chunks in seq order. If the file doesn't exist, returns [].
* Stops at the first parse error (truncated file at end is tolerated).
*/
export function readChunks(
session: string,
opts: { afterSeq?: number; cfg?: ReaderConfig } = {},
): BufferChunk[] {
const { afterSeq = 0, cfg } = opts;
const fp = bufferPath(session, cfg);
let buf: Buffer;
try {
buf = fs.readFileSync(fp);
} catch {
return [];
}
const chunks: BufferChunk[] = [];
let offset = 0;
while (offset + 12 <= buf.length) {
const seqBig = buf.readBigUInt64BE(offset);
const seq = Number(seqBig);
const length = buf.readUInt32BE(offset + 8);
offset += 12;
if (offset + length > buf.length) break; // truncated record at end
if (seq > afterSeq) {
chunks.push({ seq, data: buf.slice(offset, offset + length) });
}
offset += length;
}
return chunks;
}
/**
* Read chunks as an async generator (streaming, for large buffers).
* Yields one chunk at a time after `afterSeq`.
*/
export async function* streamChunks(
session: string,
opts: { afterSeq?: number; cfg?: ReaderConfig } = {},
): AsyncGenerator<BufferChunk> {
// Simple implementation: read all and yield. For large files T-1.5 can
// switch to a streaming file read if needed.
const { afterSeq = 0, cfg } = opts;
const chunks = readChunks(session, { afterSeq, cfg });
for (const chunk of chunks) {
yield chunk;
}
}

View File

@ -0,0 +1,209 @@
/**
* Disk ring-buffer writer.
*
* Appends chunks to a per-session file, enforcing:
* - Per-session cap: 100 MB (configurable)
* - Global cap: 1 GB across all sessions (configurable)
* - Free-space watchdog: refuse writes if free disk < 1 GB
* - Idle cleanup: sessions inactive for > 30 days are deleted
*
* File format (binary, append-only):
* Each record: [seq: 8 bytes BE uint64] [length: 4 bytes BE uint32] [data: N bytes]
*
* Risk R1 mitigation: all writes serialised through a per-session async queue.
* Global cap protected by a module-level mutex (simple flag since JS is single-threaded).
*
* Owner: T-1.2
*/
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import type { SeqNum } from "../sequence.js";
// ---------------------------------------------------------------------------
// Config defaults (can be overridden; T-1.7 will wire these from config.toml)
// ---------------------------------------------------------------------------
export interface BufferConfig {
stateDir: string;
perSessionMb: number; // default 100
globalGb: number; // default 1
freeMinGb: number; // default 1
idleDays: number; // default 30
}
function defaultConfig(): BufferConfig {
return {
stateDir: path.join(os.homedir(), ".local", "share", "pi-remote"),
perSessionMb: 100,
globalGb: 1,
freeMinGb: 1,
idleDays: 30,
};
}
let _config: BufferConfig = defaultConfig();
export function configureBuffer(cfg: Partial<BufferConfig>): void {
_config = { ..._config, ...cfg };
}
// ---------------------------------------------------------------------------
// Global cap mutex (JS single-threaded so a flag suffices)
// ---------------------------------------------------------------------------
let _globalBusy = false;
let _globalBytes = 0; // tracked in-memory; recalculated on startup
// ---------------------------------------------------------------------------
// Per-session writer
// ---------------------------------------------------------------------------
export class BufferWriter {
readonly session: string;
private filePath: string;
private queue: Promise<void> = Promise.resolve();
private sessionBytes = 0;
private lastWriteAt = Date.now();
constructor(session: string) {
this.session = session;
this.filePath = path.join(_config.stateDir, "buffers", `${session}.buf`);
}
async open(): Promise<void> {
await fs.mkdir(path.dirname(this.filePath), { recursive: true });
// Load existing size for cap tracking
try {
const stat = await fs.stat(this.filePath);
this.sessionBytes = stat.size;
_globalBytes += stat.size;
} catch {
this.sessionBytes = 0;
}
}
/**
* Enqueue a chunk write. Writes are serialised per session.
*/
write(seq: SeqNum, data: Buffer): void {
this.queue = this.queue.then(() => this._write(seq, data));
}
private async _write(seq: SeqNum, data: Buffer): Promise<void> {
const perSessionCap = _config.perSessionMb * 1024 * 1024;
const globalCap = _config.globalGb * 1024 * 1024 * 1024;
// Free-space watchdog
try {
const { available } = await checkFreeSpace(path.dirname(this.filePath));
const freeMin = _config.freeMinGb * 1024 * 1024 * 1024;
if (available < freeMin) return; // silently drop; could emit a warning
} catch {
// If we can't check, don't block writes
}
// Cap enforcement
const recordSize = 8 + 4 + data.length;
if (
_globalBusy ||
this.sessionBytes + recordSize > perSessionCap ||
_globalBytes + recordSize > globalCap
) {
return; // drop oldest strategy: just don't write (ring via truncation not implemented yet)
}
_globalBusy = true;
try {
const header = Buffer.allocUnsafe(12);
header.writeBigUInt64BE(BigInt(seq), 0);
header.writeUInt32BE(data.length, 8);
await fs.appendFile(this.filePath, Buffer.concat([header, data]));
this.sessionBytes += recordSize;
_globalBytes += recordSize;
this.lastWriteAt = Date.now();
} finally {
_globalBusy = false;
}
}
async close(): Promise<void> {
await this.queue; // drain
}
/** Delete the buffer file and reclaim global tracking bytes. */
async delete(): Promise<void> {
await this.queue;
try {
await fs.unlink(this.filePath);
_globalBytes = Math.max(0, _globalBytes - this.sessionBytes);
this.sessionBytes = 0;
} catch {
// already gone
}
}
get idleMs(): number {
return Date.now() - this.lastWriteAt;
}
}
// ---------------------------------------------------------------------------
// Idle cleanup
// ---------------------------------------------------------------------------
/**
* Delete buffer files for sessions idle longer than idleDays.
* Safe to call periodically (e.g. on startup or daily timer).
*/
export async function cleanupIdleBuffers(
cfg: BufferConfig = _config,
): Promise<string[]> {
const dir = path.join(cfg.stateDir, "buffers");
const maxIdleMs = cfg.idleDays * 24 * 60 * 60 * 1000;
const deleted: string[] = [];
let entries: fs.Dirent[] = [];
try {
entries = await fs.readdir(dir, { withFileTypes: true });
} catch {
return deleted;
}
for (const entry of entries) {
if (!entry.name.endsWith(".buf")) continue;
const fp = path.join(dir, entry.name);
try {
const stat = await fs.stat(fp);
if (Date.now() - stat.mtimeMs > maxIdleMs) {
await fs.unlink(fp);
deleted.push(entry.name.replace(/\.buf$/, ""));
}
} catch {
// skip
}
}
return deleted;
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/** Approximate free disk space on the filesystem containing `dir`. */
async function checkFreeSpace(dir: string): Promise<{ available: number }> {
// Node doesn't expose statvfs directly; use df -k as a fallback.
// If it fails, caller ignores the error.
const { execFile } = await import("node:child_process");
const { promisify } = await import("node:util");
const exec = promisify(execFile);
const { stdout } = await exec("df", ["-k", dir]);
const lines = stdout.trim().split("\n");
const last = lines[lines.length - 1];
const parts = last.split(/\s+/);
// df -k columns: Filesystem 1K-blocks Used Available Use% Mounted
const availKb = parseInt(parts[3], 10);
return { available: availKb * 1024 };
}

View File

@ -0,0 +1,35 @@
/**
* Monotonic sequence number generator shared by stream + buffer.
*
* Each chunk of output gets a unique, monotonically increasing seq number.
* This lets clients resume a stream from a known position (IC-1 `lastSeq`).
*
* Owner: T-1.2
*/
export type SeqNum = number; // safe JS integer, starts at 1
/**
* Per-session sequence counter.
* Create one instance per session; share between the buffer writer and the
* WebSocket broadcaster.
*/
export class SequenceCounter {
private current: SeqNum = 0;
/** Increment and return the next seq number. */
next(): SeqNum {
this.current += 1;
return this.current;
}
/** Current value without incrementing. */
peek(): SeqNum {
return this.current;
}
/** Reset (e.g. after session restart). */
reset(): void {
this.current = 0;
}
}