/**
 * Disk ring-buffer writer.
 *
 * Appends chunks to a per-session file, enforcing:
 *   - Per-session cap: 100 MB (configurable)
 *   - Global cap: 1 GB across all sessions (configurable)
 *   - Free-space watchdog: refuse writes if free disk < 1 GB
 *   - Idle cleanup: sessions inactive for > 30 days are deleted
 *
 * File format (binary, append-only):
 *   Each record: [seq: 8 bytes BE uint64] [length: 4 bytes BE uint32] [data: N bytes]
 *
 * Risk R1 mitigation: all writes are serialised through a per-session async queue.
 * The global cap is guarded by module-level state (no real lock is needed since
 * JS is single-threaded).
 *
 * Owner: T-1.2
 */

import { execFile } from "node:child_process";
import type { Dirent } from "node:fs";
import fs from "node:fs/promises";
import os from "node:os";
import path from "node:path";
import { promisify } from "node:util";

import type { SeqNum } from "../sequence.js";

// ---------------------------------------------------------------------------
// Config defaults (can be overridden; T-1.7 will wire these from config.toml)
// ---------------------------------------------------------------------------

/** Storage location and size/retention limits for the on-disk session buffers. */
export interface BufferConfig {
  /** Root state directory; buffer files are kept in its `buffers/` subdirectory. */
  stateDir: string;
  /** Maximum size of a single session's buffer file, in MiB (default 100). */
  perSessionMb: number;
  /** Maximum combined size of all session buffer files, in GiB (default 1). */
  globalGb: number;
  /** Minimum free disk space, in GiB, below which writes are refused (default 1). */
  freeMinGb: number;
  /** Days without modification after which a buffer file is deleted (default 30). */
  idleDays: number;
}

/**
 * Build the built-in default configuration.
 *
 * @returns A fresh object on every call, so callers may mutate or spread it
 *   without affecting other holders. `stateDir` resolves to
 *   `~/.local/share/pi-remote` under the current user's home directory.
 */
function defaultConfig(): BufferConfig {
  return {
    stateDir: path.join(os.homedir(), ".local", "share", "pi-remote"),
    perSessionMb: 100,
    globalGb: 1,
    freeMinGb: 1,
    idleDays: 30,
  };
}

/** Active configuration; starts as the built-in defaults. */
let _config: BufferConfig = defaultConfig();

/**
 * Merge overrides into the active buffer configuration.
 *
 * Only keys that carry a defined value are applied. A key that is absent or
 * explicitly `undefined` keeps its current setting, so a partially populated
 * options object cannot turn a limit into `undefined` (which would make every
 * cap comparison false and silently disable enforcement).
 *
 * Size and free-space limits are read on each write. `stateDir` is captured
 * when a `BufferWriter` is constructed, so changing it only affects writers
 * created afterwards.
 *
 * @param cfg Subset of settings to override.
 */
export function configureBuffer(cfg: Partial<BufferConfig>): void {
  _config = {
    stateDir: cfg.stateDir ?? _config.stateDir,
    perSessionMb: cfg.perSessionMb ?? _config.perSessionMb,
    globalGb: cfg.globalGb ?? _config.globalGb,
    freeMinGb: cfg.freeMinGb ?? _config.freeMinGb,
    idleDays: cfg.idleDays ?? _config.idleDays,
  };
}

// ---------------------------------------------------------------------------
// Global cap accounting
// ---------------------------------------------------------------------------

/**
 * Bytes currently attributed to buffer files by the writers in this process.
 *
 * Kept in memory only: `open()` adds the size of an existing file, each
 * accepted record adds its size, and `delete()` subtracts the session total.
 * Cap checks and the matching reservation happen in one synchronous step, so
 * no lock or busy flag is needed in single-threaded JS.
 */
let _globalBytes = 0;

/** Size of the fixed record header: 8-byte sequence number + 4-byte length. */
const RECORD_HEADER_BYTES = 12;

/** True when `err` is a filesystem "no such file or directory" error. */
function isNotFound(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { code?: unknown }).code === "ENOENT"
  );
}

// ---------------------------------------------------------------------------
// Per-session writer
// ---------------------------------------------------------------------------

