/**
 * Disk ring-buffer reader.
 *
 * Reads chunks from a session buffer file, optionally starting from a
 * given seq number. Used by the stream route for reconnect replay (T-1.5).
 *
 * File format (mirrors writer.ts):
 *   Each record: [seq: 8 bytes BE uint64] [length: 4 bytes BE uint32] [data: N bytes]
 *
 * Owner: T-1.2
 */

import fs from "node:fs";
import os from "node:os";
import path from "node:path";

/** One record decoded from a session buffer file. */
export interface BufferChunk {
  /** Sequence number taken from the record header. */
  seq: number;
  /** Record payload; a view into the bytes read from disk (not a copy). */
  data: Buffer;
}

/** Optional overrides for where buffer files are looked up. */
export interface ReaderConfig {
  /** Root state directory; defaults to `~/.local/share/pi-remote`. */
  stateDir?: string;
}

/** Width of the big-endian uint64 seq field at the start of a record. */
const RECORD_SEQ_BYTES = 8;

/** Full record header: 8-byte seq followed by a 4-byte payload length. */
const RECORD_HEADER_BYTES = RECORD_SEQ_BYTES + 4;

/** Largest seq that can be represented exactly as a JS `number`. */
const MAX_SAFE_SEQ = BigInt(Number.MAX_SAFE_INTEGER);

/** Resolve the state directory, falling back to the per-user default. */
function stateDir(cfg?: ReaderConfig): string {
  return (
    cfg?.stateDir ?? path.join(os.homedir(), ".local", "share", "pi-remote")
  );
}

/**
 * Build the path of the buffer file for `session`.
 *
 * NOTE(review): `session` is interpolated into the file name unchecked, so a
 * value containing path separators can point outside `buffers/` — confirm
 * that callers only pass trusted session ids.
 */
function bufferPath(session: string, cfg?: ReaderConfig): string {
  return path.join(stateDir(cfg), "buffers", `${session}.buf`);
}

/**
 * Read all chunks from a session buffer, optionally starting after `afterSeq`.
 *
 * Chunks are returned in the order they appear in the file. If the file
 * cannot be read (including when it does not exist), returns [].
 * Parsing stops at the first record that cannot be decoded: a truncated
 * record at the end of the file is tolerated, and a seq too large to be
 * represented exactly as a `number` is treated as corruption.
 *
 * @param session - Session identifier; selects `<stateDir>/buffers/<session>.buf`.
 * @param opts - `afterSeq`: only chunks with `seq > afterSeq` are returned
 *   (default 0). `cfg`: optional state-directory override.
 * @returns The decoded chunks; each `data` is a view into the file contents.
 */
export function readChunks(
  session: string,
  opts: { afterSeq?: number; cfg?: ReaderConfig } = {},
): BufferChunk[] {
  const { afterSeq = 0, cfg } = opts;
  const fp = bufferPath(session, cfg);

  let buf: Buffer;
  try {
    buf = fs.readFileSync(fp);
  } catch {
    // Best-effort: any read failure is reported as an empty buffer.
    return [];
  }

  const chunks: BufferChunk[] = [];
  let offset = 0;

  while (offset + RECORD_HEADER_BYTES <= buf.length) {
    const seqBig = buf.readBigUInt64BE(offset);
    // Number() would silently round anything above 2^53 - 1; stop rather
    // than hand out a wrong seq.
    if (seqBig > MAX_SAFE_SEQ) break;
    const seq = Number(seqBig);
    const length = buf.readUInt32BE(offset + RECORD_SEQ_BYTES);
    offset += RECORD_HEADER_BYTES;

    if (offset + length > buf.length) break; // truncated record at end

    if (seq > afterSeq) {
      // subarray() is the non-deprecated equivalent of Buffer#slice().
      chunks.push({ seq, data: buf.subarray(offset, offset + length) });
    }
    offset += length;
  }

  return chunks;
}

/**
 * Read chunks as an async generator.
 * Yields one chunk at a time after `afterSeq`.
 *
 * @param session - Session identifier, as for {@link readChunks}.
 * @param opts - Same options as {@link readChunks}.
 */
export async function* streamChunks(
  session: string,
  opts: { afterSeq?: number; cfg?: ReaderConfig } = {},
): AsyncGenerator<BufferChunk> {
  // Simple implementation: read all and yield. For large files T-1.5 can
  // switch to a streaming file read if needed.
  const { afterSeq = 0, cfg } = opts;
  const chunks = readChunks(session, { afterSeq, cfg });
  for (const chunk of chunks) {
    yield chunk;
  }
}