feat: non-blocking async agent fanout extension

This commit is contained in:
jay 2026-05-15 05:01:14 +02:00
commit 341e238669
7 changed files with 931 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
node_modules/
*.log
.DS_Store

129
README.md Normal file
View File

@ -0,0 +1,129 @@
# pi-fanout
Non-blocking async agent fanout for [pi](https://pi.earendil.dev).
## Problem
The built-in `subagent` tool is powerful, but its `execute()` blocks until **all**
dispatched agents finish. While parallel tasks run concurrently internally, the main
pi session is frozen waiting for the final result.
## Solution
`pi-fanout` turns subagent dispatch into a **true async job queue**:
- `fanout_dispatch` — returns immediately with job IDs; agents run as detached processes
- `fanout_status` — poll running/done/failed counts at any time
- `fanout_collect` — retrieve final output from completed jobs
- `fanout_abort` — kill running jobs on demand
The main pi session stays unblocked. You can do other work, dispatch more jobs,
and collect results whenever theyre ready. When a background job finishes, the
extension sends a `followUp` message into the session so you know its time to
collect.
## Architecture
```
pi main process
├─ pi-fanout extension
│ ├─ JobManager (in-memory + disk state in ~/.pi/fanout/jobs/)
│ └─ Poller (every 2s: check PIDs, notify on completion)
├─ detached pi child (Agent A) ── writes output to job dir
├─ detached pi child (Agent B) ── writes output to job dir
└─ detached pi child (Agent C) ── writes output to job dir
```
Jobs survive pi restarts because:
1. Child processes are **detached** from the parent
2. State is persisted as `meta.json` + `output.jsonl` per job
3. On startup the extension rehydrates old jobs and checks if their PIDs are still alive
## Install
```bash
pi use git:git.vpsj.de/jay/pi-fanout
```
Or clone into your extensions directory and add it to `~/.pi/extensions.json`.
## Usage
```
> fanout_dispatch tasks=[{agent:"worker", task:"Refactor auth.ts"}, {agent:"reviewer", task:"Review auth.ts"}]
Dispatched 2 job(s). IDs:
ltv123-abc
ltv124-def
> ... do other work in the main session ...
> fanout_status
Jobs: 2 total — 0 running, 0 queued, 2 done, 0 failed/aborted
> fanout_collect jobIds=["ltv123-abc","ltv124-def"]
[ltv123-abc] done (exit 0) [claude-sonnet-4]
Refactored auth.ts to use bearer tokens...
---
[ltv124-def] done (exit 0) [claude-sonnet-4]
The refactored auth.ts looks solid. One suggestion: ...
```
## Tools
### `fanout_dispatch`
Parameters:
- `tasks`: array of `{ agent, task, cwd?, model?, tools? }`
- `agentScope`: `"user" | "project" | "both"` (default `"user"`)
Returns: `{ dispatched: string[] }` — job IDs.
### `fanout_status`
Parameters:
- `jobIds?`: filter to specific IDs (omit for all)
- `includeDone?`: include finished jobs in listing (default `true`)
Returns per-job: `id`, `status`, `agent`, `task`, `exitCode`, `pid`, `turns`, `cost`.
### `fanout_collect`
Parameters:
- `jobIds`: array of IDs to collect
Returns per-job: `id`, `status`, `output` (final assistant text), `exitCode`, `usage`, `modelUsed`, `errorMessage`.
### `fanout_abort`
Parameters:
- `jobIds`: array of IDs to kill
Sends `SIGTERM`, then `SIGKILL` after 5s if still running.
## How it works with the agent loop
1. The LLM calls `fanout_dispatch`.
2. The tool returns **in <50ms** with job IDs.
3. The LLM is free to continue the conversation, run other tools, or ask the user.
4. Every 2 seconds the extension polls job PIDs.
5. When a job transitions to `done`/`failed`, the extension calls:
```ts
pi.sendUserMessage(`Fanout job X completed ...`, { deliverAs: "followUp" })
```
This injects a user-style message that triggers a follow-up turn when the agent is idle.
6. The LLM sees the notification and calls `fanout_collect` to retrieve outputs.
7. The LLM acts on the collected results (e.g. synthesize a final answer, dispatch follow-up jobs).
## Limitations / Roadmap
- **No true server push into a running turn**: If pi is mid-stream when a job finishes, the notification is queued as `followUp` and processed when the current turn ends.
- **No job result streaming** into the main session yet. Jobs are collected atomically after completion.
- **No automatic `fanout_collect`**: The LLM must explicitly call it. Future versions could auto-inject a tool-call hint.
- Jobs older than 24h are auto-cleaned on startup.
## License
MIT

96
agents.ts Normal file
View File

@ -0,0 +1,96 @@
import * as fs from "node:fs";
import * as path from "node:path";
export type AgentScope = "user" | "project" | "both";
export interface AgentConfig {
name: string;
description: string;
tools?: string[];
model?: string;
systemPrompt: string;
source: "user" | "project";
filePath: string;
}
export interface AgentDiscoveryResult {
agents: AgentConfig[];
projectAgentsDir: string | null;
}
function getAgentDir(): string {
const envDir = process.env.PI_AGENT_DIR;
if (envDir) return envDir;
return path.join(process.env.HOME ?? process.env.USERPROFILE ?? "/tmp", ".pi", "agent");
}
function parseFrontmatter<T>(content: string): { frontmatter: T; body: string } {
const match = content.match(/^---\r?\n([\s\S]*?)\r?\n---\r?\n([\s\S]*)$/);
if (!match) return { frontmatter: {} as T, body: content };
const lines = match[1].split(/\r?\n/);
const frontmatter: Record<string, string> = {};
for (const line of lines) {
const idx = line.indexOf(":");
if (idx > 0) frontmatter[line.slice(0, idx).trim()] = line.slice(idx + 1).trim();
}
return { frontmatter: frontmatter as T, body: match[2] };
}
function loadAgentsFromDir(dir: string, source: "user" | "project"): AgentConfig[] {
const agents: AgentConfig[] = [];
if (!fs.existsSync(dir)) return agents;
let entries: fs.Dirent[];
try { entries = fs.readdirSync(dir, { withFileTypes: true }); } catch { return agents; }
for (const entry of entries) {
if (!entry.name.endsWith(".md")) continue;
if (!entry.isFile() && !entry.isSymbolicLink()) continue;
const filePath = path.join(dir, entry.name);
let content: string;
try { content = fs.readFileSync(filePath, "utf-8"); } catch { continue; }
const { frontmatter, body } = parseFrontmatter<Record<string, string>>(content);
if (!frontmatter.name || !frontmatter.description) continue;
const tools = frontmatter.tools?.split(",").map((t: string) => t.trim()).filter(Boolean);
agents.push({
name: frontmatter.name,
description: frontmatter.description,
tools: tools && tools.length > 0 ? tools : undefined,
model: frontmatter.model,
systemPrompt: body,
source,
filePath,
});
}
return agents;
}
function isDirectory(p: string): boolean {
try { return fs.statSync(p).isDirectory(); } catch { return false; }
}
function findNearestProjectAgentsDir(cwd: string): string | null {
let currentDir = cwd;
while (true) {
const candidate = path.join(currentDir, ".pi", "agents");
if (isDirectory(candidate)) return candidate;
const parentDir = path.dirname(currentDir);
if (parentDir === currentDir) return null;
currentDir = parentDir;
}
}
export function discoverAgents(cwd: string, scope: AgentScope): AgentDiscoveryResult {
const userDir = path.join(getAgentDir(), "agents");
const projectAgentsDir = findNearestProjectAgentsDir(cwd);
const userAgents = scope === "project" ? [] : loadAgentsFromDir(userDir, "user");
const projectAgents = scope === "user" || !projectAgentsDir ? [] : loadAgentsFromDir(projectAgentsDir, "project");
const agentMap = new Map<string, AgentConfig>();
if (scope === "both") {
for (const agent of userAgents) agentMap.set(agent.name, agent);
for (const agent of projectAgents) agentMap.set(agent.name, agent);
} else if (scope === "user") {
for (const agent of userAgents) agentMap.set(agent.name, agent);
} else {
for (const agent of projectAgents) agentMap.set(agent.name, agent);
}
return { agents: Array.from(agentMap.values()), projectAgentsDir };
}

410
controller.ts Normal file
View File

@ -0,0 +1,410 @@
import { spawn } from "node:child_process";
import * as fs from "node:fs";
import * as os from "node:os";
import * as path from "node:path";
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
import type { Message } from "@earendil-works/pi-ai";
import { discoverAgents, type AgentConfig, type AgentScope } from "./agents.js";
import type { AbortResult, CollectResult, DispatchResult, FanoutJob, JobUsage, StatusResult } from "./types.js";
function getPiInvocation(args: string[]): { command: string; args: string[] } {
const currentScript = process.argv[1];
const isBunVirtualScript = currentScript?.startsWith("/$bunfs/root/");
if (currentScript && !isBunVirtualScript && fs.existsSync(currentScript)) {
return { command: process.execPath, args: [currentScript, ...args] };
}
const execName = path.basename(process.execPath).toLowerCase();
const isGenericRuntime = /^(node|bun)(\.exe)?$/.test(execName);
if (!isGenericRuntime) {
return { command: process.execPath, args };
}
return { command: "pi", args };
}
function getFinalOutput(messages: Message[]): string {
for (let i = messages.length - 1; i >= 0; i--) {
const msg = messages[i];
if (msg.role === "assistant") {
for (const part of msg.content) {
if (part.type === "text") return part.text;
}
}
}
return "";
}
function generateId(): string {
return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`;
}
function jobDir(dataDir: string, jobId: string): string {
return path.join(dataDir, "jobs", jobId);
}
export class FanoutController {
private pi: ExtensionAPI;
private dataDir: string;
private jobs = new Map<string, FanoutJob>();
private poller: NodeJS.Timeout | null = null;
private agents: AgentConfig[] = [];
private agentScope: AgentScope = "user";
private projectAgentsDir: string | null = null;
private defaultCwd: string;
constructor(pi: ExtensionAPI, defaultCwd: string) {
this.pi = pi;
this.defaultCwd = defaultCwd;
this.dataDir = path.join(os.homedir(), ".pi", "fanout");
fs.mkdirSync(path.join(this.dataDir, "jobs"), { recursive: true });
this.rehydrate();
this.startPoller();
}
private rehydrate() {
const jobsBase = path.join(this.dataDir, "jobs");
if (!fs.existsSync(jobsBase)) return;
const dirs = fs.readdirSync(jobsBase, { withFileTypes: true });
for (const dir of dirs) {
if (!dir.isDirectory()) continue;
const metaPath = path.join(jobsBase, dir.name, "meta.json");
if (!fs.existsSync(metaPath)) continue;
try {
const raw = fs.readFileSync(metaPath, "utf-8");
const job = JSON.parse(raw) as FanoutJob;
if (job.status === "running" && job.pid) {
try {
process.kill(job.pid, 0);
} catch {
job.status = "failed";
job.exitCode = job.exitCode ?? 1;
}
}
this.jobs.set(job.id, job);
} catch {
// ignore corrupt meta
}
}
}
private persist(job: FanoutJob) {
const dir = jobDir(this.dataDir, job.id);
fs.mkdirSync(dir, { recursive: true });
fs.writeFileSync(path.join(dir, "meta.json"), JSON.stringify(job, null, 2));
}
discoverAgents(cwd: string, scope: AgentScope) {
const result = discoverAgents(cwd, scope);
this.agents = result.agents;
this.agentScope = scope;
this.projectAgentsDir = result.projectAgentsDir;
}
dispatch(
agentName: string,
task: string,
cwd: string | undefined,
model?: string,
tools?: string[],
agentScope?: AgentScope,
): DispatchResult {
if (agentScope) this.discoverAgents(cwd ?? this.defaultCwd, agentScope);
const agent = this.agents.find((a) => a.name === agentName);
if (!agent) {
const available = this.agents.map((a) => `"${a.name}"`).join(", ") || "none";
throw new Error(`Unknown agent: "${agentName}". Available: ${available}.`);
}
const jobId = generateId();
const dir = jobDir(this.dataDir, jobId);
fs.mkdirSync(dir, { recursive: true });
const job: FanoutJob = {
id: jobId,
createdAt: Date.now(),
agent: agentName,
agentSource: agent.source,
task,
cwd: cwd ?? this.defaultCwd,
model: model || agent.model,
tools: tools || agent.tools,
status: "queued",
outputFile: path.join(dir, "output.jsonl"),
metaFile: path.join(dir, "meta.json"),
notified: false,
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 },
};
this.jobs.set(jobId, job);
this.persist(job);
this.spawnJob(job, agent);
return { jobId, status: "running", message: `Dispatched job ${jobId} to ${agentName}` };
}
private spawnJob(job: FanoutJob, agent: AgentConfig) {
const args: string[] = ["--mode", "json", "-p", "--no-session"];
if (job.model) args.push("--model", job.model);
if (job.tools && job.tools.length > 0) args.push("--tools", job.tools.join(","));
let tmpPromptPath: string | null = null;
if (agent.systemPrompt.trim()) {
const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "pi-fanout-"));
const safeName = agent.name.replace(/[^\w.-]+/g, "_");
tmpPromptPath = path.join(tmpDir, `prompt-${safeName}.md`);
fs.writeFileSync(tmpPromptPath, agent.systemPrompt, { encoding: "utf-8", mode: 0o600 });
args.push("--append-system-prompt", tmpPromptPath);
}
args.push(`Task: ${job.task}`);
const invocation = getPiInvocation(args);
const proc = spawn(invocation.command, invocation.args, {
cwd: job.cwd,
detached: true,
shell: false,
stdio: ["ignore", "pipe", "pipe"],
});
job.pid = proc.pid;
job.status = "running";
this.persist(job);
const outputFile = fs.openSync(job.outputFile, "w");
let stdoutBuffer = "";
let stderrBuffer = "";
proc.stdout.on("data", (data: Buffer) => {
stdoutBuffer += data.toString();
const lines = stdoutBuffer.split("\n");
stdoutBuffer = lines.pop() ?? "";
for (const line of lines) {
if (!line.trim()) continue;
fs.writeSync(outputFile, line + "\n");
this.processOutputLine(job, line);
}
});
proc.stderr.on("data", (data: Buffer) => {
stderrBuffer += data.toString();
});
proc.on("close", (code) => {
if (stdoutBuffer.trim()) {
fs.writeSync(outputFile, stdoutBuffer + "\n");
this.processOutputLine(job, stdoutBuffer);
}
try { fs.closeSync(outputFile); } catch {}
if (job.status !== "aborted") {
job.exitCode = code ?? 0;
job.status = code === 0 ? "done" : "failed";
}
if (stderrBuffer.trim()) {
job.errorMessage = stderrBuffer.trim().slice(0, 500);
}
this.persist(job);
if (tmpPromptPath) {
try { fs.unlinkSync(tmpPromptPath); } catch {}
try { fs.rmdirSync(path.dirname(tmpPromptPath)); } catch {}
}
});
proc.on("error", () => {
try { fs.closeSync(outputFile); } catch {}
if (job.status !== "aborted") {
job.status = "failed";
job.exitCode = 1;
job.errorMessage = job.errorMessage || "Failed to spawn process.";
}
this.persist(job);
});
proc.unref();
}
private processOutputLine(job: FanoutJob, line: string) {
let event: any;
try { event = JSON.parse(line); } catch { return; }
if (event.type === "message_end" && event.message) {
const msg = event.message as Message;
if (msg.role === "assistant") {
job.usage.turns++;
const usage = msg.usage;
if (usage) {
job.usage.input += usage.input || 0;
job.usage.output += usage.output || 0;
job.usage.cacheRead += usage.cacheRead || 0;
job.usage.cacheWrite += usage.cacheWrite || 0;
job.usage.cost += usage.cost?.total || 0;
}
if (!job.modelUsed && msg.model) job.modelUsed = msg.model;
if (msg.stopReason) job.stopReason = msg.stopReason;
if (msg.errorMessage) job.errorMessage = msg.errorMessage;
}
}
}
private startPoller() {
if (this.poller) return;
this.poller = setInterval(() => {
this.poll();
this.checkNotifications();
}, 2000);
}
stopPoller() {
if (this.poller) {
clearInterval(this.poller);
this.poller = null;
}
}
private poll() {
for (const job of this.jobs.values()) {
if (job.status !== "running" || !job.pid) continue;
try {
process.kill(job.pid, 0);
} catch {
// Process is gone but we didn't get close event yet (race). Mark failed if not already updated.
if (job.status === "running") {
job.status = "failed";
job.exitCode = job.exitCode ?? 1;
this.persist(job);
}
}
}
}
private checkNotifications() {
for (const job of this.jobs.values()) {
if (job.status === "running" || job.status === "queued") continue;
if (job.notified) continue;
job.notified = true;
this.persist(job);
const outcome = job.status === "done" ? "completed" : `failed (exit ${job.exitCode ?? "?"})`;
try {
this.pi.sendUserMessage(
`Fanout job \`${job.id}\` (${job.agent}) ${outcome}. Use \`fanout_collect\` to retrieve results.`,
{ deliverAs: "followUp" },
);
} catch {
// If sendUserMessage fails (e.g. no active session), ignore.
}
}
}
status(jobIds?: string[]): StatusResult {
const ids = jobIds?.length ? jobIds : Array.from(this.jobs.keys());
const jobs: StatusResult["jobs"] = [];
for (const id of ids) {
const job = this.jobs.get(id);
if (!job) continue;
jobs.push({
id: job.id,
status: job.status,
agent: job.agent,
task: job.task,
exitCode: job.exitCode,
pid: job.pid,
turns: job.usage.turns,
cost: job.usage.cost,
});
}
return { jobs };
}
collect(jobIds: string[]): CollectResult {
const results: CollectResult["results"] = [];
for (const id of jobIds) {
const job = this.jobs.get(id);
if (!job) {
results.push({
id,
status: "failed",
output: "",
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 },
errorMessage: "Job not found.",
});
continue;
}
if (job.status === "running" || job.status === "queued") {
results.push({
id,
status: job.status,
output: "",
usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 },
errorMessage: "Job still running.",
});
continue;
}
let output = "";
if (fs.existsSync(job.outputFile)) {
try {
const lines = fs.readFileSync(job.outputFile, "utf-8").split("\n").filter(Boolean);
const messages: Message[] = [];
for (const line of lines) {
try {
const event = JSON.parse(line);
if (event.type === "message_end" && event.message) messages.push(event.message as Message);
if (event.type === "tool_result_end" && event.message) messages.push(event.message as Message);
} catch {}
}
output = getFinalOutput(messages);
} catch {}
}
results.push({
id,
status: job.status,
output,
exitCode: job.exitCode,
usage: job.usage,
modelUsed: job.modelUsed,
errorMessage: job.errorMessage,
});
}
return { results };
}
abort(jobIds: string[]): AbortResult {
const aborted: string[] = [];
const notFound: string[] = [];
const alreadyDone: string[] = [];
for (const id of jobIds) {
const job = this.jobs.get(id);
if (!job) { notFound.push(id); continue; }
if (job.status !== "running" && job.status !== "queued") {
alreadyDone.push(id);
continue;
}
if (job.pid) {
try {
process.kill(job.pid, "SIGTERM");
setTimeout(() => {
try { process.kill(job.pid!, "SIGKILL"); } catch {}
}, 5000);
} catch {}
}
job.status = "aborted";
this.persist(job);
aborted.push(id);
}
return { aborted, notFound, alreadyDone };
}
cleanupOldJobs(maxAgeMs = 24 * 60 * 60 * 1000) {
const now = Date.now();
for (const [id, job] of this.jobs.entries()) {
if (job.status === "running" || job.status === "queued") continue;
if (now - job.createdAt > maxAgeMs) {
const dir = jobDir(this.dataDir, id);
try {
fs.rmSync(dir, { recursive: true, force: true });
} catch {}
this.jobs.delete(id);
}
}
}
}

