#!/usr/bin/env bun /**
- Fake swarm worker fixture used by Wave 2 tests for the swarm-launcher
- redesign. Simulates the behaviors a real claude-code / codex-cli worker
- exhibits after Clanky pre-creates an instance row and spawns it with
- the SWARM_MCP_* env vars.
- This fixture writes directly to the swarm-mcp SQLite DB. It does NOT
- mount a real MCP server. That is intentional: launcher tests verify the
- reserve → adopt → exit sequence, which only depends on row state. If a
- future test needs full MCP-protocol fidelity, spawn an actual swarm-mcp
- subprocess instead.
- Inputs (all read from process.env):
- Required (matches what swarm-mcp's tryAutoAdopt reads):
-
SWARM_DB_PATH path to swarm.db -
SWARM_MCP_INSTANCE_ID reserved instance UUID - Optional identity (passed through to the row on adopt):
-
SWARM_MCP_LABEL label tokens - Behavior selection:
-
FAKE_WORKER_BEHAVIOR one of: -
adopt_then_exit (default) -
claim_and_complete -
claim_and_fail -
progress_then_complete -
crash_after_adopt -
never_adopt -
hang - Task-related (required for claim_, progress_):
-
FAKE_WORKER_TASK_ID task id to claim/update -
FAKE_WORKER_RESULT_TEXT final result text (default "fake worker result") -
FAKE_WORKER_ERROR_MESSAGE failure result text (default "fake worker error") -
FAKE_WORKER_USAGE_JSON JSON to post as kind="usage" annotation -
FAKE_WORKER_PROGRESS_COUNT progress notes to emit (default 3) -
FAKE_WORKER_PROGRESS_INTERVAL_MS ms between progress notes (default 50) - Timing:
-
FAKE_WORKER_DELAY_MS sleep before adopting (default 0) -
FAKE_WORKER_HANG_MS max sleep for "hang" behavior (default 60000) - Exit code:
- 0 — happy-path completion (including expected failure modes that posted to the task ledger)
- 1 — explicit crash behaviors and missing-required-env errors */ import { Database } from "bun:sqlite"; import { randomUUID } from "node:crypto";
type Behavior = | "adopt_then_exit" | "claim_and_complete" | "claim_and_fail" | "progress_then_complete" | "crash_after_adopt" | "never_adopt" | "hang";
const BEHAVIORS: ReadonlySet = new Set([ "adopt_then_exit", "claim_and_complete", "claim_and_fail", "progress_then_complete", "crash_after_adopt", "never_adopt", "hang" ]);
function readBehavior(): Behavior {
const raw = String(process.env.FAKE_WORKER_BEHAVIOR || "adopt_then_exit").trim();
if (BEHAVIORS.has(raw as Behavior)) return raw as Behavior;
throw new Error(fakeSwarmWorker: unknown FAKE_WORKER_BEHAVIOR=${raw});
}
function requireEnv(name: string): string {
const value = String(process.env[name] || "").trim();
if (!value) throw new Error(fakeSwarmWorker: missing required env ${name});
return value;
}
function readNumber(name: string, fallback: number): number {
const raw = process.env[name];
if (raw === undefined || raw === "") return fallback;
const parsed = Number(raw);
if (!Number.isFinite(parsed)) {
throw new Error(fakeSwarmWorker: ${name} must be numeric, got ${raw});
}
return parsed;
}
function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); }
function openDb(dbPath: string) { const db = new Database(dbPath); db.exec("PRAGMA journal_mode = WAL"); db.exec("PRAGMA busy_timeout = 3000"); return db; }
function adoptInstance(db: Database, instanceId: string, label: string | null) {
// Mirrors swarm-mcp/src/registry.ts::register adoption path:
// UPDATE instances SET pid=?, label=?, adopted=1, heartbeat=unixepoch() WHERE id=?
// We respect any non-null label that's already on the row (matches
// nextLabel = trimmedLabel ?? existing.label).
const existing = db
.query("SELECT label, scope FROM instances WHERE id = ?")
.get(instanceId) as { label: string | null; scope: string } | null;
if (!existing) {
throw new Error(fakeSwarmWorker: no reserved instance row for id=${instanceId});
}
const nextLabel = label ?? existing.label;
db.run(
UPDATE instances SET pid = ?, label = ?, adopted = 1, heartbeat = unixepoch() WHERE id = ?,
[process.pid, nextLabel, instanceId]
);
emitEvent(db, existing.scope, "instance.registered", instanceId, instanceId, {
label: nextLabel,
adopted: true,
pid: process.pid,
fake_worker: true
});
return existing.scope;
}
function claimTask(db: Database, taskId: string, instanceId: string, scope: string) {
// Status transitions to 'claimed' immediately. The real implementation
// walks dependencies and may end up at 'in_progress' instead, but for
// launcher tests we only care that an assignee + status flip lands.
const ts = Date.now();
const updated = db.run(
UPDATE tasks SET assignee = ?, status = 'claimed', updated_at = unixepoch(), changed_at = ? WHERE id = ? AND scope = ?,
[instanceId, ts, taskId, scope]
);
if (updated.changes === 0) {
throw new Error(fakeSwarmWorker: task ${taskId} not found in scope ${scope});
}
emitEvent(db, scope, "task.claimed", instanceId, taskId, { fake_worker: true });
}
function updateTaskTerminal(
db: Database,
taskId: string,
scope: string,
status: "done" | "failed",
result: string
) {
const ts = Date.now();
db.run(
UPDATE tasks SET status = ?, result = ?, updated_at = unixepoch(), changed_at = ? WHERE id = ? AND scope = ?,
[status, result, ts, taskId, scope]
);
emitEvent(db, scope, task.${status}, null, taskId, {
fake_worker: true,
ts
});
}
function annotate(
db: Database,
scope: string,
instanceId: string,
file: string,
kind: string,
content: string
) {
db.run(
INSERT INTO context (id, scope, instance_id, file, type, content) VALUES (?, ?, ?, ?, ?, ?),
[randomUUID(), scope, instanceId, file, kind, content]
);
emitEvent(db, scope, "context.annotated", instanceId, file, {
kind,
fake_worker: true
});
}
function emitEvent(
db: Database,
scope: string,
type: string,
actor: string | null,
subject: string,
payload: Record<string, unknown>
) {
db.run(
INSERT INTO events (scope, type, actor, subject, payload) VALUES (?, ?, ?, ?, ?),
[scope, type, actor, subject, JSON.stringify(payload)]
);
}
async function runBehavior(behavior: Behavior) { if (behavior === "never_adopt") { // Simulate a worker that boots but its MCP layer never adopts the row. // Sleeps for the hang window so the launcher's adoption-timeout trips. const hangMs = readNumber("FAKE_WORKER_HANG_MS", 60_000); await sleep(hangMs); return 0; }
const dbPath = requireEnv("SWARM_DB_PATH"); const instanceId = requireEnv("SWARM_MCP_INSTANCE_ID"); const label = process.env.SWARM_MCP_LABEL?.trim() || null; const delayMs = readNumber("FAKE_WORKER_DELAY_MS", 0);
if (delayMs > 0) await sleep(delayMs);
const stdoutMarker = process.env.FAKE_WORKER_STDOUT_MARKER?.trim();
if (stdoutMarker) {
process.stdout.write(${stdoutMarker} );
}
const db = openDb(dbPath); let scope: string; try { scope = adoptInstance(db, instanceId, label); } catch (error) { db.close(); throw error; }
try { switch (behavior) { case "adopt_then_exit": return 0;
case "crash_after_adopt":
// Adopted but never reports task status. Simulates `claude` crashing
// after MCP boot but before the model finishes.
return 1;
case "hang": {
const hangMs = readNumber("FAKE_WORKER_HANG_MS", 60_000);
await sleep(hangMs);
return 0;
}
case "claim_and_complete": {
const taskId = requireEnv("FAKE_WORKER_TASK_ID");
const resultText = process.env.FAKE_WORKER_RESULT_TEXT ?? "fake worker result";
claimTask(db, taskId, instanceId, scope);
const usageJson = process.env.FAKE_WORKER_USAGE_JSON?.trim();
if (usageJson) {
annotate(db, scope, instanceId, taskId, "usage", usageJson);
}
updateTaskTerminal(db, taskId, scope, "done", resultText);
return 0;
}
case "claim_and_fail": {
const taskId = requireEnv("FAKE_WORKER_TASK_ID");
const errorText = process.env.FAKE_WORKER_ERROR_MESSAGE ?? "fake worker error";
claimTask(db, taskId, instanceId, scope);
updateTaskTerminal(db, taskId, scope, "failed", errorText);
return 0;
}
case "progress_then_complete": {
const taskId = requireEnv("FAKE_WORKER_TASK_ID");
const resultText = process.env.FAKE_WORKER_RESULT_TEXT ?? "fake worker result";
const progressCount = Math.max(0, Math.floor(readNumber("FAKE_WORKER_PROGRESS_COUNT", 3)));
const intervalMs = Math.max(0, Math.floor(readNumber("FAKE_WORKER_PROGRESS_INTERVAL_MS", 50)));
claimTask(db, taskId, instanceId, scope);
for (let i = 0; i < progressCount; i++) {
annotate(db, scope, instanceId, taskId, "progress", `step ${i + 1} of ${progressCount}`);
if (intervalMs > 0 && i < progressCount - 1) await sleep(intervalMs);
}
const usageJson = process.env.FAKE_WORKER_USAGE_JSON?.trim();
if (usageJson) {
annotate(db, scope, instanceId, taskId, "usage", usageJson);
}
updateTaskTerminal(db, taskId, scope, "done", resultText);
return 0;
}
}
} finally { db.close(); } }
const isMain = import.meta.path === Bun.main;
if (isMain) {
const behavior = readBehavior();
runBehavior(behavior)
.then((code) => process.exit(code))
.catch((error) => {
console.error([fakeSwarmWorker] ${(error as Error)?.message || error});
process.exit(1);
});
}
// Exported for tests that want to invoke the fixture in-process instead of // spawning a subprocess. export { runBehavior, type Behavior };
