import { expect, test } from "bun:test"; import { spawnSync } from "node:child_process"; import { mkdirSync, mkdtempSync, realpathSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import path from "node:path"; import type { SwarmTask, UpdateTaskOpts } from "../agents/swarmPeer.ts"; import type { SpawnedPeer, SpawnPeerOptions } from "../agents/swarmLauncher.ts"; import { createTestSettings } from "../testSettings.ts"; import { cancelSpawnedWorkerForTask, getActiveSpawnedWorkerCount, spawnCodeWorker } from "./spawnCodeWorker.ts";
function makeTask(id: string, scope: string): SwarmTask { return { id, scope, type: "implement", title: "Implement thing", description: "Implement thing", requester: "planner-1", assignee: null, status: "open", files: [], result: null, createdAt: 1, updatedAt: 1, changedAt: 1, priority: 0, dependsOn: [], idempotencyKey: null, parentTaskId: null }; }
function makeSettings(workspaceDir: string, dbPath: string) { return createTestSettings({ permissions: { devTasks: { allowedUserIds: ["user-1"], allowedWorkspaceRoots: [workspaceDir] } }, agentStack: { overrides: { devTeam: { codingWorkers: ["codex_cli"], roles: { implementation: "codex_cli" } } }, runtimeConfig: { devTeam: { swarm: { enabled: true, command: "bun", args: ["run", "./mcp-servers/swarm-mcp/src/index.ts"], dbPath }, codexCli: { enabled: true, defaultCwd: workspaceDir, maxTasksPerHour: 100, maxParallelTasks: 4 } } } } }); }
function makeSpawnedPeer(instanceId: string, workspaceDir: string, order: string[]): SpawnedPeer {
let resolveExited!: (value: { code: number | null; signal: NodeJS.Signals | null }) => void;
const exited = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve) => {
resolveExited = resolve;
});
return {
instanceId,
launchMode: "direct_child",
scope: workspaceDir,
fileRoot: workspaceDir,
workspace: {
repoRoot: workspaceDir,
cwd: workspaceDir,
canonicalCwd: workspaceDir,
relativeCwd: ""
},
adopted: Promise.resolve(),
exited,
outputTail: () => "",
cancel: async (reason?: string) => {
order.push(cancel:${reason});
resolveExited({ code: null, signal: "SIGTERM" });
}
};
}
async function withTempWorkspace(run: (workspaceDir: string, dbPath: string) => Promise) { const tempDir = mkdtempSync(path.join(tmpdir(), "clanky-spawn-code-worker-")); const workspaceDir = path.join(tempDir, "workspace"); mkdirSync(workspaceDir, { recursive: true }); spawnSync("git", ["init", "--quiet"], { cwd: workspaceDir }); try { await run(workspaceDir, path.join(tempDir, "swarm.db")); } finally { rmSync(tempDir, { recursive: true, force: true }); } }
test("cancelSpawnedWorkerForTask marks the task cancelled before stopping the worker", async () => {
await withTempWorkspace(async (workspaceDir, dbPath) => {
const order: string[] = [];
const task = makeTask("task-1", workspaceDir);
const spawned = makeSpawnedPeer("worker-1", workspaceDir, order);
const peer = {
requestTask: async () => task,
assignTask: async (_taskId: string, assignee: string) => {
task.assignee = assignee;
return task;
},
updateTask: async (_taskId: string, opts: UpdateTaskOpts) => {
order.push(update:${opts.result});
task.status = opts.status;
task.result = opts.result ?? null;
return task;
}
};
const result = await spawnCodeWorker({
settings: makeSettings(workspaceDir, dbPath),
task: "Implement thing",
guildId: null,
channelId: "channel-1",
userId: "user-1"
}, {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: {
ensurePeer: () => peer
} as never,
reservationKeeper: {} as never,
spawnPeer: async (_opts: SpawnPeerOptions) => spawned
});
expect(result.taskId).toBe("task-1");
expect(result.workerId).toBe("worker-1");
expect(getActiveSpawnedWorkerCount("codex-cli")).toBe(1);
await expect(cancelSpawnedWorkerForTask("task-1", "operator stop")).resolves.toBe(true);
expect(order).toEqual(["update:operator stop", "cancel:operator stop"]);
expect(task.status).toBe("cancelled");
expect(task.result).toBe("operator stop");
expect(getActiveSpawnedWorkerCount("codex-cli")).toBe(0);
}); });
test("spawnCodeWorker persists a session record on every spawn so followups can find the live worker", async () => {
await withTempWorkspace(async (workspaceDir, dbPath) => {
const order: string[] = [];
const task = makeTask("task-inbox", workspaceDir);
const spawned = makeSpawnedPeer("worker-inbox", workspaceDir, order);
const kvWrites: Array<{ key: string; value: string }> = [];
let capturedOptions: SpawnPeerOptions | null = null;
const peer = {
instanceId: "clanky-planner",
requestTask: async () => task,
assignTask: async (_taskId: string, assignee: string) => {
task.assignee = assignee;
return task;
},
getTask: async () => task,
kvSet: async (key: string, value: string) => {
kvWrites.push({ key, value });
return {
scope: workspaceDir,
key,
value,
updatedAt: Date.now()
};
},
updateTask: async (_taskId: string, opts: UpdateTaskOpts) => {
order.push(update:${opts.result});
task.status = opts.status;
task.result = opts.result ?? null;
return task;
}
};
const result = await spawnCodeWorker({
settings: makeSettings(workspaceDir, dbPath),
task: "Plan an implementation",
role: "design",
guildId: "guild-1",
channelId: "channel-1",
userId: "user-1",
triggerMessageId: "message-1"
}, {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: {
ensurePeer: () => peer
} as never,
reservationKeeper: {} as never,
spawnPeer: async (opts: SpawnPeerOptions) => {
capturedOptions = opts;
return spawned;
}
});
expect(capturedOptions?.role).toBe("planner");
expect(result.sessionKey).toContain("clanky:code_worker_session:last:guild:guild-1:channel:channel-1:user:user-1");
expect(result.persistedSession).toBe(true);
expect(kvWrites.length).toBe(4);
expect(kvWrites.some((entry) => entry.key === "clanky/controller" && entry.value)).toBe(true);
const lastSession = kvWrites.find((entry) => entry.key === result.sessionKey);
expect(lastSession).toBeDefined();
const sessionRecord = JSON.parse(String(lastSession?.value || "{}")) as Record<string, unknown>;
expect(sessionRecord.workerId).toBe("worker-inbox");
expect(sessionRecord.taskId).toBe("task-inbox");
expect(sessionRecord.role).toBe("design");
const workerSession = kvWrites.find((entry) => entry.key === "clanky:code_worker_session:worker:worker-inbox");
expect(workerSession).toBeDefined();
await expect(cancelSpawnedWorkerForTask("task-inbox", "test cleanup")).resolves.toBe(true);
}); });
test("spawnCodeWorker resolves GitHub issue URLs to approved local clones", async () => { await withTempWorkspace(async (workspaceDir, dbPath) => { const repoDir = path.join(workspaceDir, "nested", "clanky"); mkdirSync(repoDir, { recursive: true }); spawnSync("git", ["init", "--quiet"], { cwd: repoDir }); spawnSync("git", ["remote", "add", "origin", "https://github.com/Volpestyle/clanky.git"], { cwd: repoDir });
const realRepoDir = realpathSync(repoDir);
const order: string[] = [];
const task = makeTask("task-github", realRepoDir);
const spawned = makeSpawnedPeer("worker-github", realRepoDir, order);
let capturedOptions: SpawnPeerOptions | null = null;
const peer = {
instanceId: "clanky-planner",
requestTask: async () => task,
assignTask: async (_taskId: string, assignee: string) => {
task.assignee = assignee;
return task;
},
getTask: async () => task,
kvSet: async (key: string, value: string) => ({
scope: realRepoDir,
key,
value,
updatedAt: Date.now()
}),
updateTask: async (_taskId: string, opts: UpdateTaskOpts) => {
task.status = opts.status;
task.result = opts.result ?? null;
return task;
}
};
const result = await spawnCodeWorker({
settings: makeSettings(workspaceDir, dbPath),
task: "Fix https://github.com/Volpestyle/clanky/issues/25",
guildId: null,
channelId: "channel-1",
userId: "user-1"
}, {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: {
ensurePeer: () => peer
} as never,
reservationKeeper: {} as never,
spawnPeer: async (opts: SpawnPeerOptions) => {
capturedOptions = opts;
return spawned;
}
});
expect(capturedOptions?.cwd).toBe(realRepoDir);
expect(result.cwd).toBe(realRepoDir);
await expect(cancelSpawnedWorkerForTask("task-github", "test cleanup")).resolves.toBe(true);
}); });
test("spawnCodeWorker resolves bare cwd under the configured root and allows non-git workspaces", async () => { const tempDir = mkdtempSync(path.join(tmpdir(), "clanky-spawn-code-worker-non-git-")); const workspaceRoot = path.join(tempDir, "volpestyle"); const packageDir = path.join(workspaceRoot, "swarm-test"); mkdirSync(packageDir, { recursive: true }); try { const realPackageDir = realpathSync(packageDir); const order: string[] = []; const task = makeTask("task-non-git", realPackageDir); const spawned = makeSpawnedPeer("worker-non-git", realPackageDir, order); let capturedOptions: SpawnPeerOptions | null = null; let capturedPeerScope = ""; let capturedPeerRepoRoot = ""; let capturedPeerFileRoot = ""; const peer = { instanceId: "clanky-planner", requestTask: async () => task, assignTask: async (_taskId: string, assignee: string) => { task.assignee = assignee; return task; }, kvSet: async (key: string, value: string) => ({ scope: realPackageDir, key, value, updatedAt: Date.now() }), updateTask: async (_taskId: string, opts: UpdateTaskOpts) => { task.status = opts.status; task.result = opts.result ?? null; return task; } };
const result = await spawnCodeWorker({
settings: makeSettings(workspaceRoot, path.join(tempDir, "swarm.db")),
task: "Create a short todo app in this package",
cwd: "swarm-test",
guildId: null,
channelId: "channel-1",
userId: "user-1"
}, {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: {
ensurePeer: (scope: string, repoRoot: string, fileRoot: string) => {
capturedPeerScope = scope;
capturedPeerRepoRoot = repoRoot;
capturedPeerFileRoot = fileRoot;
return peer;
}
} as never,
reservationKeeper: {} as never,
spawnPeer: async (opts: SpawnPeerOptions) => {
capturedOptions = opts;
return spawned;
}
});
expect(capturedOptions?.cwd).toBe(realPackageDir);
expect(capturedPeerScope).toBe(realPackageDir);
expect(capturedPeerRepoRoot).toBe(realPackageDir);
expect(capturedPeerFileRoot).toBe(realPackageDir);
expect(result.cwd).toBe(realPackageDir);
expect(result.scope).toBe(realPackageDir);
await expect(cancelSpawnedWorkerForTask("task-non-git", "test cleanup")).resolves.toBe(true);
} finally { rmSync(tempDir, { recursive: true, force: true }); } });
test("spawnCodeWorker can assign an existing open swarm task instead of creating a duplicate", async () => { await withTempWorkspace(async (workspaceDir, dbPath) => { const order: string[] = []; const task = makeTask("existing-task", workspaceDir); task.title = "Existing planner task"; const spawned = makeSpawnedPeer("worker-existing", workspaceDir, order); let requestTaskCalls = 0; let capturedTaskId: string | null = null; const peer = { instanceId: "clanky-planner", requestTask: async () => { requestTaskCalls += 1; return task; }, getTask: async (taskId: string) => taskId === task.id ? task : null, kvSet: async (key: string, value: string) => ({ scope: workspaceDir, key, value, updatedAt: Date.now() }), assignTask: async (taskId: string, assignee: string) => { capturedTaskId = taskId; task.assignee = assignee; task.status = "claimed"; return task; }, updateTask: async (_taskId: string, opts: UpdateTaskOpts) => { task.status = opts.status; task.result = opts.result ?? null; return task; } };
const result = await spawnCodeWorker({
settings: makeSettings(workspaceDir, dbPath),
task: "Handle existing task",
existingTaskId: "existing-task",
guildId: null,
channelId: "channel-1",
userId: "user-1"
}, {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: {
ensurePeer: () => peer
} as never,
reservationKeeper: {} as never,
spawnPeer: async (_opts: SpawnPeerOptions) => spawned
});
expect(requestTaskCalls).toBe(0);
expect(capturedTaskId).toBe("existing-task");
expect(result.taskId).toBe("existing-task");
expect(result.workerId).toBe("worker-existing");
await expect(cancelSpawnedWorkerForTask("existing-task", "test cleanup")).resolves.toBe(true);
}); });
test("spawnCodeWorker marks newly created tasks failed when worker launch fails", async () => { await withTempWorkspace(async (workspaceDir, dbPath) => { const task = makeTask("task-launch-failed", workspaceDir); const updates: UpdateTaskOpts[] = []; const peer = { instanceId: "clanky-planner", requestTask: async () => task, kvSet: async (key: string, value: string) => ({ scope: workspaceDir, key, value, updatedAt: Date.now() }), assignTask: async (_taskId: string, assignee: string) => { task.assignee = assignee; return task; }, updateTask: async (_taskId: string, opts: UpdateTaskOpts) => { updates.push(opts); task.status = opts.status; task.result = opts.result ?? null; return task; } };
await expect(spawnCodeWorker({
settings: makeSettings(workspaceDir, dbPath),
task: "Implement thing",
guildId: null,
channelId: "channel-1",
userId: "user-1"
}, {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: {
ensurePeer: () => peer
} as never,
reservationKeeper: {} as never,
spawnPeer: async () => {
throw new Error("spawn exploded");
}
})).rejects.toThrow("spawn exploded");
expect(updates).toEqual([{
status: "failed",
result: "spawn_code_worker failed before worker assignment: spawn exploded"
}]);
expect(task.status).toBe("failed");
expect(task.result).toBe("spawn_code_worker failed before worker assignment: spawn exploded");
}); });
test("cancelSpawnedWorkerForTask still stops the worker when the task is already terminal", async () => { await withTempWorkspace(async (workspaceDir, dbPath) => { const order: string[] = []; const task = makeTask("task-2", workspaceDir); const spawned = makeSpawnedPeer("worker-2", workspaceDir, order); const peer = { requestTask: async () => task, assignTask: async (_taskId: string, assignee: string) => { task.assignee = assignee; return task; }, updateTask: async () => { order.push("update"); throw new Error("Task task-2 is already cancelled."); } };
await spawnCodeWorker({
settings: makeSettings(workspaceDir, dbPath),
task: "Implement other thing",
guildId: null,
channelId: "channel-1",
userId: "user-1"
}, {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: {
ensurePeer: () => peer
} as never,
reservationKeeper: {} as never,
spawnPeer: async (_opts: SpawnPeerOptions) => spawned
});
await expect(cancelSpawnedWorkerForTask("task-2", "already cancelled")).resolves.toBe(true);
expect(order).toEqual(["update", "cancel:already cancelled"]);
expect(getActiveSpawnedWorkerCount("codex-cli")).toBe(0);
}); });
test("spawnCodeWorker reuses an idle worker via send_message instead of spawning fresh", async () => {
await withTempWorkspace(async (workspaceDir, dbPath) => {
const order: string[] = [];
let nextTaskId = 1;
const tasks = new Map<string, SwarmTask>();
const makeFreshTask = () => {
const id = task-${nextTaskId++};
const t = makeTask(id, workspaceDir);
tasks.set(id, t);
return t;
};
const spawned = makeSpawnedPeer("worker-reuse", workspaceDir, order);
let spawnPeerCalls = 0;
const sentMessages: Array<{ recipient: string; content: string }> = [];
const assignedTaskIds: string[] = [];
const peer = {
instanceId: "clanky-orchestrator",
requestTask: async () => makeFreshTask(),
getTask: async (taskId: string) => tasks.get(taskId) ?? null,
assignTask: async (taskId: string, assignee: string) => {
const t = tasks.get(taskId);
if (!t) throw new Error(`unknown task ${taskId}`);
t.assignee = assignee;
t.status = "claimed";
assignedTaskIds.push(taskId);
return t;
},
sendMessage: async (recipient: string, content: string) => {
sentMessages.push({ recipient, content });
},
kvSet: async (key: string, value: string) => ({
scope: workspaceDir,
key,
value,
updatedAt: Date.now()
}),
updateTask: async (taskId: string, opts: UpdateTaskOpts) => {
const t = tasks.get(taskId);
if (!t) throw new Error(`unknown task ${taskId}`);
order.push(`update:${taskId}:${opts.status}`);
t.status = opts.status;
t.result = opts.result ?? null;
return t;
}
};
const settings = makeSettings(workspaceDir, dbPath);
const baseArgs = {
settings,
role: "implementation" as const,
guildId: "guild-1",
channelId: "channel-1",
userId: "user-1"
};
const deps = {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: { ensurePeer: () => peer } as never,
reservationKeeper: {} as never,
spawnPeer: async (_opts: SpawnPeerOptions) => {
spawnPeerCalls += 1;
return spawned;
}
};
const first = await spawnCodeWorker({ ...baseArgs, task: "First task" }, deps);
expect(spawnPeerCalls).toBe(1);
expect(first.taskId).toBe("task-1");
expect(first.workerId).toBe("worker-reuse");
expect(getActiveSpawnedWorkerCount("codex-cli")).toBe(1);
// Mark the first task done so the worker enters its idle listen window.
// The next spawn call's refresh pass will flip the worker's idle flag.
tasks.get("task-1")!.status = "done";
const second = await spawnCodeWorker({ ...baseArgs, task: "Follow-up task" }, deps);
expect(spawnPeerCalls).toBe(1);
expect(second.workerId).toBe("worker-reuse");
expect(second.taskId).toBe("task-2");
expect(assignedTaskIds).toEqual(["task-1", "task-2"]);
expect(sentMessages).toHaveLength(1);
expect(sentMessages[0]?.recipient).toBe("worker-reuse");
expect(sentMessages[0]?.content).toContain("task-2");
expect(sentMessages[0]?.content).toContain("Follow-up task");
expect(getActiveSpawnedWorkerCount("codex-cli")).toBe(1);
await expect(cancelSpawnedWorkerForTask("task-2", "test cleanup")).resolves.toBe(true);
expect(getActiveSpawnedWorkerCount("codex-cli")).toBe(0);
}); });
test("spawnCodeWorker falls through to fresh spawn when reuse fails", async () => {
await withTempWorkspace(async (workspaceDir, dbPath) => {
const order: string[] = [];
let nextTaskId = 1;
const tasks = new Map<string, SwarmTask>();
const makeFreshTask = () => {
const id = task-${nextTaskId++};
const t = makeTask(id, workspaceDir);
tasks.set(id, t);
return t;
};
const spawnedFirst = makeSpawnedPeer("worker-stale", workspaceDir, order);
const spawnedSecond = makeSpawnedPeer("worker-fresh", workspaceDir, order);
const spawnedQueue: SpawnedPeer[] = [spawnedFirst, spawnedSecond];
let spawnPeerCalls = 0;
const peer = {
instanceId: "clanky-orchestrator",
requestTask: async () => makeFreshTask(),
getTask: async (taskId: string) => tasks.get(taskId) ?? null,
assignTask: async (taskId: string, assignee: string) => {
const t = tasks.get(taskId);
if (!t) throw new Error(`unknown task ${taskId}`);
t.assignee = assignee;
t.status = "claimed";
return t;
},
sendMessage: async () => {
throw new Error("Instance worker-stale is not active in this scope.");
},
kvSet: async (key: string, value: string) => ({
scope: workspaceDir,
key,
value,
updatedAt: Date.now()
}),
updateTask: async (taskId: string, opts: UpdateTaskOpts) => {
const t = tasks.get(taskId);
if (!t) throw new Error(`unknown task ${taskId}`);
t.status = opts.status;
t.result = opts.result ?? null;
return t;
}
};
const settings = makeSettings(workspaceDir, dbPath);
const baseArgs = {
settings,
role: "implementation" as const,
guildId: "guild-1",
channelId: "channel-1",
userId: "user-1"
};
const deps = {
store: {
countActionsSince: () => 0,
logAction: () => {}
},
peerManager: { ensurePeer: () => peer } as never,
reservationKeeper: {} as never,
spawnPeer: async (_opts: SpawnPeerOptions) => {
spawnPeerCalls += 1;
const next = spawnedQueue.shift();
if (!next) throw new Error("no more spawned peers queued");
return next;
}
};
await spawnCodeWorker({ ...baseArgs, task: "First task" }, deps);
tasks.get("task-1")!.status = "done";
const second = await spawnCodeWorker({ ...baseArgs, task: "Follow-up after stale" }, deps);
expect(spawnPeerCalls).toBe(2);
expect(second.workerId).toBe("worker-fresh");
expect(second.taskId).toBe("task-3");
await expect(cancelSpawnedWorkerForTask(second.taskId, "test cleanup")).resolves.toBe(true);
}); });
