470 lines
15 KiB
TypeScript
470 lines
15 KiB
TypeScript
import { spawn } from "node:child_process";
|
|
import * as fs from "node:fs";
|
|
import * as os from "node:os";
|
|
import * as path from "node:path";
|
|
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
|
|
import type { Message } from "@earendil-works/pi-ai";
|
|
import { discoverAgents, type AgentConfig, type AgentScope } from "./agents.js";
|
|
import type { AbortResult, CollectResult, DispatchResult, FanoutJob, JobUsage, StatusResult } from "./types.js";
|
|
|
|
/**
 * Works out which command line re-launches the `pi` CLI with `args`.
 *
 * Resolution order:
 * 1. If `process.argv[1]` names a script that exists on disk (and is not a Bun
 *    virtual `/$bunfs/root/` path), run that script with the current runtime.
 * 2. Otherwise, if the current executable is not a plain `node`/`bun` binary,
 *    run the current executable directly with `args`.
 * 3. Otherwise fall back to the `pi` command.
 *
 * @param args Arguments to pass to the CLI.
 * @returns The command to execute and its full argument list.
 */
function getPiInvocation(args: string[]): { command: string; args: string[] } {
  const currentScript = process.argv[1];
  const isBunVirtualScript = currentScript?.startsWith("/$bunfs/root/");
  if (currentScript && !isBunVirtualScript && fs.existsSync(currentScript)) {
    return { command: process.execPath, args: [currentScript, ...args] };
  }

  const execName = path.basename(process.execPath).toLowerCase();
  const isGenericRuntime = /^(node|bun)(\.exe)?$/.test(execName);
  if (!isGenericRuntime) {
    return { command: process.execPath, args };
  }

  return { command: "pi", args };
}
|
|
|
|
/**
 * Returns the text of the most recent assistant message that contains text.
 *
 * Messages are scanned from newest to oldest. Within an assistant message the
 * first part of type `"text"` wins; assistant messages without any text part
 * are skipped in favour of earlier ones.
 *
 * @param messages Conversation messages in chronological order.
 * @returns The selected text, or `""` when no assistant text exists.
 */
function getFinalOutput(messages: Message[]): string {
  for (let i = messages.length - 1; i >= 0; i--) {
    const msg = messages[i];
    if (msg.role !== "assistant") continue;
    for (const part of msg.content) {
      if (part.type === "text") return part.text;
    }
  }
  return "";
}
|
|
|
|
/**
 * Builds a job id of the form `<timestamp>-<random>`.
 *
 * The timestamp is `Date.now()` in base 36; the suffix is up to six base-36
 * characters taken from `Math.random()`. The suffix is not cryptographically
 * random and may be shorter than six characters.
 */
function generateId(): string {
  const timestamp = Date.now().toString(36);
  const suffix = Math.random().toString(36).slice(2, 8);
  return `${timestamp}-${suffix}`;
}
|
|
|
|
/**
 * Returns the directory that holds a job's files: `<dataDir>/jobs/<jobId>`.
 *
 * @param dataDir Root data directory.
 * @param jobId Job identifier, used as the directory name.
 */
function jobDir(dataDir: string, jobId: string): string {
  return path.join(dataDir, "jobs", jobId);
}
|
|
|
|
/**
 * Dispatches agent tasks as detached `pi` child processes ("fan-out jobs") and
 * tracks them.
 *
 * Each job owns a directory `<home>/.pi/fanout/jobs/<jobId>` containing
 * `meta.json` (the serialized job record) and `output.jsonl` (the child's
 * stdout, one JSON event per line). Job records found on disk are loaded at
 * construction time. A poller marks jobs whose process has disappeared as
 * failed and sends a one-time follow-up message through `pi.sendUserMessage`
 * when a job reaches a terminal state.
 */
