// This repository was archived on 2026-05-15. You can view files and clone it, but you cannot push or open issues or pull requests.
// pi-fanout/controller.ts
//
// 447 lines
// 14 KiB
// TypeScript

import { spawn } from "node:child_process";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
import type { Message } from "@earendil-works/pi-ai";
import { discoverAgents, type AgentConfig, type AgentScope } from "./agents.js";
import type { AbortResult, CollectResult, DispatchResult, FanoutJob, JobUsage, StatusResult } from "./types.js";
/**
 * Works out the command line needed to start another `pi` process.
 *
 * Resolution order:
 *  1. If the current entry script (`process.argv[1]`) is a real file on disk
 *     (and not a path under Bun's virtual `/$bunfs/root/` prefix), re-run that
 *     script with the current runtime executable.
 *  2. Otherwise, if the running executable is not a plain `node`/`bun` binary,
 *     invoke that executable directly with the given arguments.
 *  3. Otherwise fall back to a `pi` command resolved by the OS.
 *
 * @param args Arguments to pass to the new `pi` process.
 * @returns The command to execute and its full argument list.
 */
function getPiInvocation(args: string[]): { command: string; args: string[] } {
  const runtime = process.execPath;
  const entryScript = process.argv[1];

  if (entryScript && !entryScript.startsWith("/$bunfs/root/") && fs.existsSync(entryScript)) {
    return { command: runtime, args: [entryScript].concat(args) };
  }

  const genericRuntimePattern = /^(node|bun)(\.exe)?$/;
  const runtimeName = path.basename(runtime).toLowerCase();
  return genericRuntimePattern.test(runtimeName) ? { command: "pi", args } : { command: runtime, args };
}
/**
 * Finds the text to present as a conversation's final answer.
 *
 * Scans the messages from newest to oldest and returns the first text part of
 * the most recent assistant message that has one. Assistant messages without
 * any text part are skipped in favour of earlier ones.
 *
 * @param messages Conversation messages in chronological order.
 * @returns The selected text, or an empty string when no assistant message
 *          contains a text part.
 */
function getFinalOutput(messages: Message[]): string {
  let index = messages.length;
  while (index-- > 0) {
    const candidate = messages[index];
    if (candidate.role !== "assistant") continue;
    for (const segment of candidate.content) {
      if (segment.type === "text") return segment.text;
    }
  }
  return "";
}
/**
 * Creates a job identifier of the form `<timestamp>-<random>`, where both
 * parts are base-36 strings: the current epoch milliseconds and up to six
 * characters taken from `Math.random()`.
 *
 * @returns The generated identifier.
 */
function generateId(): string {
  const timestampPart = Date.now().toString(36);
  const randomPart = Math.random().toString(36).slice(2, 8);
  return [timestampPart, randomPart].join("-");
}
/**
 * Builds the path of the directory holding one job's files:
 * `<dataDir>/jobs/<jobId>`.
 *
 * @param dataDir Root data directory of the fanout controller.
 * @param jobId Identifier of the job.
 * @returns The job's directory path.
 */
function jobDir(dataDir: string, jobId: string): string {
  const segments = [dataDir, "jobs", jobId];
  return path.join(...segments);
}
/**
 * Runs agent tasks as detached background `pi` processes ("fanout jobs").
 *
 * Each job gets a directory `<home>/.pi/fanout/jobs/<jobId>/` containing
 * `meta.json` (the serialized job record) and `output.jsonl` (the child's
 * JSON event stream, one event per line). Jobs found on disk are reloaded when
 * the controller is constructed, and a periodic poller detects dead processes
 * and sends a follow-up user message once per finished job.
 */
export class FanoutController {
  /** Milliseconds between poller sweeps. */
  private static readonly POLL_INTERVAL_MS = 2000;
  /** Milliseconds to wait after SIGTERM before sending SIGKILL. */
  private static readonly KILL_GRACE_MS = 5000;
  /** Maximum characters of output embedded in a completion notification. */
  private static readonly NOTIFY_PREVIEW_CHARS = 2000;
  /** Maximum characters of output returned by `status()`. */
  private static readonly STATUS_PREVIEW_CHARS = 300;
  /** Maximum characters of stderr stored as the job's error message. */
  private static readonly ERROR_MESSAGE_CHARS = 500;
  /** Stop accumulating stderr once this many characters have been captured. */
  private static readonly STDERR_CAPTURE_CHARS = 64 * 1024;

