import { afterEach, beforeEach, expect, test } from "bun:test"; import { Database } from "bun:sqlite"; import { existsSync, mkdtempSync, mkdirSync, rmSync } from "node:fs"; import { tmpdir } from "node:os"; import path from "node:path"; import { spawn, type ChildProcessWithoutNullStreams } from "node:child_process"; import { bootstrapSwarmTestSchema } from "./fixtures/swarmTestSchema.ts"; import { deleteUnadopted, fullDeregister, heartbeatUnadopted, isAdopted, reserveInstance } from "./swarmDb.ts";
// Per-test scratch locations. They are reassigned in beforeEach, so every test
// works against its own temp directory, database file and workspace folder.
let tempDir: string;
let dbPath: string;
let workspaceDir: string;

// Create a fresh temp directory containing an empty workspace folder, then hand
// the database path to the schema fixture so the tables the tests query exist.
beforeEach(() => {
  const tempPrefix = path.join(tmpdir(), "clanky-swarm-db-");
  tempDir = mkdtempSync(tempPrefix);
  dbPath = path.join(tempDir, "swarm.db");
  workspaceDir = path.join(tempDir, "workspace");
  mkdirSync(workspaceDir, { recursive: true });
  bootstrapSwarmTestSchema(dbPath);
});

// Remove everything the test created; `force` keeps cleanup from throwing when
// the directory is already gone.
afterEach(() => {
  rmSync(tempDir, { force: true, recursive: true });
});
/** Columns of the `instances` table that the assertions in this file inspect. */
type InstanceRow = {
  id: string;
  scope: string;
  directory: string;
  root: string;
  file_root: string;
  pid: number;
  label: string | null;
  adopted: number;
};

/**
 * Look up a single `instances` row by id through a short-lived read-only
 * connection, so the check never holds the database open across test steps.
 *
 * @param id Instance id to look up.
 * @returns The selected columns, or `null` when no row has that id.
 */