export class FanoutController {
  /** Maximum characters of assistant text cached on a job for live previews. */
  private static readonly LIVE_PREVIEW_CHARS = 500;
  /** Maximum characters of job output embedded in a completion notification. */
  private static readonly NOTIFY_PREVIEW_CHARS = 2000;
  /** Upper bound on buffered stderr; only the first 500 characters are ever reported. */
  private static readonly MAX_STDERR_CHARS = 64 * 1024;
  /** Interval between liveness/notification sweeps. */
  private static readonly POLL_INTERVAL_MS = 2000;
  /** Delay between SIGTERM and the follow-up SIGKILL in `abort`. */
  private static readonly KILL_GRACE_MS = 5000;

  private pi: ExtensionAPI;
  private dataDir: string;
  private jobs = new Map<string, FanoutJob>();
  private poller: NodeJS.Timeout | null = null;
  private agents: AgentConfig[] = [];
  private agentScope: AgentScope = "user";
  private projectAgentsDir: string | null = null;
  private defaultCwd: string;
  /**
   * Ids of jobs whose child process was spawned by this instance and has not
   * emitted `close` yet. For these jobs the `close` handler is the single
   * source of truth for the final status.
   */
  private liveJobIds = new Set<string>();

  /**
   * @param pi Extension API used to deliver completion notifications.
   * @param defaultCwd Working directory used when `dispatch` is called without one.
   */
  constructor(pi: ExtensionAPI, defaultCwd: string) {
    this.pi = pi;
    this.defaultCwd = defaultCwd;
    this.dataDir = path.join(os.homedir(), ".pi", "fanout");
    fs.mkdirSync(path.join(this.dataDir, "jobs"), { recursive: true });
    this.rehydrate();
    this.startPoller();
  }

  /**
   * Reports whether a process with the given pid exists.
   *
   * `process.kill(pid, 0)` performs the existence check without delivering a
   * signal. `EPERM` means the process exists but may not be signalled by us,
   * so it counts as alive.
   */
  private isProcessAlive(pid: number): boolean {
    try {
      process.kill(pid, 0);
      return true;
    } catch (err: unknown) {
      return (err as NodeJS.ErrnoException | null)?.code === "EPERM";
    }
  }

  /**
   * Loads job records from `<dataDir>/jobs/<id>/meta.json` into memory.
   *
   * Jobs recorded as running whose pid no longer exists are marked failed.
   * Unreadable or malformed records are skipped.
   */
  private rehydrate() {
    const jobsBase = path.join(this.dataDir, "jobs");
    if (!fs.existsSync(jobsBase)) return;

    for (const entry of fs.readdirSync(jobsBase, { withFileTypes: true })) {
      if (!entry.isDirectory()) continue;
      const metaPath = path.join(jobsBase, entry.name, "meta.json");
      if (!fs.existsSync(metaPath)) continue;
      try {
        const job = JSON.parse(fs.readFileSync(metaPath, "utf-8")) as FanoutJob;
        // `persist` always writes a job under a directory named after its id.
        // Reject anything else: the id is later used to build paths that are
        // written to and recursively deleted.
        if (typeof job?.id !== "string" || job.id !== entry.name) continue;
        if (job.status === "running" && job.pid && !this.isProcessAlive(job.pid)) {
          job.status = "failed";
          job.exitCode = job.exitCode ?? 1;
        }
        this.jobs.set(job.id, job);
      } catch {
        // ignore corrupt meta
      }
    }
  }

  /** Writes the job record to `meta.json` in the job's directory, creating the directory if needed. */
  private persist(job: FanoutJob) {
    const dir = jobDir(this.dataDir, job.id);
    fs.mkdirSync(dir, { recursive: true });
    fs.writeFileSync(path.join(dir, "meta.json"), JSON.stringify(job, null, 2));
  }

  /**
   * Refreshes the list of known agents for the given directory and scope.
   *
   * @param cwd Directory passed to agent discovery.
   * @param scope Which agent scope to search.
   */
  discoverAgents(cwd: string, scope: AgentScope) {
    const result = discoverAgents(cwd, scope);
    this.agents = result.agents;
    this.agentScope = scope;
    this.projectAgentsDir = result.projectAgentsDir;
  }