/**
 * Append-only writer for one session's buffer file.
 *
 * Records are written as `[seq: uint64 BE][length: uint32 BE][data]`. Writes
 * for a session are serialised through an internal promise chain, and records
 * that would exceed the per-session or global cap, or that arrive while free
 * disk space is below the configured minimum, are dropped without error.
 */
export class BufferWriter {
  readonly session: string;
  private readonly filePath: string;
  /** Tail of the per-session write chain; never left in a rejected state. */
  private queue: Promise<void> = Promise.resolve();
  private sessionBytes = 0;
  private lastWriteAt = Date.now();
  /** First write failure since the last `close()`, reported by `close()`. */
  private firstWriteError: unknown = undefined;
  private hasWriteError = false;

  /**
   * @param session Session identifier; used verbatim as the file name stem.
   *   NOTE(review): the value is not sanitised here — presumably callers pass
   *   an id without path separators; confirm.
   */
  constructor(session: string) {
    this.session = session;
    this.filePath = path.join(_config.stateDir, "buffers", `${session}.buf`);
  }

  /**
   * Ensure the buffer directory exists and load the current file size for cap
   * tracking. Safe to call more than once: the global total is adjusted by the
   * difference from what this writer had already counted.
   */
  async open(): Promise<void> {
    await fs.mkdir(path.dirname(this.filePath), { recursive: true });

    let sizeOnDisk = 0;
    try {
      sizeOnDisk = (await fs.stat(this.filePath)).size;
    } catch {
      // No readable file yet: treat the session as empty.
    }

    _globalBytes = Math.max(0, _globalBytes + sizeOnDisk - this.sessionBytes);
    this.sessionBytes = sizeOnDisk;
  }

  /**
   * Enqueue a chunk write. Writes are serialised per session.
   *
   * A failed write does not stop later writes and does not produce an
   * unhandled rejection; the first failure is kept and rethrown by `close()`.
   *
   * @param seq  Sequence number stored in the record header.
   * @param data Payload bytes.
   */
  write(seq: SeqNum, data: Buffer): void {
    this.queue = this.queue
      .then(() => this._write(seq, data))
      .catch((err: unknown) => {
        if (!this.hasWriteError) {
          this.hasWriteError = true;
          this.firstWriteError = err;
        }
      });
  }

  /**
   * Perform one record write, applying the free-space and cap checks.
   *
   * @throws Whatever header encoding or `fs.appendFile` throws; the size
   *   reservation is rolled back before the error propagates.
   */
  private async _write(seq: SeqNum, data: Buffer): Promise<void> {
    const perSessionCap = _config.perSessionMb * 1024 * 1024;
    const globalCap = _config.globalGb * 1024 * 1024 * 1024;
    const freeMin = _config.freeMinGb * 1024 * 1024 * 1024;

    // Free-space watchdog. The probe runs on every write.
    try {
      const { available } = await checkFreeSpace(path.dirname(this.filePath));
      if (available < freeMin) return; // silently drop; could emit a warning
    } catch {
      // If we can't check, don't block writes.
    }

    // Cap enforcement: drop the record when either cap would be exceeded
    // (ring via truncation is not implemented yet).
    const recordSize = RECORD_HEADER_BYTES + data.length;
    if (
      this.sessionBytes + recordSize > perSessionCap ||
      _globalBytes + recordSize > globalCap
    ) {
      return;
    }

    // Reserve the bytes before the first await below, so writers for other
    // sessions that run while this append is in flight see the updated total.
    this.sessionBytes += recordSize;
    _globalBytes += recordSize;

    try {
      const header = Buffer.allocUnsafe(RECORD_HEADER_BYTES);
      header.writeBigUInt64BE(BigInt(seq), 0);
      header.writeUInt32BE(data.length, 8);

      await fs.appendFile(this.filePath, Buffer.concat([header, data]));
      this.lastWriteAt = Date.now();
    } catch (err) {
      // Nothing was recorded as written: give the reservation back.
      this.sessionBytes = Math.max(0, this.sessionBytes - recordSize);
      _globalBytes = Math.max(0, _globalBytes - recordSize);
      throw err;
    }
  }