function readInstance(id: string): InstanceRow | null {
  const db = new Database(dbPath, { readonly: true });
  try {
    const statement = db.query(
      "SELECT id, scope, directory, root, file_root, pid, label, adopted FROM instances WHERE id = ?"
    );
    return statement.get(id) as InstanceRow | null;
  } finally {
    db.close();
  }
}
// Reserving from a nested directory: the returned record and the stored row
// must both use the directory holding `.git` as root/scope, keep the nested
// directory as directory/file root, and start with pid 0 and adopted 0.
test("reserveInstance writes pid=0, adopted=0 and resolves git root for scope", () => {
  // An empty .git directory is what marks workspaceDir as the repository root here.
  mkdirSync(path.join(workspaceDir, ".git"), { recursive: true });
  const nestedDir = path.join(workspaceDir, "src", "agents");
  mkdirSync(nestedDir, { recursive: true });

  const label = "origin:clanky role:implementer";
  const expectedRoot = path.resolve(workspaceDir);
  const expectedDirectory = path.resolve(nestedDir);

  const reservation = reserveInstance({ dbPath, directory: nestedDir, label });

  // Returned record.
  expect(reservation.id).toMatch(/^[0-9a-f-]{36}$/);
  expect(reservation.directory).toBe(expectedDirectory);
  expect(reservation.root).toBe(expectedRoot);
  expect(reservation.scope).toBe(expectedRoot);
  expect(reservation.fileRoot).toBe(expectedDirectory);

  // Persisted row.
  const stored = readInstance(reservation.id);
  expect(stored).not.toBeNull();
  expect(stored?.pid).toBe(0);
  expect(stored?.adopted).toBe(0);
  expect(stored?.label).toBe(label);
  expect(stored?.directory).toBe(expectedDirectory);
  expect(stored?.root).toBe(expectedRoot);
  expect(stored?.scope).toBe(expectedRoot);
  expect(stored?.file_root).toBe(expectedDirectory);
});
// With no `.git` directory created in the workspace, both scope and root are
// expected to be the reserved directory itself.
test("reserveInstance falls back to directory itself when no .git ancestor exists", () => {
  const expectedPath = path.resolve(workspaceDir);

  const reservation = reserveInstance({
    dbPath,
    directory: workspaceDir,
    label: "origin:clanky"
  });

  expect(reservation.scope).toBe(expectedPath);
  expect(reservation.root).toBe(expectedPath);
});
// Caller-supplied scope and fileRoot must win over the derived values, while
// root is still resolved from the directory that contains `.git`.
test("reserveInstance honors explicit scope override and fileRoot", () => {
  mkdirSync(path.join(workspaceDir, ".git"), { recursive: true });
  const sourceDir = path.join(workspaceDir, "src");
  mkdirSync(sourceDir, { recursive: true });

  const customScope = "/custom/scope";
  const customFileRoot = "/custom/file-root";

  const reservation = reserveInstance({
    dbPath,
    directory: sourceDir,
    scope: customScope,
    fileRoot: customFileRoot,
    label: "origin:clanky"
  });

  expect(reservation.scope).toBe(path.resolve(customScope));
  expect(reservation.fileRoot).toBe(path.resolve(customFileRoot));
  expect(reservation.root).toBe(path.resolve(workspaceDir));
});
// heartbeatUnadopted must bump the heartbeat of a row that is still unadopted
// and report false (no refresh) once the row has been marked adopted.
test("heartbeatUnadopted refreshes pre-adopt rows and skips adopted ones", async () => {
  const reservation = reserveInstance({ dbPath, directory: workspaceDir, label: "origin:clanky" });

  // Force a stale heartbeat so any refresh is observable as a larger value.
  const staleWriter = new Database(dbPath);
  try {
    staleWriter.run("UPDATE instances SET heartbeat = 1 WHERE id = ?", [reservation.id]);
  } finally {
    staleWriter.close();
  }

  const refreshed = heartbeatUnadopted(dbPath, reservation.id);
  expect(refreshed).toBe(true);

  const reader = new Database(dbPath, { readonly: true });
  try {
    const heartbeatQuery = reader.query("SELECT heartbeat FROM instances WHERE id = ?");
    const current = heartbeatQuery.get(reservation.id) as { heartbeat: number };
    expect(current.heartbeat).toBeGreaterThan(1);
  } finally {
    reader.close();
  }

  // Simulate adoption directly in the table, then confirm the heartbeat is refused.
  const adopter = new Database(dbPath);
  try {
    adopter.run("UPDATE instances SET adopted = 1 WHERE id = ?", [reservation.id]);
  } finally {
    adopter.close();
  }

  const refreshedAfterAdopt = heartbeatUnadopted(dbPath, reservation.id);
  expect(refreshedAfterAdopt).toBe(false);
});
// deleteUnadopted must delete a row that is still unadopted and must refuse to
// touch a row already marked adopted.
test("deleteUnadopted removes pre-adopt rows and leaves adopted ones intact", () => {
  const pending = reserveInstance({ dbPath, directory: workspaceDir, label: "a" });
  const claimed = reserveInstance({ dbPath, directory: workspaceDir, label: "b" });

  // Mark only the second reservation as adopted.
  const writer = new Database(dbPath);
  try {
    writer.run("UPDATE instances SET adopted = 1, pid = 99 WHERE id = ?", [claimed.id]);
  } finally {
    writer.close();
  }

  const pendingDeleted = deleteUnadopted(dbPath, pending.id);
  expect(pendingDeleted).toBe(true);

  const claimedDeleted = deleteUnadopted(dbPath, claimed.id);
  expect(claimedDeleted).toBe(false);

  expect(readInstance(pending.id)).toBeNull();
  expect(readInstance(claimed.id)?.adopted).toBe(1);
});
// isAdopted has three outcomes: false for a fresh reservation, true once the
// row is flagged adopted, and null when no row exists for the id.
test("isAdopted reports the row's adoption state and null when missing", () => {
  const reservation = reserveInstance({ dbPath, directory: workspaceDir, label: "x" });

  const beforeAdoption = isAdopted(dbPath, reservation.id);
  expect(beforeAdoption).toBe(false);

  const writer = new Database(dbPath);
  try {
    writer.run("UPDATE instances SET adopted = 1 WHERE id = ?", [reservation.id]);
  } finally {
    writer.close();
  }

  const afterAdoption = isAdopted(dbPath, reservation.id);
  expect(afterAdoption).toBe(true);

  const unknownInstance = isAdopted(dbPath, "missing-id");
  expect(unknownInstance).toBeNull();
});
// fullDeregister on an adopted instance must remove the instance row together
// with its context rows and queued messages, release its tasks, and record an
// `instance.deregistered` event whose subject is the instance id.
test("fullDeregister cascades cleanup and emits instance.deregistered", () => {
  const reserved = reserveInstance({ dbPath, directory: workspaceDir, label: "role:x" });

  // Seed one dependent row of each kind that the cascade is expected to touch.
  const seed = new Database(dbPath);
  try {
    seed.run("UPDATE instances SET adopted = 1, pid = 42 WHERE id = ?", [reserved.id]);
    seed.run(
      "INSERT INTO context (id, scope, instance_id, file, type, content) VALUES ('lock1', ?, ?, '/repo/a.txt', 'lock', 'held')",
      [reserved.scope, reserved.id]
    );
    seed.run(
      "INSERT INTO tasks (id, scope, type, title, requester, assignee, status) VALUES ('t-claimed', ?, 'implement', 'claimed', 'planner', ?, 'claimed')",
      [reserved.scope, reserved.id]
    );
    seed.run(
      "INSERT INTO tasks (id, scope, type, title, requester, assignee, status) VALUES ('t-blocked', ?, 'implement', 'blocked', 'planner', ?, 'blocked')",
      [reserved.scope, reserved.id]
    );
    seed.run(
      "INSERT INTO messages (scope, sender, recipient, content) VALUES (?, 'other', ?, 'queued')",
      [reserved.scope, reserved.id]
    );
  } finally {
    seed.close();
  }

  fullDeregister(dbPath, reserved.id);

  const verify = new Database(dbPath, { readonly: true });
  try {
    expect(readInstance(reserved.id)).toBeNull();

    const lockCount = (
      verify
        .query("SELECT COUNT(*) AS n FROM context WHERE instance_id = ?")
        .get(reserved.id) as { n: number }
    ).n;
    expect(lockCount).toBe(0);

    const msgCount = (
      verify
        .query("SELECT COUNT(*) AS n FROM messages WHERE recipient = ?")
        .get(reserved.id) as { n: number }
    ).n;
    expect(msgCount).toBe(0);

    // A claimed task goes back to "open" and loses its assignee.
    const claimed = verify
      .query("SELECT status, assignee FROM tasks WHERE id = 't-claimed'")
      .get() as { status: string; assignee: string | null };
    expect(claimed.status).toBe("open");
    expect(claimed.assignee).toBeNull();

    // A blocked task keeps its status; only the assignee is cleared.
    const blocked = verify
      .query("SELECT status, assignee FROM tasks WHERE id = 't-blocked'")
      .get() as { status: string; assignee: string | null };
    expect(blocked.status).toBe("blocked");
    expect(blocked.assignee).toBeNull();

    const event = verify
      .query("SELECT type, subject FROM events WHERE type = 'instance.deregistered'")
      .get() as { type: string; subject: string } | null;
    expect(event?.subject).toBe(reserved.id);
  } finally {
    verify.close();
  }
});
// Deregistering an id that has no row must complete without throwing.
test("fullDeregister is a no-op when the instance row is gone", () => {
  const deregisterMissing = () => fullDeregister(dbPath, "missing-id");

  expect(deregisterMissing).not.toThrow();
});
/**
 * Poll `predicate` until it returns true or the timeout elapses.
 *
 * @param predicate Synchronous condition, re-evaluated on every poll.
 * @param options.timeoutMs Maximum time to keep polling, in milliseconds (default 8000).
 * @param options.intervalMs Delay between polls, in milliseconds (default 50).
 * @returns Resolves as soon as the predicate is observed to be true.
 * @throws Error when the predicate is still false after the deadline.
 */