213
index.ts Normal file
View File

@ -0,0 +1,213 @@
import { Type } from "typebox";
import type { AgentToolResult } from "@earendil-works/pi-agent-core";
import { Text } from "@earendil-works/pi-tui";
import type { ExtensionAPI, ExtensionContext } from "@earendil-works/pi-coding-agent";
import { FanoutController } from "./controller.js";
import type { AbortResult, CollectResult, StatusResult } from "./types.js";
const TaskItem = Type.Object({
agent: Type.String({ description: "Name of the agent to invoke" }),
task: Type.String({ description: "Task to delegate" }),
cwd: Type.Optional(Type.String({ description: "Working directory for the agent process" })),
model: Type.Optional(Type.String({ description: "Override model for this task" })),
tools: Type.Optional(Type.Array(Type.String(), { description: "Override tools for this task" })),
});
const AgentScopeSchema = Type.Union(
[Type.Literal("user"), Type.Literal("project"), Type.Literal("both")],
{ description: 'Which agent directories to use. Default: "user".', default: "user" },
);
const DispatchParams = Type.Object({
tasks: Type.Array(TaskItem, { description: "Array of tasks to dispatch in parallel" }),
agentScope: Type.Optional(AgentScopeSchema),
});
const StatusParams = Type.Object({
jobIds: Type.Optional(Type.Array(Type.String(), { description: "Filter by job IDs. Omit for all jobs.", default: [] })),
includeDone: Type.Optional(Type.Boolean({ description: "Include done/aborted jobs", default: true })),
});
const CollectParams = Type.Object({
jobIds: Type.Array(Type.String(), { description: "Job IDs to collect results for" }),
});
const AbortParams = Type.Object({
jobIds: Type.Array(Type.String(), { description: "Job IDs to abort" }),
});
export default function (pi: ExtensionAPI) {
const controller = new FanoutController(pi, process.cwd());
controller.cleanupOldJobs();
pi.registerTool({
name: "fanout_dispatch",
label: "Fanout Dispatch",
description:
"Dispatch one or more subagent tasks asynchronously. Returns immediately with job IDs. " +
"Jobs run in the background and can be monitored via fanout_status and retrieved via fanout_collect.",
parameters: DispatchParams,
promptSnippet: "fanout_dispatch: tasks=[{agent, task}]",
promptGuidelines: [
"Use fanout_dispatch to run multiple agents in parallel without blocking the main session.",
"After dispatching, you may do other work. Check status later with fanout_status.",
"When jobs complete, the system will send a follow-up message. Retrieve output with fanout_collect.",
],
execute: async (
_toolCallId,
params,
_signal,
_onUpdate,
ctx,
): Promise<AgentToolResult<{ dispatched: string[] }>> => {
const scope = params.agentScope ?? "user";
controller.discoverAgents(ctx.cwd, scope);
const dispatched: string[] = [];
const errors: string[] = [];
for (const t of params.tasks) {
try {
const result = controller.dispatch(t.agent, t.task, t.cwd, t.model, t.tools, scope);
dispatched.push(result.jobId);
} catch (e) {
errors.push(`[${t.agent}] ${(e as Error).message}`);
}
}
const text = dispatched.length
? `Dispatched ${dispatched.length} job(s). IDs:\n${dispatched.join("\n")}`
: "No jobs dispatched.";
const errorText = errors.length ? `\n\nErrors:\n${errors.join("\n")}` : "";
return {
content: [{ type: "text", text: text + errorText }],
details: { dispatched },
isError: errors.length > 0 && dispatched.length === 0,
};
},
renderCall(args, theme) {
const count = args.tasks?.length ?? 0;
const scope = args.agentScope ?? "user";
let text =
theme.fg("toolTitle", theme.bold("fanout_dispatch ")) +
theme.fg("accent", `${count} tasks`) +
theme.fg("muted", ` [${scope}]`);
for (const t of args.tasks?.slice(0, 3) ?? []) {
const preview = t.task.length > 40 ? `${t.task.slice(0, 40)}...` : t.task;
text += `\n ${theme.fg("accent", t.agent)}${theme.fg("dim", ` ${preview}`)}`;
}
if (count > 3) text += `\n ${theme.fg("muted", `... +${count - 3} more`)}`;
return new Text(text, 0, 0);
},
});
pi.registerTool({
name: "fanout_status",
label: "Fanout Status",
description:
"Check status of async fanout jobs. Returns running/done/failed counts and per-job metadata. " +
"Call periodically or after receiving a completion notification.",
parameters: StatusParams,
execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise<AgentToolResult<StatusResult>> => {
const result = controller.status(params.jobIds?.length ? params.jobIds : undefined);
if (!params.includeDone) {
result.jobs = result.jobs.filter((j) => j.status === "running" || j.status === "queued");
}
const running = result.jobs.filter((j) => j.status === "running").length;
const queued = result.jobs.filter((j) => j.status === "queued").length;
const done = result.jobs.filter((j) => j.status === "done").length;
const failed = result.jobs.filter((j) => j.status === "failed" || j.status === "aborted").length;
const text = `Jobs: ${result.jobs.length} total — ${running} running, ${queued} queued, ${done} done, ${failed} failed/aborted`;
const detailText =
result.jobs.length > 0
? "\n\n" +
result.jobs
.map(
(j) =>
`- ${j.id} | ${j.agent} | ${j.status}` +
(j.exitCode !== undefined ? ` (exit ${j.exitCode})` : "") +
(j.turns ? ` | ${j.turns} turns` : ""),
)
.join("\n")
: "";
return {
content: [{ type: "text", text: text + detailText }],
details: result,
};
},
renderCall(args, theme) {
const ids = args.jobIds?.length ? `(${args.jobIds.length} ids)` : "(all)";
const text = theme.fg("toolTitle", theme.bold("fanout_status ")) + theme.fg("accent", ids);
return new Text(text, 0, 0);
},
});
pi.registerTool({
name: "fanout_collect",
label: "Fanout Collect",
description:
"Retrieve final results from completed fanout jobs. Only works for done/failed jobs. " +
"After collection the jobs remain in the history until cleanup.",
parameters: CollectParams,
execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise<AgentToolResult<CollectResult>> => {
const result = controller.collect(params.jobIds);
return {
content: [
{
type: "text",
text: result.results
.map((r) => {
const header =
`[${r.id}] ${r.status}` +
(r.exitCode !== undefined ? ` (exit ${r.exitCode})` : "") +
(r.modelUsed ? ` [${r.modelUsed}]` : "");
const body = r.output || r.errorMessage || "(no output)";
return `${header}\n${body}`;
})
.join("\n\n---\n\n"),
},
],
details: result,
};
},
renderCall(args, theme) {
const text =
theme.fg("toolTitle", theme.bold("fanout_collect ")) +
theme.fg("accent", `${args.jobIds.length} jobs`);
return new Text(text, 0, 0);
},
});
pi.registerTool({
name: "fanout_abort",
label: "Fanout Abort",
description: "Send SIGTERM (then SIGKILL after 5s) to running fanout jobs.",
parameters: AbortParams,
execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise<AgentToolResult<AbortResult>> => {
const result = controller.abort(params.jobIds);
const parts: string[] = [];
if (result.aborted.length) parts.push(`Aborted: ${result.aborted.join(", ")}`);
if (result.alreadyDone.length) parts.push(`Already done: ${result.alreadyDone.join(", ")}`);
if (result.notFound.length) parts.push(`Not found: ${result.notFound.join(", ")}`);
return {
content: [{ type: "text", text: parts.join("\n") || "Nothing to abort." }],
details: result,
};
},
renderCall(args, theme) {
const text =
theme.fg("toolTitle", theme.bold("fanout_abort ")) +
theme.fg("accent", `${args.jobIds.length} jobs`);
return new Text(text, 0, 0);
},
});
}