  /**
   * Creates a job for the named agent and starts its child process.
   *
   * @param agentName Name of a previously discovered agent.
   * @param task Task text handed to the agent.
   * @param cwd Working directory for the child; defaults to the controller's default.
   * @param model Optional model override; falls back to the agent's model.
   * @param tools Optional tool list override; falls back to the agent's tools.
   * @param agentScope When given, agents are re-discovered with this scope first.
   * @returns The new job id and a human-readable message.
   * @throws Error when no agent with `agentName` is known.
   */
  dispatch(
    agentName: string,
    task: string,
    cwd: string | undefined,
    model?: string,
    tools?: string[],
    agentScope?: AgentScope,
  ): DispatchResult {
    if (agentScope) this.discoverAgents(cwd ?? this.defaultCwd, agentScope);

    const agent = this.agents.find((a) => a.name === agentName);
    if (!agent) {
      const available = this.agents.map((a) => `"${a.name}"`).join(", ") || "none";
      throw new Error(`Unknown agent: "${agentName}". Available: ${available}.`);
    }

    const jobId = generateId();
    const dir = jobDir(this.dataDir, jobId);
    fs.mkdirSync(dir, { recursive: true });

    const job: FanoutJob = {
      id: jobId,
      createdAt: Date.now(),
      agent: agentName,
      agentSource: agent.source,
      task,
      cwd: cwd ?? this.defaultCwd,
      model: model || agent.model,
      tools: tools || agent.tools,
      status: "queued",
      outputFile: path.join(dir, "output.jsonl"),
      metaFile: path.join(dir, "meta.json"),
      notified: false,
      usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 },
    };

    this.jobs.set(jobId, job);
    this.persist(job);
    this.spawnJob(job, agent);

