92 lines
2.3 KiB
TypeScript
92 lines
2.3 KiB
TypeScript
/**
|
|
* Disk ring-buffer reader.
|
|
*
|
|
* Reads chunks from a session buffer file, optionally starting from a
|
|
* given seq number. Used by the stream route for reconnect replay (T-1.5).
|
|
*
|
|
* File format (mirrors writer.ts):
|
|
* Each record: [seq: 8 bytes BE uint64] [length: 4 bytes BE uint32] [data: N bytes]
|
|
*
|
|
* Owner: T-1.2
|
|
*/
|
|
|
|
import fs from "node:fs";
|
|
import os from "node:os";
|
|
import path from "node:path";
|
|
|
|
export interface BufferChunk {
|
|
seq: number;
|
|
data: Buffer;
|
|
}
|
|
|
|
export interface ReaderConfig {
|
|
stateDir?: string;
|
|
}
|
|
|
|
function stateDir(cfg?: ReaderConfig): string {
|
|
return (
|
|
cfg?.stateDir ?? path.join(os.homedir(), ".local", "share", "pi-remote")
|
|
);
|
|
}
|
|
|
|
function bufferPath(session: string, cfg?: ReaderConfig): string {
|
|
return path.join(stateDir(cfg), "buffers", `${session}.buf`);
|
|
}
|
|
|
|
/**
|
|
* Read all chunks from a session buffer, optionally starting after `afterSeq`.
|
|
*
|
|
* Returns chunks in seq order. If the file doesn't exist, returns [].
|
|
* Stops at the first parse error (truncated file at end is tolerated).
|
|
*/
|
|
export function readChunks(
|
|
session: string,
|
|
opts: { afterSeq?: number; cfg?: ReaderConfig } = {},
|
|
): BufferChunk[] {
|
|
const { afterSeq = 0, cfg } = opts;
|
|
const fp = bufferPath(session, cfg);
|
|
|
|
let buf: Buffer;
|
|
try {
|
|
buf = fs.readFileSync(fp);
|
|
} catch {
|
|
return [];
|
|
}
|
|
|
|
const chunks: BufferChunk[] = [];
|
|
let offset = 0;
|
|
|
|
while (offset + 12 <= buf.length) {
|
|
const seqBig = buf.readBigUInt64BE(offset);
|
|
const seq = Number(seqBig);
|
|
const length = buf.readUInt32BE(offset + 8);
|
|
offset += 12;
|
|
|
|
if (offset + length > buf.length) break; // truncated record at end
|
|
|
|
if (seq > afterSeq) {
|
|
chunks.push({ seq, data: buf.slice(offset, offset + length) });
|
|
}
|
|
offset += length;
|
|
}
|
|
|
|
return chunks;
|
|
}
|
|
|
|
/**
|
|
* Read chunks as an async generator (streaming, for large buffers).
|
|
* Yields one chunk at a time after `afterSeq`.
|
|
*/
|
|
export async function* streamChunks(
|
|
session: string,
|
|
opts: { afterSeq?: number; cfg?: ReaderConfig } = {},
|
|
): AsyncGenerator<BufferChunk> {
|
|
// Simple implementation: read all and yield. For large files T-1.5 can
|
|
// switch to a streaming file read if needed.
|
|
const { afterSeq = 0, cfg } = opts;
|
|
const chunks = readChunks(session, { afterSeq, cfg });
|
|
for (const chunk of chunks) {
|
|
yield chunk;
|
|
}
|
|
}
|