This repository has been archived on 2026-05-15. You can view files and clone it, but cannot push or open issues or pull requests.
pi-fanout/index.ts

221 lines
8.5 KiB
TypeScript

import { Type } from "typebox";
import type { AgentToolResult } from "@earendil-works/pi-agent-core";
import { Text } from "@earendil-works/pi-tui";
import type { ExtensionAPI, ExtensionContext } from "@earendil-works/pi-coding-agent";
import { FanoutController } from "./controller.js";
import type { AbortResult, CollectResult, StatusResult } from "./types.js";
/** Builds an optional string property schema carrying only a description. */
const optionalString = (description: string) => Type.Optional(Type.String({ description }));

/** Builds a required array-of-strings schema used for job ID lists. */
const jobIdList = (description: string) => Type.Array(Type.String(), { description });

/**
 * A single delegated task: the agent to run, the task text, and optional
 * per-task overrides (working directory, model, tool list).
 */
const TaskItem = Type.Object({
  agent: Type.String({ description: "Name of the agent to invoke" }),
  task: Type.String({ description: "Task to delegate" }),
  cwd: optionalString("Working directory for the agent process"),
  model: optionalString("Override model for this task"),
  tools: Type.Optional(
    Type.Array(Type.String(), { description: "Override tools for this task" }),
  ),
});

/** Selects which agent directories are used; annotated with a default of "user". */
const AgentScopeSchema = Type.Union(
  [
    Type.Literal("user"),
    Type.Literal("project"),
    Type.Literal("both"),
  ],
  {
    description: 'Which agent directories to use. Default: "user".',
    default: "user",
  },
);

/** Parameters of `fanout_dispatch`: the task list plus an optional agent scope. */
const DispatchParams = Type.Object({
  tasks: Type.Array(TaskItem, { description: "Array of tasks to dispatch in parallel" }),
  agentScope: Type.Optional(AgentScopeSchema),
});

/** Parameters of `fanout_status`: optional job ID filter and optional done-job toggle. */
const StatusParams = Type.Object({
  jobIds: Type.Optional(
    Type.Array(Type.String(), {
      description: "Filter by job IDs. Omit for all jobs.",
      default: [],
    }),
  ),
  includeDone: Type.Optional(
    Type.Boolean({ description: "Include done/aborted jobs", default: true }),
  ),
});

/** Parameters of `fanout_collect`: the job IDs whose results are requested. */
const CollectParams = Type.Object({
  jobIds: jobIdList("Job IDs to collect results for"),
});

/** Parameters of `fanout_abort`: the job IDs to abort. */
const AbortParams = Type.Object({
  jobIds: jobIdList("Job IDs to abort"),
});
/**
 * Extension entry point: registers the four fanout tools (`fanout_dispatch`,
 * `fanout_status`, `fanout_collect`, `fanout_abort`) on the given extension API.
 *
 * A single `FanoutController` is created here with `process.cwd()` and shared
 * by every tool; `cleanupOldJobs()` is called once before registration.
 *
 * @param pi - Extension API on which the tools are registered.
 */