14
package.json Normal file
View File

@ -0,0 +1,14 @@
{
"name": "pi-fanout",
"version": "0.1.0",
"description": "Non-blocking async agent fanout for pi",
"type": "module",
"main": "index.ts",
"dependencies": {
"@earendil-works/pi-coding-agent": "^1.0.0",
"@earendil-works/pi-agent-core": "^1.0.0",
"@earendil-works/pi-ai": "^1.0.0",
"@earendil-works/pi-tui": "^1.0.0",
"typebox": "^0.0.1"
}
}

66
types.ts Normal file
View File

@ -0,0 +1,66 @@
export interface JobUsage {
input: number;
output: number;
cacheRead: number;
cacheWrite: number;
cost: number;
turns: number;
}
export interface FanoutJob {
id: string;
createdAt: number;
agent: string;
agentSource: "user" | "project" | "unknown";
task: string;
cwd: string;
model?: string;
tools?: string[];
status: "queued" | "running" | "done" | "failed" | "aborted";
pid?: number;
exitCode?: number;
outputFile: string;
metaFile: string;
notified: boolean;
errorMessage?: string;
usage: JobUsage;
modelUsed?: string;
stopReason?: string;
}
export interface DispatchResult {
jobId: string;
status: "queued" | "running";
message: string;
}
export interface StatusResult {
jobs: Array<{
id: string;
status: FanoutJob["status"];
agent: string;
task: string;
exitCode?: number;
pid?: number;
turns?: number;
cost?: number;
}>;
}
export interface CollectResult {
results: Array<{
id: string;
status: FanoutJob["status"];
output: string;
exitCode?: number;
usage: JobUsage;
modelUsed?: string;
errorMessage?: string;
}>;
}
export interface AbortResult {
aborted: string[];
notFound: string[];
alreadyDone: string[];
}