  private readonly pi: ExtensionAPI;
  private readonly dataDir: string;
  private readonly jobs = new Map<string, FanoutJob>();
  /**
   * Ids of jobs whose child process was spawned by this controller and has
   * not yet emitted `close`/`error`. For these the event handlers, not the
   * poller, decide the final status.
   */
  private readonly liveJobIds = new Set<string>();
  private poller: NodeJS.Timeout | null = null;
  private agents: AgentConfig[] = [];
  private agentScope: AgentScope = "user";
  private projectAgentsDir: string | null = null;
  private readonly defaultCwd: string;

  /**
   * @param pi Extension API used to deliver completion notifications.
   * @param defaultCwd Working directory used for jobs dispatched without one.
   */
  constructor(pi: ExtensionAPI, defaultCwd: string) {
    this.pi = pi;
    this.defaultCwd = defaultCwd;
    this.dataDir = path.join(os.homedir(), ".pi", "fanout");
    fs.mkdirSync(path.join(this.dataDir, "jobs"), { recursive: true });
    this.rehydrate();
    this.startPoller();
  }

  /** Returns a fresh all-zero usage record. */
  private static emptyUsage(): JobUsage {
    return { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 };
  }

  /**
   * Minimal shape check for a record loaded from `meta.json`: it must be an
   * object with a non-empty string `id` and a string `status`. Other fields
   * are not validated.
   */
  private static isJobRecord(value: unknown): value is FanoutJob {
    if (typeof value !== "object" || value === null) return false;
    const candidate = value as { id?: unknown; status?: unknown };
    return typeof candidate.id === "string" && candidate.id.length > 0 && typeof candidate.status === "string";
  }

  /**
   * Reports whether a process with the given pid currently exists.
   * Signal 0 performs the existence check without delivering a signal.
   */
  private isProcessAlive(pid: number): boolean {
    try {
      process.kill(pid, 0);
      return true;
    } catch (err) {
      // EPERM means the process exists but we may not signal it: still alive.
      return typeof err === "object" && err !== null && (err as { code?: unknown }).code === "EPERM";
    }
  }

  /**
   * Loads every `jobs/<dir>/meta.json` found on disk into memory. Unreadable,
   * unparseable or structurally invalid records are skipped. A job recorded
   * as running whose process no longer exists is marked failed, and that
   * change is written back to disk.
   */
  private rehydrate(): void {
    const jobsBase = path.join(this.dataDir, "jobs");
    if (!fs.existsSync(jobsBase)) return;
    for (const entry of fs.readdirSync(jobsBase, { withFileTypes: true })) {
      if (!entry.isDirectory()) continue;
      const metaPath = path.join(jobsBase, entry.name, "meta.json");
      if (!fs.existsSync(metaPath)) continue;

      let job: FanoutJob;
      try {
        const parsed: unknown = JSON.parse(fs.readFileSync(metaPath, "utf-8"));
        // Without an id the record cannot be keyed; skip it like corrupt JSON.
        if (!FanoutController.isJobRecord(parsed)) continue;
        job = parsed;
      } catch {
        continue; // corrupt or unreadable meta
      }

      // Older or hand-edited records may lack usage; status() dereferences it.
      if (!job.usage) job.usage = FanoutController.emptyUsage();

      if (job.status === "running" && job.pid && !this.isProcessAlive(job.pid)) {
        job.status = "failed";
        job.exitCode = job.exitCode ?? 1;
        this.tryPersist(job);
      }
      this.jobs.set(job.id, job);
    }
  }