export default function (pi: ExtensionAPI) {
  const controller = new FanoutController(pi, process.cwd());
  controller.cleanupOldJobs();

  // --- fanout_dispatch: start jobs and return their IDs -------------------
  pi.registerTool({
    name: "fanout_dispatch",
    label: "Fanout Dispatch",
    description:
      "Dispatch one or more subagent tasks asynchronously. Returns immediately with job IDs. " +
      "Jobs run in the background and can be monitored via fanout_status and retrieved via fanout_collect.",
    parameters: DispatchParams,
    promptSnippet: "fanout_dispatch: tasks=[{agent, task}]",
    promptGuidelines: [
      "Use fanout_dispatch to run multiple agents in parallel without blocking the main session.",
      "After dispatching, you may do other work. Check status later with fanout_status.",
      "When jobs complete, the system will send a follow-up message. Retrieve output with fanout_collect.",
    ],
    /**
     * Dispatches every requested task, collecting job IDs for the ones that
     * started and error messages for the ones that threw. The result is
     * flagged as an error only when errors occurred and nothing was dispatched.
     */
    execute: async (
      _toolCallId,
      params,
      _signal,
      _onUpdate,
      ctx,
    ): Promise<AgentToolResult<{ dispatched: string[] }>> => {
      const scope = params.agentScope ?? "user";
      controller.discoverAgents(ctx.cwd, scope);

      const dispatched: string[] = [];
      const errors: string[] = [];
      for (const t of params.tasks) {
        try {
          const result = controller.dispatch(t.agent, t.task, t.cwd, t.model, t.tools, scope);
          dispatched.push(result.jobId);
        } catch (e: unknown) {
          // A thrown value is not guaranteed to be an Error; narrow before
          // reading `.message` so the report never contains "undefined".
          const reason = e instanceof Error ? e.message : String(e);
          errors.push(`[${t.agent}] ${reason}`);
        }
      }

      const text = dispatched.length
        ? `Dispatched ${dispatched.length} job(s). IDs:\n${dispatched.join("\n")}`
        : "No jobs dispatched.";
      const errorText = errors.length ? `\n\nErrors:\n${errors.join("\n")}` : "";
      return {
        content: [{ type: "text", text: text + errorText }],
        details: { dispatched },
        isError: errors.length > 0 && dispatched.length === 0,
      };
    },
    /** Renders the call header plus a truncated preview of the first three tasks. */
    renderCall(args, theme) {
      const count = args.tasks?.length ?? 0;
      const scope = args.agentScope ?? "user";
      let text =
        theme.fg("toolTitle", theme.bold("fanout_dispatch ")) +
        theme.fg("accent", `${count} tasks`) +
        theme.fg("muted", ` [${scope}]`);
      for (const t of args.tasks?.slice(0, 3) ?? []) {
        // `args.tasks` is already treated as possibly missing above, so guard
        // the per-task fields the same way instead of dereferencing blindly.
        const task = t.task ?? "";
        const preview = task.length > 40 ? `${task.slice(0, 40)}...` : task;
        text += `\n ${theme.fg("accent", t.agent ?? "")}${theme.fg("dim", ` ${preview}`)}`;
      }
      if (count > 3) text += `\n ${theme.fg("muted", `... +${count - 3} more`)}`;
      return new Text(text, 0, 0);
    },
  });

  // --- fanout_status: summarize job states --------------------------------
  pi.registerTool({
    name: "fanout_status",
    label: "Fanout Status",
    description:
      "Check status of async fanout jobs. Returns running/done/failed counts and per-job metadata. " +
      "Call periodically or after receiving a completion notification.",
    parameters: StatusParams,
    /**
     * Queries the controller (all jobs when no IDs are given), optionally drops
     * finished jobs, and returns a one-line summary followed by one entry per job.
     */
    execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise<AgentToolResult<StatusResult>> => {
      const result = controller.status(params.jobIds?.length ? params.jobIds : undefined);
      // Compare against `false` explicitly: the schema documents a default of
      // true, so an omitted flag must not hide finished jobs.
      if (params.includeDone === false) {
        result.jobs = result.jobs.filter((j) => j.status === "running" || j.status === "queued");
      }

      const running = result.jobs.filter((j) => j.status === "running").length;
      const queued = result.jobs.filter((j) => j.status === "queued").length;
      const done = result.jobs.filter((j) => j.status === "done").length;
      const failed = result.jobs.filter((j) => j.status === "failed" || j.status === "aborted").length;
      const text = `Jobs: ${result.jobs.length} total — ${running} running, ${queued} queued, ${done} done, ${failed} failed/aborted`;

      const jobLines = result.jobs.map((j) => {
        // NOTE(review): `!= null` also skips a null exit code (presumably
        // possible for signal-terminated jobs — confirm against the job type).
        let jobLine =
          `- ${j.id} | ${j.agent} | ${j.status}` +
          (j.exitCode != null ? ` (exit ${j.exitCode})` : "") +
          (j.turns ? ` | ${j.turns} turns` : "");
        if (j.preview) {
          // Indent each preview line by one space inside a fenced block.
          const indented = j.preview
            .split("\n")
            .map((line) => ` ${line}`)
            .join("\n");
          jobLine += `\n \`\`\`\n${indented}\n \`\`\``;
        }
        return jobLine;
      });
      const detailText = jobLines.length > 0 ? "\n\n" + jobLines.join("\n") : "";

      return {
        content: [{ type: "text", text: text + detailText }],
        details: result,
      };
    },
    /** Renders the call header with either the number of filtered IDs or "(all)". */
    renderCall(args, theme) {
      const ids = args.jobIds?.length ? `(${args.jobIds.length} ids)` : "(all)";
      const text = theme.fg("toolTitle", theme.bold("fanout_status ")) + theme.fg("accent", ids);
      return new Text(text, 0, 0);
    },
  });

  // --- fanout_collect: fetch final output ---------------------------------
  pi.registerTool({
    name: "fanout_collect",
    label: "Fanout Collect",
    description:
      "Retrieve final results from completed fanout jobs. Only works for done/failed jobs. " +
      "After collection the jobs remain in the history until cleanup.",
    parameters: CollectParams,
    /**
     * Collects results for the given job IDs and formats each as a header line
     * (id, status, optional exit code and model) followed by its output,
     * error message, or "(no output)".
     */
    execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise<AgentToolResult<CollectResult>> => {
      const result = controller.collect(params.jobIds);
      const sections = result.results.map((r) => {
        const header =
          `[${r.id}] ${r.status}` +
          (r.exitCode != null ? ` (exit ${r.exitCode})` : "") +
          (r.modelUsed ? ` [${r.modelUsed}]` : "");
        const body = r.output || r.errorMessage || "(no output)";
        return `${header}\n${body}`;
      });
      return {
        content: [
          {
            type: "text",
            // Never return an empty text block when nothing was collected.
            text: sections.join("\n\n---\n\n") || "No results available for the requested job IDs.",
          },
        ],
        details: result,
      };
    },
    /** Renders the call header with the number of requested job IDs. */
    renderCall(args, theme) {
      // Guarded like the dispatch/status renderers, which tolerate missing args.
      const text =
        theme.fg("toolTitle", theme.bold("fanout_collect ")) +
        theme.fg("accent", `${args.jobIds?.length ?? 0} jobs`);
      return new Text(text, 0, 0);
    },
  });

  // --- fanout_abort: stop running jobs ------------------------------------
  pi.registerTool({
    name: "fanout_abort",
    label: "Fanout Abort",
    description: "Send SIGTERM (then SIGKILL after 5s) to running fanout jobs.",
    parameters: AbortParams,
    /**
     * Asks the controller to abort the given jobs and reports which IDs were
     * aborted, already done, or not found; "Nothing to abort." when all three
     * lists are empty.
     */
    execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise<AgentToolResult<AbortResult>> => {
      const result = controller.abort(params.jobIds);
      const parts: string[] = [];
      if (result.aborted.length) parts.push(`Aborted: ${result.aborted.join(", ")}`);
      if (result.alreadyDone.length) parts.push(`Already done: ${result.alreadyDone.join(", ")}`);
      if (result.notFound.length) parts.push(`Not found: ${result.notFound.join(", ")}`);
      return {
        content: [{ type: "text", text: parts.join("\n") || "Nothing to abort." }],
        details: result,
      };
    },
    /** Renders the call header with the number of job IDs to abort. */
    renderCall(args, theme) {
      // Guarded like the dispatch/status renderers, which tolerate missing args.
      const text =
        theme.fg("toolTitle", theme.bold("fanout_abort ")) +
        theme.fg("accent", `${args.jobIds?.length ?? 0} jobs`);
      return new Text(text, 0, 0);
    },
  });
}