  /**
   * Wait for all queued writes to finish.
   *
   * @throws The first write error recorded since the previous `close()`.
   */
  async close(): Promise<void> {
    await this.queue; // drain

    if (this.hasWriteError) {
      const err = this.firstWriteError;
      this.hasWriteError = false;
      this.firstWriteError = undefined;
      throw err;
    }
  }

  /**
   * Delete the buffer file and reclaim global tracking bytes.
   *
   * Bytes are reclaimed when the file was removed or was already missing. Any
   * other unlink failure is ignored and leaves the accounting untouched,
   * because the file is still on disk.
   */
  async delete(): Promise<void> {
    await this.queue;

    try {
      await fs.unlink(this.filePath);
    } catch (err) {
      if (!isNotFound(err)) return;
    }

    _globalBytes = Math.max(0, _globalBytes - this.sessionBytes);
    this.sessionBytes = 0;
  }

  /** Milliseconds since the last successful write (or since construction). */
  get idleMs(): number {
    return Date.now() - this.lastWriteAt;
  }
}

// ---------------------------------------------------------------------------
// Idle cleanup
// ---------------------------------------------------------------------------

/**
 * Delete buffer files for sessions idle longer than `idleDays`.
 *
 * Idleness is judged by each `*.buf` file's modification time. Safe to call
 * periodically (e.g. on startup or from a daily timer): a missing buffer
 * directory yields an empty result, and a file that cannot be inspected or
 * removed is skipped.
 *
 * NOTE(review): only the files are removed; in-memory size accounting held by
 * live writers is not adjusted here.
 *
 * @param cfg Configuration supplying `stateDir` and `idleDays`; defaults to
 *   the active module configuration.
 * @returns Session names (file name without the `.buf` suffix) that were deleted.
 */
export async function cleanupIdleBuffers(
  cfg: BufferConfig = _config,
): Promise<string[]> {
  const suffix = ".buf";
  const dir = path.join(cfg.stateDir, "buffers");
  const maxIdleMs = cfg.idleDays * 24 * 60 * 60 * 1000;
  const deleted: string[] = [];

  let entries: Dirent[];
  try {
    entries = await fs.readdir(dir, { withFileTypes: true });
  } catch {
    // Directory missing or unreadable: nothing to clean up.
    return deleted;
  }

  for (const entry of entries) {
    // Directories cannot be unlinked, so do not bother stat-ing them.
    if (entry.isDirectory() || !entry.name.endsWith(suffix)) continue;

    const filePath = path.join(dir, entry.name);
    try {
      const { mtimeMs } = await fs.stat(filePath);
      if (Date.now() - mtimeMs > maxIdleMs) {
        await fs.unlink(filePath);
        deleted.push(entry.name.slice(0, -suffix.length));
      }
    } catch {
      // File vanished or is not removable; move on to the next one.
    }
  }

  return deleted;
}

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/** Promise-returning `execFile`, created once instead of on every probe. */
const execFileAsync = promisify(execFile);

/**
 * Approximate free disk space on the filesystem containing `dir`.
 *
 * Runs `df -k -P` and reads the "Available" column of the last output line.
 * `-P` requests the POSIX layout (one line per filesystem), and the numeric
 * columns are located by pattern rather than by position, so a filesystem
 * name containing spaces does not shift the result.
 *
 * NOTE(review): recent Node releases expose `fs.promises.statfs`, which could
 * replace the `df` call — confirm the minimum supported Node version first.
 *
 * @param dir Directory on the filesystem to inspect.
 * @returns Available space in bytes.
 * @throws If `df` cannot be run or its output cannot be parsed. The caller
 *   treats any failure as "unknown" and does not block writes.
 */
async function checkFreeSpace(dir: string): Promise<{ available: number }> {
  const { stdout } = await execFileAsync("df", ["-k", "-P", dir]);
  const lines = stdout.trim().split("\n");
  const lastLine = lines[lines.length - 1] ?? "";

  // POSIX columns: Filesystem 1024-blocks Used Available Capacity Mounted-on
  const columns = /\s\d+\s+\d+\s+(-?\d+)\s+(?:\d+%|-)\s/.exec(lastLine);
  const availKb = Number.parseInt(columns?.[1] ?? "", 10);
  if (!Number.isFinite(availKb)) {
    throw new Error(`checkFreeSpace: cannot parse df output for ${dir}`);
  }

  return { available: availKb * 1024 };
}