  /** Writes the job record to `<jobDir>/meta.json`, creating the directory if needed. */
  private persist(job: FanoutJob): void {
    const dir = jobDir(this.dataDir, job.id);
    fs.mkdirSync(dir, { recursive: true });
    fs.writeFileSync(path.join(dir, "meta.json"), JSON.stringify(job, null, 2));
  }

  /**
   * Best-effort variant of `persist` for timer and event callbacks, where a
   * thrown filesystem error would surface as an uncaught exception. The
   * in-memory record stays authoritative when the write fails.
   */
  private tryPersist(job: FanoutJob): void {
    try {
      this.persist(job);
    } catch {
      // Intentionally ignored: see method comment.
    }
  }

  /**
   * Refreshes the list of known agents for the given directory and scope and
   * remembers the scope and project agents directory that were reported.
   */
  discoverAgents(cwd: string, scope: AgentScope) {
    const result = discoverAgents(cwd, scope);
    this.agents = result.agents;
    this.agentScope = scope;
    this.projectAgentsDir = result.projectAgentsDir;
  }

  /**
   * Creates a job for the named agent and starts its child process.
   *
   * @param agentName Name of a previously discovered agent.
   * @param task Task text passed to the child as `Task: <task>`.
   * @param cwd Working directory for the child; defaults to the controller's default.
   * @param model Model override; falls back to the agent's model.
   * @param tools Tool list override; falls back to the agent's tools.
   * @param agentScope When given, agents are re-discovered with this scope first.
   * @returns The new job id and a confirmation message.
   * @throws Error if the agent is unknown, or if preparing/starting the child
   *         fails synchronously (the job is then recorded as failed).
   */
  dispatch(
    agentName: string,
    task: string,
    cwd: string | undefined,
    model?: string,
    tools?: string[],
    agentScope?: AgentScope,
  ): DispatchResult {
    if (agentScope) this.discoverAgents(cwd ?? this.defaultCwd, agentScope);
    const agent = this.agents.find((a) => a.name === agentName);
    if (!agent) {
      const available = this.agents.map((a) => `"${a.name}"`).join(", ") || "none";
      throw new Error(`Unknown agent: "${agentName}". Available: ${available}.`);
    }

    const jobId = generateId();
    const dir = jobDir(this.dataDir, jobId);
    fs.mkdirSync(dir, { recursive: true });
    const job: FanoutJob = {
      id: jobId,
      createdAt: Date.now(),
      agent: agentName,
      agentSource: agent.source,
      task,
      cwd: cwd ?? this.defaultCwd,
      model: model || agent.model,
      tools: tools || agent.tools,
      status: "queued",
      outputFile: path.join(dir, "output.jsonl"),
      metaFile: path.join(dir, "meta.json"),
      notified: false,
      usage: FanoutController.emptyUsage(),
    };
    this.jobs.set(jobId, job);
    this.persist(job);
    this.spawnJob(job, agent);
    return { jobId, status: "running", message: `Dispatched job ${jobId} to ${agentName}` };
  }