    return { jobId, status: "running", message: `Dispatched job ${jobId} to ${agentName}` };
  }

  /**
   * Starts the detached child process for a job and wires up its output.
   *
   * Every non-blank stdout line is appended to the job's output file and fed
   * to `processOutputLine`. On `close` the final status, exit code and a
   * stderr excerpt are recorded and the temporary system-prompt file is removed.
   */
  private spawnJob(job: FanoutJob, agent: AgentConfig) {
    const args: string[] = ["--mode", "json", "-p", "--no-session"];
    if (job.model) args.push("--model", job.model);
    if (job.tools && job.tools.length > 0) args.push("--tools", job.tools.join(","));

    // The agent's system prompt is handed over through a private temp file.
    let tmpPromptPath: string | null = null;
    if (agent.systemPrompt.trim()) {
      const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "pi-fanout-"));
      const safeName = agent.name.replace(/[^\w.-]+/g, "_");
      tmpPromptPath = path.join(tmpDir, `prompt-${safeName}.md`);
      fs.writeFileSync(tmpPromptPath, agent.systemPrompt, { encoding: "utf-8", mode: 0o600 });
      args.push("--append-system-prompt", tmpPromptPath);
    }
    const promptPath = tmpPromptPath;

    args.push(`Task: ${job.task}`);

    const invocation = getPiInvocation(args);
    const proc = spawn(invocation.command, invocation.args, {
      cwd: job.cwd,
      detached: true,
      shell: false,
      stdio: ["ignore", "pipe", "pipe"],
    });

    job.pid = proc.pid;
    job.status = "running";
    this.persist(job);

    let outputFd: number | null = fs.openSync(job.outputFile, "w");
    const appendOutputLine = (line: string) => {
      if (outputFd !== null) fs.writeSync(outputFd, line + "\n");
    };
    // Both `error` and `close` may fire. Close the descriptor exactly once so
    // a number that the OS has already reused is never closed by mistake.
    const closeOutput = () => {
      if (outputFd === null) return;
      const fd = outputFd;
      outputFd = null;
      try {
        fs.closeSync(fd);
      } catch {
        // best effort
      }
    };

    let stdoutBuffer = "";
    let stderrBuffer = "";

    // From here on the `close` handler decides the job's final status.
    this.liveJobIds.add(job.id);

    // Decode at the stream level so multi-byte UTF-8 characters that straddle
    // chunk boundaries are not corrupted.
    proc.stdout.setEncoding("utf8");
    proc.stderr.setEncoding("utf8");

    proc.stdout.on("data", (chunk: string) => {
      stdoutBuffer += chunk;
      const lines = stdoutBuffer.split("\n");
      // The last element is an incomplete line (or ""); keep it for the next chunk.
      stdoutBuffer = lines.pop() ?? "";
      for (const line of lines) {
        if (!line.trim()) continue;
        appendOutputLine(line);
        this.processOutputLine(job, line);
      }
    });

    proc.stderr.on("data", (chunk: string) => {
      // Only a short excerpt is reported, so there is no need to keep everything.
      if (stderrBuffer.length < FanoutController.MAX_STDERR_CHARS) stderrBuffer += chunk;
    });

    proc.on("close", (code) => {
      this.liveJobIds.delete(job.id);

      // Flush a final line that was not newline-terminated.
      if (stdoutBuffer.trim()) {
        appendOutputLine(stdoutBuffer);
        this.processOutputLine(job, stdoutBuffer);
      }
      closeOutput();

      if (job.status !== "aborted") {
        // `code` is null when the child was terminated by a signal; record a
        // non-zero exit code so it agrees with the "failed" status.
        job.exitCode = code ?? 1;
        job.status = code === 0 ? "done" : "failed";
      }
      if (stderrBuffer.trim()) {
        job.errorMessage = stderrBuffer.trim().slice(0, 500);
      }
      this.persist(job);

      if (promptPath) {
        try {
          fs.unlinkSync(promptPath);
        } catch {
          // best effort
        }
        try {
          fs.rmdirSync(path.dirname(promptPath));
        } catch {
          // best effort
        }
      }
    });

    proc.on("error", (err) => {
      this.liveJobIds.delete(job.id);
      closeOutput();
      if (job.status !== "aborted") {
        job.status = "failed";
        job.exitCode = 1;
        job.errorMessage = job.errorMessage || `Failed to spawn process: ${err.message}`;
      }
      this.persist(job);
    });

    proc.unref();
  }

  /**
   * Reads the job's output file and returns its final assistant text.
   *
   * Lines that are not valid JSON are skipped. Returns `""` when the file is
   * missing or unreadable.
   */
  private readJobOutput(job: FanoutJob): string {
    if (!fs.existsSync(job.outputFile)) return "";
    try {
      const lines = fs.readFileSync(job.outputFile, "utf-8").split("\n").filter(Boolean);
      const messages: Message[] = [];
      for (const line of lines) {
        try {
          const event = JSON.parse(line);
          if (!event || !event.message) continue;
          if (event.type === "message_end" || event.type === "tool_result_end") {
            messages.push(event.message as Message);
          }
        } catch {
          // skip malformed line
        }
      }
      return getFinalOutput(messages);
    } catch {
      return "";
    }
  }

  /**
   * Returns a short preview of the job's output, at most `maxChars` characters
   * plus a truncation marker.
   */
  private getJobPreview(job: FanoutJob, maxChars = 500): string {
    // Use cached preview if available (updated in processOutputLine)
    const output = job.lastPreview || this.readJobOutput(job);
    if (!output) return "";
    if (output.length <= maxChars) return output;
    return output.slice(0, maxChars) + "\n… (truncated)";
  }

  /**
   * Caches the first text part of an assistant message as the job's live preview.
   *
   * @returns `true` when a text part was found and cached.
   */
  private cachePreview(job: FanoutJob, msg: Message): boolean {
    if (msg.role !== "assistant" || !Array.isArray(msg.content)) return false;
    const limit = FanoutController.LIVE_PREVIEW_CHARS;
    for (const part of msg.content) {
      if (part.type === "text") {
        job.lastPreview = part.text.length > limit ? part.text.slice(0, limit) + "…" : part.text;
        return true;
      }
    }
    return false;
  }

  /**
   * Folds one stdout line from the child into the job record.
   *
   * `message_end` events from the assistant update the preview, turn count,
   * token usage, model, stop reason and error message. `tool_result_end`
   * events only update the preview. Anything else is ignored.
   */
  private processOutputLine(job: FanoutJob, line: string) {
    let parsed: unknown;
    try {
      parsed = JSON.parse(line);
    } catch {
      return;
    }
    // Valid JSON need not be an object (`null`, numbers, strings, ...).
    if (typeof parsed !== "object" || parsed === null) return;

    const event = parsed as { type?: unknown; message?: Message };
    if (!event.message) return;
    const msg = event.message;
    if (msg.role !== "assistant") return;

    if (event.type === "message_end") {
      // Cache latest assistant text for live preview
      this.cachePreview(job, msg);

      job.usage.turns++;
      const usage = msg.usage;
      if (usage) {
        job.usage.input += usage.input || 0;
        job.usage.output += usage.output || 0;
        job.usage.cacheRead += usage.cacheRead || 0;
        job.usage.cacheWrite += usage.cacheWrite || 0;
        job.usage.cost += usage.cost?.total || 0;
      }
      if (!job.modelUsed && msg.model) job.modelUsed = msg.model;
      if (msg.stopReason) job.stopReason = msg.stopReason;
      if (msg.errorMessage) job.errorMessage = msg.errorMessage;

      this.persist(job);
      return;
    }

    // Also cache assistant text from tool result events
    if (event.type === "tool_result_end" && this.cachePreview(job, msg)) {
      this.persist(job);
    }
  }

  /** Starts the periodic liveness/notification sweep if it is not already running. */
  private startPoller() {
    if (this.poller) return;
    this.poller = setInterval(() => {
      this.poll();
      this.checkNotifications();
    }, FanoutController.POLL_INTERVAL_MS);
  }

  /** Stops the periodic sweep started by the constructor. */
  stopPoller() {
    if (this.poller) {
      clearInterval(this.poller);
      this.poller = null;
    }
  }

  /**
   * Marks running jobs whose process no longer exists as failed.
   *
   * Jobs spawned by this instance are skipped while their child handle is
   * still open: between process exit and the `close` event the pid is already
   * gone, and failing the job there would trigger a "failed" notification for
   * a job that is about to be recorded as done. Trade-off: if something keeps
   * the child's stdout pipe open, such a job stays "running" until it closes.
   */
  private poll() {
    for (const job of this.jobs.values()) {
      if (job.status !== "running" || !job.pid) continue;
      if (this.liveJobIds.has(job.id)) continue;
      if (this.isProcessAlive(job.pid)) continue;
      job.status = "failed";
      job.exitCode = job.exitCode ?? 1;
      this.persist(job);
    }
  }

  /**
   * Sends one follow-up message per job that has reached a terminal state.
   *
   * The message embeds up to `NOTIFY_PREVIEW_CHARS` characters of the job's
   * final output and says so when the output was cut.
   */
  private checkNotifications() {
    const limit = FanoutController.NOTIFY_PREVIEW_CHARS;
    for (const job of this.jobs.values()) {
      if (job.status === "running" || job.status === "queued") continue;
      if (job.notified) continue;
      job.notified = true;
      this.persist(job);

      let outcome: string;
      if (job.status === "done") outcome = "completed";
      else if (job.status === "aborted") outcome = "aborted";
      else outcome = `failed (exit ${job.exitCode ?? "?"})`;

      // Read the full final output rather than the 500-character live-preview
      // cache, so the limit and the truncation hint below are meaningful.
      const fullOutput = this.readJobOutput(job) || job.lastPreview || "";
      const truncated = fullOutput.length > limit;
      const preview = truncated ? fullOutput.slice(0, limit) + "\n… (truncated)" : fullOutput;

      try {
        const parts = [`Fanout job \`${job.id}\` (${job.agent}) ${outcome}.`];
        if (preview) {
          parts.push(`\n\`\`\`\n${preview}\n\`\`\``);
        }
        if (truncated) {
          parts.push("\n*(output truncated — run `fanout_collect` for full result)*");
        }
        this.pi.sendUserMessage(parts.join(""), { deliverAs: "followUp" });
      } catch {
        // If sendUserMessage fails (e.g. no active session), ignore.
      }
    }
  }

  /**
   * Summarises jobs for display.
   *
   * @param jobIds Jobs to report; all known jobs when omitted or empty. Unknown ids are skipped.
   */
  status(jobIds?: string[]): StatusResult {
    const ids = jobIds?.length ? jobIds : Array.from(this.jobs.keys());
    const jobs: StatusResult["jobs"] = [];
    for (const id of ids) {
      const job = this.jobs.get(id);
      if (!job) continue;
      jobs.push({
        id: job.id,
        status: job.status,
        agent: job.agent,
        task: job.task,
        exitCode: job.exitCode,
        pid: job.pid,
        turns: job.usage.turns,
        cost: job.usage.cost,
        preview: this.getJobPreview(job, 300),
      });
    }
    return { jobs };
  }

  /**
   * Returns the final output and usage of the given jobs.
   *
   * Unknown ids yield a "failed" entry with "Job not found."; jobs that have
   * not finished yield their current status with "Job still running.".
   */
  collect(jobIds: string[]): CollectResult {
    const results: CollectResult["results"] = [];
    for (const id of jobIds) {
      const job = this.jobs.get(id);
      if (!job) {
        results.push({
          id,
          status: "failed",
          output: "",
          usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 },
          errorMessage: "Job not found.",
        });
        continue;
      }
      if (job.status === "running" || job.status === "queued") {
        results.push({
          id,
          status: job.status,
          output: "",
          usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 },
          errorMessage: "Job still running.",
        });
        continue;
      }

      results.push({
        id,
        status: job.status,
        output: this.readJobOutput(job),
        exitCode: job.exitCode,
        usage: job.usage,
        modelUsed: job.modelUsed,
        errorMessage: job.errorMessage,
      });
    }
    return { results };
  }

  /**
   * Aborts running or queued jobs.
   *
   * Sends SIGTERM to the job's pid, followed by SIGKILL after a grace period
   * if the job's child has not closed by then. The job is marked "aborted"
   * even when signalling fails.
   *
   * @returns Ids grouped into aborted, unknown, and already finished.
   */
  abort(jobIds: string[]): AbortResult {
    const aborted: string[] = [];
    const notFound: string[] = [];
    const alreadyDone: string[] = [];

    for (const id of jobIds) {
      const job = this.jobs.get(id);
      if (!job) {
        notFound.push(id);
        continue;
      }
      if (job.status !== "running" && job.status !== "queued") {
        alreadyDone.push(id);
        continue;
      }

      if (job.pid) {
        const pid = job.pid;
        const ownedChild = this.liveJobIds.has(job.id);
        try {
          process.kill(pid, "SIGTERM");
          setTimeout(() => {
            // If our own child has already closed, the pid may by now belong
            // to an unrelated process; do not signal it.
            if (ownedChild && !this.liveJobIds.has(job.id)) return;
            try {
              process.kill(pid, "SIGKILL");
            } catch {
              // already gone
            }
          }, FanoutController.KILL_GRACE_MS);
        } catch {
          // process already gone or not signalable
        }
      }

      job.status = "aborted";
      this.persist(job);
      aborted.push(id);
    }
    return { aborted, notFound, alreadyDone };
  }

  /**
   * Deletes finished jobs (and their directories) older than `maxAgeMs`.
   *
   * @param maxAgeMs Maximum age measured from job creation; defaults to 24 hours.
   */
  cleanupOldJobs(maxAgeMs = 24 * 60 * 60 * 1000) {
    const now = Date.now();
    for (const [id, job] of this.jobs.entries()) {
      if (job.status === "running" || job.status === "queued") continue;
      if (now - job.createdAt <= maxAgeMs) continue;
      try {
        fs.rmSync(jobDir(this.dataDir, id), { recursive: true, force: true });
      } catch {
        // best effort
      }
      this.jobs.delete(id);
    }
  }
}
|