commit 341e238669d7d0b2bda0e8f61eb4d21769a3430d Author: jay Date: Fri May 15 05:01:14 2026 +0200 feat: non-blocking async agent fanout extension diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c45938 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +node_modules/ +*.log +.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..9a6602f --- /dev/null +++ b/README.md @@ -0,0 +1,129 @@ +# pi-fanout + +Non-blocking async agent fanout for [pi](https://pi.earendil.dev). + +## Problem + +The built-in `subagent` tool is powerful, but its `execute()` blocks until **all** +dispatched agents finish. While parallel tasks run concurrently internally, the main +pi session is frozen waiting for the final result. + +## Solution + +`pi-fanout` turns subagent dispatch into a **true async job queue**: + +- `fanout_dispatch` — returns immediately with job IDs; agents run as detached processes +- `fanout_status` — poll running/done/failed counts at any time +- `fanout_collect` — retrieve final output from completed jobs +- `fanout_abort` — kill running jobs on demand + +The main pi session stays unblocked. You can do other work, dispatch more jobs, +and collect results whenever they’re ready. When a background job finishes, the +extension sends a `followUp` message into the session so you know it’s time to +collect. + +## Architecture + +``` +pi main process +├─ pi-fanout extension +│ ├─ JobManager (in-memory + disk state in ~/.pi/fanout/jobs/) +│ └─ Poller (every 2s: check PIDs, notify on completion) +│ +├─ detached pi child (Agent A) ── writes output to job dir +├─ detached pi child (Agent B) ── writes output to job dir +└─ detached pi child (Agent C) ── writes output to job dir +``` + +Jobs survive pi restarts because: +1. Child processes are **detached** from the parent +2. State is persisted as `meta.json` + `output.jsonl` per job +3. On startup the extension rehydrates old jobs and checks if their PIDs are still alive + +## Install + +```bash +pi use git:git.vpsj.de/jay/pi-fanout +``` + +Or clone into your extensions directory and add it to `~/.pi/extensions.json`. + +## Usage + +``` +> fanout_dispatch tasks=[{agent:"worker", task:"Refactor auth.ts"}, {agent:"reviewer", task:"Review auth.ts"}] +Dispatched 2 job(s). IDs: +ltv123-abc +ltv124-def + +> ... do other work in the main session ... + +> fanout_status +Jobs: 2 total — 0 running, 0 queued, 2 done, 0 failed/aborted + +> fanout_collect jobIds=["ltv123-abc","ltv124-def"] +[ltv123-abc] done (exit 0) [claude-sonnet-4] +Refactored auth.ts to use bearer tokens... + +--- + +[ltv124-def] done (exit 0) [claude-sonnet-4] +The refactored auth.ts looks solid. One suggestion: ... +``` + +## Tools + +### `fanout_dispatch` + +Parameters: +- `tasks`: array of `{ agent, task, cwd?, model?, tools? }` +- `agentScope`: `"user" | "project" | "both"` (default `"user"`) + +Returns: `{ dispatched: string[] }` — job IDs. + +### `fanout_status` + +Parameters: +- `jobIds?`: filter to specific IDs (omit for all) +- `includeDone?`: include finished jobs in listing (default `true`) + +Returns per-job: `id`, `status`, `agent`, `task`, `exitCode`, `pid`, `turns`, `cost`. + +### `fanout_collect` + +Parameters: +- `jobIds`: array of IDs to collect + +Returns per-job: `id`, `status`, `output` (final assistant text), `exitCode`, `usage`, `modelUsed`, `errorMessage`. + +### `fanout_abort` + +Parameters: +- `jobIds`: array of IDs to kill + +Sends `SIGTERM`, then `SIGKILL` after 5s if still running. + +## How it works with the agent loop + +1. The LLM calls `fanout_dispatch`. +2. The tool returns **in <50ms** with job IDs. +3. The LLM is free to continue the conversation, run other tools, or ask the user. +4. Every 2 seconds the extension polls job PIDs. +5. When a job transitions to `done`/`failed`, the extension calls: + ```ts + pi.sendUserMessage(`Fanout job X completed ...`, { deliverAs: "followUp" }) + ``` + This injects a user-style message that triggers a follow-up turn when the agent is idle. +6. The LLM sees the notification and calls `fanout_collect` to retrieve outputs. +7. The LLM acts on the collected results (e.g. synthesize a final answer, dispatch follow-up jobs). + +## Limitations / Roadmap + +- **No true server push into a running turn**: If pi is mid-stream when a job finishes, the notification is queued as `followUp` and processed when the current turn ends. +- **No job result streaming** into the main session yet. Jobs are collected atomically after completion. +- **No automatic `fanout_collect`**: The LLM must explicitly call it. Future versions could auto-inject a tool-call hint. +- Jobs older than 24h are auto-cleaned on startup. + +## License + +MIT diff --git a/agents.ts b/agents.ts new file mode 100644 index 0000000..f0a7588 --- /dev/null +++ b/agents.ts @@ -0,0 +1,96 @@ +import * as fs from "node:fs"; +import * as path from "node:path"; + +export type AgentScope = "user" | "project" | "both"; + +export interface AgentConfig { + name: string; + description: string; + tools?: string[]; + model?: string; + systemPrompt: string; + source: "user" | "project"; + filePath: string; +} + +export interface AgentDiscoveryResult { + agents: AgentConfig[]; + projectAgentsDir: string | null; +} + +function getAgentDir(): string { + const envDir = process.env.PI_AGENT_DIR; + if (envDir) return envDir; + return path.join(process.env.HOME ?? process.env.USERPROFILE ?? "/tmp", ".pi", "agent"); +} + +function parseFrontmatter(content: string): { frontmatter: T; body: string } { + const match = content.match(/^---\r?\n([\s\S]*?)\r?\n---\r?\n([\s\S]*)$/); + if (!match) return { frontmatter: {} as T, body: content }; + const lines = match[1].split(/\r?\n/); + const frontmatter: Record = {}; + for (const line of lines) { + const idx = line.indexOf(":"); + if (idx > 0) frontmatter[line.slice(0, idx).trim()] = line.slice(idx + 1).trim(); + } + return { frontmatter: frontmatter as T, body: match[2] }; +} + +function loadAgentsFromDir(dir: string, source: "user" | "project"): AgentConfig[] { + const agents: AgentConfig[] = []; + if (!fs.existsSync(dir)) return agents; + let entries: fs.Dirent[]; + try { entries = fs.readdirSync(dir, { withFileTypes: true }); } catch { return agents; } + for (const entry of entries) { + if (!entry.name.endsWith(".md")) continue; + if (!entry.isFile() && !entry.isSymbolicLink()) continue; + const filePath = path.join(dir, entry.name); + let content: string; + try { content = fs.readFileSync(filePath, "utf-8"); } catch { continue; } + const { frontmatter, body } = parseFrontmatter>(content); + if (!frontmatter.name || !frontmatter.description) continue; + const tools = frontmatter.tools?.split(",").map((t: string) => t.trim()).filter(Boolean); + agents.push({ + name: frontmatter.name, + description: frontmatter.description, + tools: tools && tools.length > 0 ? tools : undefined, + model: frontmatter.model, + systemPrompt: body, + source, + filePath, + }); + } + return agents; +} + +function isDirectory(p: string): boolean { + try { return fs.statSync(p).isDirectory(); } catch { return false; } +} + +function findNearestProjectAgentsDir(cwd: string): string | null { + let currentDir = cwd; + while (true) { + const candidate = path.join(currentDir, ".pi", "agents"); + if (isDirectory(candidate)) return candidate; + const parentDir = path.dirname(currentDir); + if (parentDir === currentDir) return null; + currentDir = parentDir; + } +} + +export function discoverAgents(cwd: string, scope: AgentScope): AgentDiscoveryResult { + const userDir = path.join(getAgentDir(), "agents"); + const projectAgentsDir = findNearestProjectAgentsDir(cwd); + const userAgents = scope === "project" ? [] : loadAgentsFromDir(userDir, "user"); + const projectAgents = scope === "user" || !projectAgentsDir ? [] : loadAgentsFromDir(projectAgentsDir, "project"); + const agentMap = new Map(); + if (scope === "both") { + for (const agent of userAgents) agentMap.set(agent.name, agent); + for (const agent of projectAgents) agentMap.set(agent.name, agent); + } else if (scope === "user") { + for (const agent of userAgents) agentMap.set(agent.name, agent); + } else { + for (const agent of projectAgents) agentMap.set(agent.name, agent); + } + return { agents: Array.from(agentMap.values()), projectAgentsDir }; +} diff --git a/controller.ts b/controller.ts new file mode 100644 index 0000000..57db316 --- /dev/null +++ b/controller.ts @@ -0,0 +1,410 @@ +import { spawn } from "node:child_process"; +import * as fs from "node:fs"; +import * as os from "node:os"; +import * as path from "node:path"; +import type { ExtensionAPI } from "@earendil-works/pi-coding-agent"; +import type { Message } from "@earendil-works/pi-ai"; +import { discoverAgents, type AgentConfig, type AgentScope } from "./agents.js"; +import type { AbortResult, CollectResult, DispatchResult, FanoutJob, JobUsage, StatusResult } from "./types.js"; + +function getPiInvocation(args: string[]): { command: string; args: string[] } { + const currentScript = process.argv[1]; + const isBunVirtualScript = currentScript?.startsWith("/$bunfs/root/"); + if (currentScript && !isBunVirtualScript && fs.existsSync(currentScript)) { + return { command: process.execPath, args: [currentScript, ...args] }; + } + const execName = path.basename(process.execPath).toLowerCase(); + const isGenericRuntime = /^(node|bun)(\.exe)?$/.test(execName); + if (!isGenericRuntime) { + return { command: process.execPath, args }; + } + return { command: "pi", args }; +} + +function getFinalOutput(messages: Message[]): string { + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]; + if (msg.role === "assistant") { + for (const part of msg.content) { + if (part.type === "text") return part.text; + } + } + } + return ""; +} + +function generateId(): string { + return `${Date.now().toString(36)}-${Math.random().toString(36).slice(2, 8)}`; +} + +function jobDir(dataDir: string, jobId: string): string { + return path.join(dataDir, "jobs", jobId); +} + +export class FanoutController { + private pi: ExtensionAPI; + private dataDir: string; + private jobs = new Map(); + private poller: NodeJS.Timeout | null = null; + private agents: AgentConfig[] = []; + private agentScope: AgentScope = "user"; + private projectAgentsDir: string | null = null; + private defaultCwd: string; + + constructor(pi: ExtensionAPI, defaultCwd: string) { + this.pi = pi; + this.defaultCwd = defaultCwd; + this.dataDir = path.join(os.homedir(), ".pi", "fanout"); + fs.mkdirSync(path.join(this.dataDir, "jobs"), { recursive: true }); + this.rehydrate(); + this.startPoller(); + } + + private rehydrate() { + const jobsBase = path.join(this.dataDir, "jobs"); + if (!fs.existsSync(jobsBase)) return; + const dirs = fs.readdirSync(jobsBase, { withFileTypes: true }); + for (const dir of dirs) { + if (!dir.isDirectory()) continue; + const metaPath = path.join(jobsBase, dir.name, "meta.json"); + if (!fs.existsSync(metaPath)) continue; + try { + const raw = fs.readFileSync(metaPath, "utf-8"); + const job = JSON.parse(raw) as FanoutJob; + if (job.status === "running" && job.pid) { + try { + process.kill(job.pid, 0); + } catch { + job.status = "failed"; + job.exitCode = job.exitCode ?? 1; + } + } + this.jobs.set(job.id, job); + } catch { + // ignore corrupt meta + } + } + } + + private persist(job: FanoutJob) { + const dir = jobDir(this.dataDir, job.id); + fs.mkdirSync(dir, { recursive: true }); + fs.writeFileSync(path.join(dir, "meta.json"), JSON.stringify(job, null, 2)); + } + + discoverAgents(cwd: string, scope: AgentScope) { + const result = discoverAgents(cwd, scope); + this.agents = result.agents; + this.agentScope = scope; + this.projectAgentsDir = result.projectAgentsDir; + } + + dispatch( + agentName: string, + task: string, + cwd: string | undefined, + model?: string, + tools?: string[], + agentScope?: AgentScope, + ): DispatchResult { + if (agentScope) this.discoverAgents(cwd ?? this.defaultCwd, agentScope); + const agent = this.agents.find((a) => a.name === agentName); + if (!agent) { + const available = this.agents.map((a) => `"${a.name}"`).join(", ") || "none"; + throw new Error(`Unknown agent: "${agentName}". Available: ${available}.`); + } + + const jobId = generateId(); + const dir = jobDir(this.dataDir, jobId); + fs.mkdirSync(dir, { recursive: true }); + + const job: FanoutJob = { + id: jobId, + createdAt: Date.now(), + agent: agentName, + agentSource: agent.source, + task, + cwd: cwd ?? this.defaultCwd, + model: model || agent.model, + tools: tools || agent.tools, + status: "queued", + outputFile: path.join(dir, "output.jsonl"), + metaFile: path.join(dir, "meta.json"), + notified: false, + usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 }, + }; + + this.jobs.set(jobId, job); + this.persist(job); + this.spawnJob(job, agent); + + return { jobId, status: "running", message: `Dispatched job ${jobId} to ${agentName}` }; + } + + private spawnJob(job: FanoutJob, agent: AgentConfig) { + const args: string[] = ["--mode", "json", "-p", "--no-session"]; + if (job.model) args.push("--model", job.model); + if (job.tools && job.tools.length > 0) args.push("--tools", job.tools.join(",")); + + let tmpPromptPath: string | null = null; + + if (agent.systemPrompt.trim()) { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "pi-fanout-")); + const safeName = agent.name.replace(/[^\w.-]+/g, "_"); + tmpPromptPath = path.join(tmpDir, `prompt-${safeName}.md`); + fs.writeFileSync(tmpPromptPath, agent.systemPrompt, { encoding: "utf-8", mode: 0o600 }); + args.push("--append-system-prompt", tmpPromptPath); + } + + args.push(`Task: ${job.task}`); + + const invocation = getPiInvocation(args); + const proc = spawn(invocation.command, invocation.args, { + cwd: job.cwd, + detached: true, + shell: false, + stdio: ["ignore", "pipe", "pipe"], + }); + + job.pid = proc.pid; + job.status = "running"; + this.persist(job); + + const outputFile = fs.openSync(job.outputFile, "w"); + let stdoutBuffer = ""; + let stderrBuffer = ""; + + proc.stdout.on("data", (data: Buffer) => { + stdoutBuffer += data.toString(); + const lines = stdoutBuffer.split("\n"); + stdoutBuffer = lines.pop() ?? ""; + for (const line of lines) { + if (!line.trim()) continue; + fs.writeSync(outputFile, line + "\n"); + this.processOutputLine(job, line); + } + }); + + proc.stderr.on("data", (data: Buffer) => { + stderrBuffer += data.toString(); + }); + + proc.on("close", (code) => { + if (stdoutBuffer.trim()) { + fs.writeSync(outputFile, stdoutBuffer + "\n"); + this.processOutputLine(job, stdoutBuffer); + } + try { fs.closeSync(outputFile); } catch {} + + if (job.status !== "aborted") { + job.exitCode = code ?? 0; + job.status = code === 0 ? "done" : "failed"; + } + if (stderrBuffer.trim()) { + job.errorMessage = stderrBuffer.trim().slice(0, 500); + } + this.persist(job); + if (tmpPromptPath) { + try { fs.unlinkSync(tmpPromptPath); } catch {} + try { fs.rmdirSync(path.dirname(tmpPromptPath)); } catch {} + } + }); + + proc.on("error", () => { + try { fs.closeSync(outputFile); } catch {} + if (job.status !== "aborted") { + job.status = "failed"; + job.exitCode = 1; + job.errorMessage = job.errorMessage || "Failed to spawn process."; + } + this.persist(job); + }); + + proc.unref(); + } + + private processOutputLine(job: FanoutJob, line: string) { + let event: any; + try { event = JSON.parse(line); } catch { return; } + if (event.type === "message_end" && event.message) { + const msg = event.message as Message; + if (msg.role === "assistant") { + job.usage.turns++; + const usage = msg.usage; + if (usage) { + job.usage.input += usage.input || 0; + job.usage.output += usage.output || 0; + job.usage.cacheRead += usage.cacheRead || 0; + job.usage.cacheWrite += usage.cacheWrite || 0; + job.usage.cost += usage.cost?.total || 0; + } + if (!job.modelUsed && msg.model) job.modelUsed = msg.model; + if (msg.stopReason) job.stopReason = msg.stopReason; + if (msg.errorMessage) job.errorMessage = msg.errorMessage; + } + } + } + + private startPoller() { + if (this.poller) return; + this.poller = setInterval(() => { + this.poll(); + this.checkNotifications(); + }, 2000); + } + + stopPoller() { + if (this.poller) { + clearInterval(this.poller); + this.poller = null; + } + } + + private poll() { + for (const job of this.jobs.values()) { + if (job.status !== "running" || !job.pid) continue; + try { + process.kill(job.pid, 0); + } catch { + // Process is gone but we didn't get close event yet (race). Mark failed if not already updated. + if (job.status === "running") { + job.status = "failed"; + job.exitCode = job.exitCode ?? 1; + this.persist(job); + } + } + } + } + + private checkNotifications() { + for (const job of this.jobs.values()) { + if (job.status === "running" || job.status === "queued") continue; + if (job.notified) continue; + job.notified = true; + this.persist(job); + const outcome = job.status === "done" ? "completed" : `failed (exit ${job.exitCode ?? "?"})`; + try { + this.pi.sendUserMessage( + `Fanout job \`${job.id}\` (${job.agent}) ${outcome}. Use \`fanout_collect\` to retrieve results.`, + { deliverAs: "followUp" }, + ); + } catch { + // If sendUserMessage fails (e.g. no active session), ignore. + } + } + } + + status(jobIds?: string[]): StatusResult { + const ids = jobIds?.length ? jobIds : Array.from(this.jobs.keys()); + const jobs: StatusResult["jobs"] = []; + for (const id of ids) { + const job = this.jobs.get(id); + if (!job) continue; + jobs.push({ + id: job.id, + status: job.status, + agent: job.agent, + task: job.task, + exitCode: job.exitCode, + pid: job.pid, + turns: job.usage.turns, + cost: job.usage.cost, + }); + } + return { jobs }; + } + + collect(jobIds: string[]): CollectResult { + const results: CollectResult["results"] = []; + for (const id of jobIds) { + const job = this.jobs.get(id); + if (!job) { + results.push({ + id, + status: "failed", + output: "", + usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 }, + errorMessage: "Job not found.", + }); + continue; + } + if (job.status === "running" || job.status === "queued") { + results.push({ + id, + status: job.status, + output: "", + usage: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, cost: 0, turns: 0 }, + errorMessage: "Job still running.", + }); + continue; + } + + let output = ""; + if (fs.existsSync(job.outputFile)) { + try { + const lines = fs.readFileSync(job.outputFile, "utf-8").split("\n").filter(Boolean); + const messages: Message[] = []; + for (const line of lines) { + try { + const event = JSON.parse(line); + if (event.type === "message_end" && event.message) messages.push(event.message as Message); + if (event.type === "tool_result_end" && event.message) messages.push(event.message as Message); + } catch {} + } + output = getFinalOutput(messages); + } catch {} + } + + results.push({ + id, + status: job.status, + output, + exitCode: job.exitCode, + usage: job.usage, + modelUsed: job.modelUsed, + errorMessage: job.errorMessage, + }); + } + return { results }; + } + + abort(jobIds: string[]): AbortResult { + const aborted: string[] = []; + const notFound: string[] = []; + const alreadyDone: string[] = []; + for (const id of jobIds) { + const job = this.jobs.get(id); + if (!job) { notFound.push(id); continue; } + if (job.status !== "running" && job.status !== "queued") { + alreadyDone.push(id); + continue; + } + if (job.pid) { + try { + process.kill(job.pid, "SIGTERM"); + setTimeout(() => { + try { process.kill(job.pid!, "SIGKILL"); } catch {} + }, 5000); + } catch {} + } + job.status = "aborted"; + this.persist(job); + aborted.push(id); + } + return { aborted, notFound, alreadyDone }; + } + + cleanupOldJobs(maxAgeMs = 24 * 60 * 60 * 1000) { + const now = Date.now(); + for (const [id, job] of this.jobs.entries()) { + if (job.status === "running" || job.status === "queued") continue; + if (now - job.createdAt > maxAgeMs) { + const dir = jobDir(this.dataDir, id); + try { + fs.rmSync(dir, { recursive: true, force: true }); + } catch {} + this.jobs.delete(id); + } + } + } +} diff --git a/index.ts b/index.ts new file mode 100644 index 0000000..078a5e0 --- /dev/null +++ b/index.ts @@ -0,0 +1,213 @@ +import { Type } from "typebox"; +import type { AgentToolResult } from "@earendil-works/pi-agent-core"; +import { Text } from "@earendil-works/pi-tui"; +import type { ExtensionAPI, ExtensionContext } from "@earendil-works/pi-coding-agent"; +import { FanoutController } from "./controller.js"; +import type { AbortResult, CollectResult, StatusResult } from "./types.js"; + +const TaskItem = Type.Object({ + agent: Type.String({ description: "Name of the agent to invoke" }), + task: Type.String({ description: "Task to delegate" }), + cwd: Type.Optional(Type.String({ description: "Working directory for the agent process" })), + model: Type.Optional(Type.String({ description: "Override model for this task" })), + tools: Type.Optional(Type.Array(Type.String(), { description: "Override tools for this task" })), +}); + +const AgentScopeSchema = Type.Union( + [Type.Literal("user"), Type.Literal("project"), Type.Literal("both")], + { description: 'Which agent directories to use. Default: "user".', default: "user" }, +); + +const DispatchParams = Type.Object({ + tasks: Type.Array(TaskItem, { description: "Array of tasks to dispatch in parallel" }), + agentScope: Type.Optional(AgentScopeSchema), +}); + +const StatusParams = Type.Object({ + jobIds: Type.Optional(Type.Array(Type.String(), { description: "Filter by job IDs. Omit for all jobs.", default: [] })), + includeDone: Type.Optional(Type.Boolean({ description: "Include done/aborted jobs", default: true })), +}); + +const CollectParams = Type.Object({ + jobIds: Type.Array(Type.String(), { description: "Job IDs to collect results for" }), +}); + +const AbortParams = Type.Object({ + jobIds: Type.Array(Type.String(), { description: "Job IDs to abort" }), +}); + +export default function (pi: ExtensionAPI) { + const controller = new FanoutController(pi, process.cwd()); + controller.cleanupOldJobs(); + + pi.registerTool({ + name: "fanout_dispatch", + label: "Fanout Dispatch", + description: + "Dispatch one or more subagent tasks asynchronously. Returns immediately with job IDs. " + + "Jobs run in the background and can be monitored via fanout_status and retrieved via fanout_collect.", + parameters: DispatchParams, + promptSnippet: "fanout_dispatch: tasks=[{agent, task}]", + promptGuidelines: [ + "Use fanout_dispatch to run multiple agents in parallel without blocking the main session.", + "After dispatching, you may do other work. Check status later with fanout_status.", + "When jobs complete, the system will send a follow-up message. Retrieve output with fanout_collect.", + ], + + execute: async ( + _toolCallId, + params, + _signal, + _onUpdate, + ctx, + ): Promise> => { + const scope = params.agentScope ?? "user"; + controller.discoverAgents(ctx.cwd, scope); + + const dispatched: string[] = []; + const errors: string[] = []; + + for (const t of params.tasks) { + try { + const result = controller.dispatch(t.agent, t.task, t.cwd, t.model, t.tools, scope); + dispatched.push(result.jobId); + } catch (e) { + errors.push(`[${t.agent}] ${(e as Error).message}`); + } + } + + const text = dispatched.length + ? `Dispatched ${dispatched.length} job(s). IDs:\n${dispatched.join("\n")}` + : "No jobs dispatched."; + const errorText = errors.length ? `\n\nErrors:\n${errors.join("\n")}` : ""; + + return { + content: [{ type: "text", text: text + errorText }], + details: { dispatched }, + isError: errors.length > 0 && dispatched.length === 0, + }; + }, + + renderCall(args, theme) { + const count = args.tasks?.length ?? 0; + const scope = args.agentScope ?? "user"; + let text = + theme.fg("toolTitle", theme.bold("fanout_dispatch ")) + + theme.fg("accent", `${count} tasks`) + + theme.fg("muted", ` [${scope}]`); + for (const t of args.tasks?.slice(0, 3) ?? []) { + const preview = t.task.length > 40 ? `${t.task.slice(0, 40)}...` : t.task; + text += `\n ${theme.fg("accent", t.agent)}${theme.fg("dim", ` ${preview}`)}`; + } + if (count > 3) text += `\n ${theme.fg("muted", `... +${count - 3} more`)}`; + return new Text(text, 0, 0); + }, + }); + + pi.registerTool({ + name: "fanout_status", + label: "Fanout Status", + description: + "Check status of async fanout jobs. Returns running/done/failed counts and per-job metadata. " + + "Call periodically or after receiving a completion notification.", + parameters: StatusParams, + + execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise> => { + const result = controller.status(params.jobIds?.length ? params.jobIds : undefined); + if (!params.includeDone) { + result.jobs = result.jobs.filter((j) => j.status === "running" || j.status === "queued"); + } + const running = result.jobs.filter((j) => j.status === "running").length; + const queued = result.jobs.filter((j) => j.status === "queued").length; + const done = result.jobs.filter((j) => j.status === "done").length; + const failed = result.jobs.filter((j) => j.status === "failed" || j.status === "aborted").length; + const text = `Jobs: ${result.jobs.length} total — ${running} running, ${queued} queued, ${done} done, ${failed} failed/aborted`; + const detailText = + result.jobs.length > 0 + ? "\n\n" + + result.jobs + .map( + (j) => + `- ${j.id} | ${j.agent} | ${j.status}` + + (j.exitCode !== undefined ? ` (exit ${j.exitCode})` : "") + + (j.turns ? ` | ${j.turns} turns` : ""), + ) + .join("\n") + : ""; + return { + content: [{ type: "text", text: text + detailText }], + details: result, + }; + }, + + renderCall(args, theme) { + const ids = args.jobIds?.length ? `(${args.jobIds.length} ids)` : "(all)"; + const text = theme.fg("toolTitle", theme.bold("fanout_status ")) + theme.fg("accent", ids); + return new Text(text, 0, 0); + }, + }); + + pi.registerTool({ + name: "fanout_collect", + label: "Fanout Collect", + description: + "Retrieve final results from completed fanout jobs. Only works for done/failed jobs. " + + "After collection the jobs remain in the history until cleanup.", + parameters: CollectParams, + + execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise> => { + const result = controller.collect(params.jobIds); + return { + content: [ + { + type: "text", + text: result.results + .map((r) => { + const header = + `[${r.id}] ${r.status}` + + (r.exitCode !== undefined ? ` (exit ${r.exitCode})` : "") + + (r.modelUsed ? ` [${r.modelUsed}]` : ""); + const body = r.output || r.errorMessage || "(no output)"; + return `${header}\n${body}`; + }) + .join("\n\n---\n\n"), + }, + ], + details: result, + }; + }, + + renderCall(args, theme) { + const text = + theme.fg("toolTitle", theme.bold("fanout_collect ")) + + theme.fg("accent", `${args.jobIds.length} jobs`); + return new Text(text, 0, 0); + }, + }); + + pi.registerTool({ + name: "fanout_abort", + label: "Fanout Abort", + description: "Send SIGTERM (then SIGKILL after 5s) to running fanout jobs.", + parameters: AbortParams, + + execute: async (_toolCallId, params, _signal, _onUpdate, _ctx): Promise> => { + const result = controller.abort(params.jobIds); + const parts: string[] = []; + if (result.aborted.length) parts.push(`Aborted: ${result.aborted.join(", ")}`); + if (result.alreadyDone.length) parts.push(`Already done: ${result.alreadyDone.join(", ")}`); + if (result.notFound.length) parts.push(`Not found: ${result.notFound.join(", ")}`); + return { + content: [{ type: "text", text: parts.join("\n") || "Nothing to abort." }], + details: result, + }; + }, + + renderCall(args, theme) { + const text = + theme.fg("toolTitle", theme.bold("fanout_abort ")) + + theme.fg("accent", `${args.jobIds.length} jobs`); + return new Text(text, 0, 0); + }, + }); +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..140d240 --- /dev/null +++ b/package.json @@ -0,0 +1,14 @@ +{ + "name": "pi-fanout", + "version": "0.1.0", + "description": "Non-blocking async agent fanout for pi", + "type": "module", + "main": "index.ts", + "dependencies": { + "@earendil-works/pi-coding-agent": "^1.0.0", + "@earendil-works/pi-agent-core": "^1.0.0", + "@earendil-works/pi-ai": "^1.0.0", + "@earendil-works/pi-tui": "^1.0.0", + "typebox": "^0.0.1" + } +} diff --git a/types.ts b/types.ts new file mode 100644 index 0000000..19d0cbb --- /dev/null +++ b/types.ts @@ -0,0 +1,66 @@ +export interface JobUsage { + input: number; + output: number; + cacheRead: number; + cacheWrite: number; + cost: number; + turns: number; +} + +export interface FanoutJob { + id: string; + createdAt: number; + agent: string; + agentSource: "user" | "project" | "unknown"; + task: string; + cwd: string; + model?: string; + tools?: string[]; + status: "queued" | "running" | "done" | "failed" | "aborted"; + pid?: number; + exitCode?: number; + outputFile: string; + metaFile: string; + notified: boolean; + errorMessage?: string; + usage: JobUsage; + modelUsed?: string; + stopReason?: string; +} + +export interface DispatchResult { + jobId: string; + status: "queued" | "running"; + message: string; +} + +export interface StatusResult { + jobs: Array<{ + id: string; + status: FanoutJob["status"]; + agent: string; + task: string; + exitCode?: number; + pid?: number; + turns?: number; + cost?: number; + }>; +} + +export interface CollectResult { + results: Array<{ + id: string; + status: FanoutJob["status"]; + output: string; + exitCode?: number; + usage: JobUsage; + modelUsed?: string; + errorMessage?: string; + }>; +} + +export interface AbortResult { + aborted: string[]; + notFound: string[]; + alreadyDone: string[]; +}