  /**
   * Starts the child process for a job, streams its stdout line by line into
   * the job's output file while accumulating usage, and finalizes the job
   * record when the child closes or fails to start.
   */
  private spawnJob(job: FanoutJob, agent: AgentConfig): void {
    const args: string[] = ["--mode", "json", "-p", "--no-session"];
    if (job.model) args.push("--model", job.model);
    if (job.tools && job.tools.length > 0) args.push("--tools", job.tools.join(","));

    let tmpPromptPath: string | null = null;
    let outputFd: number | null = null;

    // Both cleanup helpers are idempotent so every exit path may call them.
    const closeOutput = (): void => {
      const fd = outputFd;
      if (fd === null) return;
      outputFd = null;
      try { fs.closeSync(fd); } catch {}
    };
    const removePrompt = (): void => {
      const promptPath = tmpPromptPath;
      if (!promptPath) return;
      tmpPromptPath = null;
      try { fs.unlinkSync(promptPath); } catch {}
      try { fs.rmdirSync(path.dirname(promptPath)); } catch {}
    };

    const launch = (invocation: { command: string; args: string[] }) =>
      spawn(invocation.command, invocation.args, {
        cwd: job.cwd,
        detached: true,
        shell: false,
        stdio: ["ignore", "pipe", "pipe"],
      });

    let proc: ReturnType<typeof launch>;
    try {
      if (agent.systemPrompt.trim()) {
        const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "pi-fanout-"));
        const safeName = agent.name.replace(/[^\w.-]+/g, "_");
        tmpPromptPath = path.join(tmpDir, `prompt-${safeName}.md`);
        fs.writeFileSync(tmpPromptPath, agent.systemPrompt, { encoding: "utf-8", mode: 0o600 });
        args.push("--append-system-prompt", tmpPromptPath);
      }
      args.push(`Task: ${job.task}`);
      // Open the output file before spawning so a failure here cannot leave a
      // running child without anything draining its pipes.
      outputFd = fs.openSync(job.outputFile, "w");
      proc = launch(getPiInvocation(args));
    } catch (err) {
      closeOutput();
      removePrompt();
      job.status = "failed";
      job.exitCode = 1;
      job.errorMessage = err instanceof Error ? err.message : "Failed to spawn process.";
      this.tryPersist(job);
      throw err;
    }

    job.pid = proc.pid;
    job.status = "running";
    this.liveJobIds.add(job.id);

    // Let the streams decode UTF-8 so a multi-byte character split across two
    // chunks is not corrupted.
    proc.stdout.setEncoding("utf-8");
    proc.stderr.setEncoding("utf-8");

    let stdoutBuffer = "";
    let stderrBuffer = "";

    const recordLine = (line: string): void => {
      if (!line.trim()) return;
      if (outputFd !== null) fs.writeSync(outputFd, line + "\n");
      this.processOutputLine(job, line);
    };

    proc.stdout.on("data", (chunk: string) => {
      stdoutBuffer += chunk;
      const lines = stdoutBuffer.split("\n");
      // The last element is an incomplete line (or ""); keep it for the next chunk.
      stdoutBuffer = lines.pop() ?? "";
      for (const line of lines) recordLine(line);
    });

    proc.stderr.on("data", (chunk: string) => {
      // Only the first ERROR_MESSAGE_CHARS are ever used; cap what is retained.
      if (stderrBuffer.length < FanoutController.STDERR_CAPTURE_CHARS) stderrBuffer += chunk;
    });

    proc.on("close", (code) => {
      // Flush a trailing line that was not newline-terminated.
      recordLine(stdoutBuffer);
      stdoutBuffer = "";
      closeOutput();
      this.liveJobIds.delete(job.id);
      if (job.status !== "aborted") {
        // `code` is null when the child was terminated by a signal; record a
        // non-zero exit code so it agrees with the "failed" status.
        job.exitCode = code ?? 1;
        job.status = code === 0 ? "done" : "failed";
      }
      const stderrText = stderrBuffer.trim();
      if (stderrText) {
        job.errorMessage = stderrText.slice(0, FanoutController.ERROR_MESSAGE_CHARS);
      }
      this.tryPersist(job);
      removePrompt();
    });

    proc.on("error", (err) => {
      closeOutput();
      this.liveJobIds.delete(job.id);
      if (job.status !== "aborted") {
        job.status = "failed";
        job.exitCode = 1;
        job.errorMessage = job.errorMessage || err.message || "Failed to spawn process.";
      }
      this.tryPersist(job);
      removePrompt();
    });

    proc.unref();
    this.persist(job);
  }

  /**
   * Reads the job's output file and returns the final assistant text found
   * in its `message_end` / `tool_result_end` events, or "" when the file is
   * missing, unreadable, or contains no such text.
   */
  private readJobOutput(job: FanoutJob): string {
    if (!fs.existsSync(job.outputFile)) return "";
    try {
      const lines = fs.readFileSync(job.outputFile, "utf-8").split("\n").filter(Boolean);
      const messages: Message[] = [];
      for (const line of lines) {
        try {
          const event = JSON.parse(line);
          const isMessageEvent = event.type === "message_end" || event.type === "tool_result_end";
          if (isMessageEvent && event.message) messages.push(event.message as Message);
        } catch {
          // Skip lines that are not valid JSON events.
        }
      }
      return getFinalOutput(messages);
    } catch {
      return "";
    }
  }

  /**
   * Returns the job's final output, cut to `maxChars` characters and suffixed
   * with a truncation marker when it is longer than that.
   */
  private getJobPreview(job: FanoutJob, maxChars = 500): string {
    const output = this.readJobOutput(job);
    if (!output) return "";
    if (output.length <= maxChars) return output;
    return output.slice(0, maxChars) + "\n… (truncated)";
  }

  /**
   * Parses one line of the child's JSON event stream and, for assistant
   * `message_end` events, folds the message's usage, model, stop reason and
   * error message into the job record. Anything else is ignored.
   */
  private processOutputLine(job: FanoutJob, line: string): void {
    let event: unknown;
    try {
      event = JSON.parse(line);
    } catch {
      return;
    }
    // Valid JSON is not necessarily an object (e.g. `null` or a number).
    if (typeof event !== "object" || event === null) return;
    const { type, message } = event as { type?: unknown; message?: unknown };
    if (type !== "message_end" || !message) return;

    const msg = message as Message;
    if (msg.role !== "assistant") return;

    job.usage.turns++;
    const usage = msg.usage;
    if (usage) {
      job.usage.input += usage.input || 0;
      job.usage.output += usage.output || 0;
      job.usage.cacheRead += usage.cacheRead || 0;
      job.usage.cacheWrite += usage.cacheWrite || 0;
      job.usage.cost += usage.cost?.total || 0;
    }
    if (!job.modelUsed && msg.model) job.modelUsed = msg.model;
    if (msg.stopReason) job.stopReason = msg.stopReason;
    if (msg.errorMessage) job.errorMessage = msg.errorMessage;
  }

  /** Starts the periodic liveness/notification sweep if it is not already running. */
  private startPoller(): void {
    if (this.poller) return;
    this.poller = setInterval(() => {
      this.poll();
      this.checkNotifications();
    }, FanoutController.POLL_INTERVAL_MS);
  }

  /** Stops the periodic sweep started by the constructor. */
  stopPoller() {
    if (this.poller) {
      clearInterval(this.poller);
      this.poller = null;
    }
  }

  /**
   * Marks running jobs as failed when their process no longer exists. Jobs
   * whose child handle is still owned by this controller are skipped: their
   * `close` handler knows the real exit code, and marking them here could
   * report a successful job as failed before that handler runs.
   */
  private poll(): void {
    for (const job of this.jobs.values()) {
      if (job.status !== "running" || !job.pid) continue;
      if (this.liveJobIds.has(job.id)) continue;
      if (this.isProcessAlive(job.pid)) continue;
      job.status = "failed";
      job.exitCode = job.exitCode ?? 1;
      this.tryPersist(job);
    }
  }

  /** Human-readable outcome used in completion notifications. */
  private describeOutcome(job: FanoutJob): string {
    if (job.status === "done") return "completed";
    if (job.status === "aborted") return "was aborted";
    return `failed (exit ${job.exitCode ?? "?"})`;
  }

  /**
   * Sends one follow-up user message for every finished job that has not been
   * announced yet. The job is flagged as notified before sending, so a
   * delivery failure is not retried.
   */
  private checkNotifications(): void {
    const limit = FanoutController.NOTIFY_PREVIEW_CHARS;
    for (const job of this.jobs.values()) {
      if (job.status === "running" || job.status === "queued") continue;
      if (job.notified) continue;
      job.notified = true;
      this.tryPersist(job);

      const preview = this.getJobPreview(job, limit);
      const parts = [`Fanout job \`${job.id}\` (${job.agent}) ${this.describeOutcome(job)}.`];
      if (preview) {
        parts.push(`\n\`\`\`\n${preview}\n\`\`\``);
      }
      // getJobPreview only exceeds the limit when it appended its truncation marker.
      if (preview.length > limit) {
        parts.push("\n*(output truncated — run `fanout_collect` for full result)*");
      }
      try {
        this.pi.sendUserMessage(parts.join(""), { deliverAs: "followUp" });
      } catch {
        // If sendUserMessage fails (e.g. no active session), ignore.
      }
    }
  }

  /**
   * Summarizes jobs, including a short output preview.
   *
   * @param jobIds Ids to report; all known jobs when omitted or empty. Unknown ids are skipped.
   */
  status(jobIds?: string[]): StatusResult {
    const ids = jobIds?.length ? jobIds : Array.from(this.jobs.keys());
    const jobs: StatusResult["jobs"] = [];
    for (const id of ids) {
      const job = this.jobs.get(id);
      if (!job) continue;
      jobs.push({
        id: job.id,
        status: job.status,
        agent: job.agent,
        task: job.task,
        exitCode: job.exitCode,
        pid: job.pid,
        turns: job.usage.turns,
        cost: job.usage.cost,
        preview: this.getJobPreview(job, FanoutController.STATUS_PREVIEW_CHARS),
      });
    }
    return { jobs };
  }

  /**
   * Returns the full final output and usage of each requested job. Unknown
   * ids and jobs that are still queued/running yield a placeholder entry with
   * empty output and an explanatory `errorMessage`.
   */
  collect(jobIds: string[]): CollectResult {
    const results: CollectResult["results"] = [];
    for (const id of jobIds) {
      const job = this.jobs.get(id);
      if (!job) {
        results.push({
          id,
          status: "failed",
          output: "",
          usage: FanoutController.emptyUsage(),
          errorMessage: "Job not found.",
        });
        continue;
      }
      if (job.status === "running" || job.status === "queued") {
        results.push({
          id,
          status: job.status,
          output: "",
          usage: FanoutController.emptyUsage(),
          errorMessage: "Job still running.",
        });
        continue;
      }
      results.push({
        id,
        status: job.status,
        output: this.readJobOutput(job),
        exitCode: job.exitCode,
        usage: job.usage,
        modelUsed: job.modelUsed,
        errorMessage: job.errorMessage,
      });
    }
    return { results };
  }

  /**
   * Aborts queued or running jobs: sends SIGTERM, follows up with SIGKILL
   * after a grace period, and marks the job as aborted immediately.
   *
   * @returns The ids that were aborted, not found, or already finished.
   */
  abort(jobIds: string[]): AbortResult {
    const aborted: string[] = [];
    const notFound: string[] = [];
    const alreadyDone: string[] = [];
    for (const id of jobIds) {
      const job = this.jobs.get(id);
      if (!job) {
        notFound.push(id);
        continue;
      }
      if (job.status !== "running" && job.status !== "queued") {
        alreadyDone.push(id);
        continue;
      }
      if (job.pid) {
        const pid = job.pid;
        const spawnedHere = this.liveJobIds.has(id);
        try {
          process.kill(pid, "SIGTERM");
          setTimeout(() => {
            // If our own child has already closed, the pid may since have been
            // reused by an unrelated process; do not signal it.
            if (spawnedHere && !this.liveJobIds.has(id)) return;
            try { process.kill(pid, "SIGKILL"); } catch {}
          }, FanoutController.KILL_GRACE_MS);
        } catch {
          // The process is already gone or cannot be signalled.
        }
      }
      job.status = "aborted";
      this.persist(job);
      aborted.push(id);
    }
    return { aborted, notFound, alreadyDone };
  }

  /**
   * Deletes finished jobs (directory and in-memory record) that were created
   * more than `maxAgeMs` milliseconds ago. Queued and running jobs are kept.
   */
  cleanupOldJobs(maxAgeMs = 24 * 60 * 60 * 1000) {
    const now = Date.now();
    for (const [id, job] of this.jobs.entries()) {
      if (job.status === "running" || job.status === "queued") continue;
      if (now - job.createdAt <= maxAgeMs) continue;
      try {
        fs.rmSync(jobDir(this.dataDir, id), { recursive: true, force: true });
      } catch {
        // Best effort: still drop the in-memory record below.
      }
      this.jobs.delete(id);
    }
  }
}