async function waitFor(
  predicate: () => boolean,
  { timeoutMs = 8000, intervalMs = 50 }: { timeoutMs?: number; intervalMs?: number } = {}
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (predicate()) return;
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  // One last look: the final sleep can overshoot the deadline, and with a zero
  // or negative timeout the loop body never runs at all.
  if (predicate()) return;
  throw new Error(`Timed out after ${timeoutMs}ms waiting for condition.`);
}
/**
 * Best-effort shutdown of a spawned child: send SIGTERM and wait for its
 * "close" event, giving up after one second so a stuck child cannot hang the
 * test run.
 *
 * Returns immediately when the child already has an exit code or has already
 * been signalled (`child.killed`).
 *
 * @param child Child process to terminate.
 */
async function closeChild(child: ChildProcessWithoutNullStreams): Promise<void> {
  if (child.exitCode !== null || child.killed) return;

  // Subscribe before signalling so a fast exit cannot slip past the listener.
  const closed = new Promise<void>((resolve) => {
    child.once("close", () => resolve());
  });

  try {
    child.kill("SIGTERM");
  } catch {
    // ignore — cleanup is best-effort; a failed signal falls through to the grace wait.
  }

  let graceTimer: ReturnType<typeof setTimeout> | undefined;
  const graceExpired = new Promise<void>((resolve) => {
    graceTimer = setTimeout(resolve, 1000);
  });

  try {
    await Promise.race([closed, graceExpired]);
  } finally {
    // Do not leave the grace timer pending once the child has closed.
    clearTimeout(graceTimer);
  }
}
// Live integration leg: reserve a row, start the swarm-mcp entry point as a
// child process with the reservation passed through SWARM_MCP_* environment
// variables, and wait until the row is reported as adopted.
test("reserved row is adopted by a real swarm-mcp child via SWARM_MCP_INSTANCE_ID", async () => {
  const swarmIndex = path.resolve(process.cwd(), "./mcp-servers/swarm-mcp/src/index.ts");
  if (!existsSync(swarmIndex)) {
    // Submodule not initialized — skip the live integration leg.
    return;
  }

  const label = "origin:clanky role:implementer thread:dm user:anon";
  const reserved = reserveInstance({ dbPath, directory: workspaceDir, label });

  const child = spawn(process.execPath, ["run", swarmIndex], {
    env: {
      ...process.env,
      SWARM_DB_PATH: dbPath,
      SWARM_MCP_INSTANCE_ID: reserved.id,
      SWARM_MCP_DIRECTORY: workspaceDir,
      SWARM_MCP_SCOPE: reserved.scope,
      SWARM_MCP_FILE_ROOT: reserved.fileRoot,
      SWARM_MCP_LABEL: label
    },
    stdio: ["pipe", "pipe", "pipe"]
  });

  // Collect stderr so an adoption failure can report what the child printed.
  let stderr = "";
  child.stderr.on("data", (chunk) => {
    stderr += String(chunk || "");
  });

  try {
    await waitFor(() => isAdopted(dbPath, reserved.id) === true, { timeoutMs: 10000 });
  } catch (error) {
    // Narrow instead of casting: a thrown value is not guaranteed to be an Error.
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(
      `swarm-mcp child failed to adopt reservation. stderr= ${stderr}\n${reason}`
    );
  } finally {
    // Always stop the child, whether adoption succeeded or not.
    await closeChild(child);
  }
});
