src/agents/swarmActivityBridge.ts

import { cancelSpawnedWorkerForTask } from "../tools/spawnCodeWorker.ts"; import type { ClankyPeer, SwarmContextEntry, SwarmMessage, SwarmTaskStatus } from "./swarmPeer.ts";

/**

  • Per-dispatch routing context. The bridge keeps this so progress and
  • terminal events can be routed back to the surface that triggered the
  • dispatch (text channel, voice session, slash command, etc.). */ export type CodeTaskDispatchContext = { taskId: string; workerId: string; scope: string; guildId: string | null; channelId: string | null; userId: string | null; triggerMessageId: string | null; source: string; };

export type CodeTaskProgressEvent = { context: CodeTaskDispatchContext; annotationId: string; summary: string; createdAt: number; };

export type CodeTaskTerminalEvent = { context: CodeTaskDispatchContext; status: Extract<SwarmTaskStatus, "done" | "failed" | "cancelled">; result: string; };

export type SwarmSpawnRequestRole = "implementation" | "review" | "research";

export type SwarmSpawnRequest = { v: 1; kind: "spawn_request"; taskId: string; role: SwarmSpawnRequestRole; reason: string; priority: number | null; };

export type SwarmSpawnRequestEvent = { scope: string; controllerPeer: ClankyPeer; message: SwarmMessage; request: SwarmSpawnRequest; };

export type SwarmActivityBridgeOptions = { onProgress?: (event: CodeTaskProgressEvent) => void | Promise; onTerminal?: (event: CodeTaskTerminalEvent) => void | Promise; onSpawnRequest?: (event: SwarmSpawnRequestEvent) => void | Promise; /** Polling interval per scope. Defaults to 1500ms. / pollIntervalMs?: number; /* Duplicate spawn_request suppression window. Defaults to 60s. / spawnRequestDedupMs?: number; /* Maximum accepted spawn_requests per sender per minute. Defaults to 5. / spawnRequestRateLimitPerMinute?: number; /*

  • Override the cancel hook so tests don't reach into spawnCodeWorker's
  • shared map. Production callers leave this unset; the bridge defaults
  • to cancelSpawnedWorkerForTask. / cancelWorker?: (taskId: string, reason?: string) => Promise; /* Action-log sink for telemetry. */ logAction?: (entry: Record<string, unknown>) => void; };

function normalizeSpawnRequestRole(value: unknown): SwarmSpawnRequestRole | null { const normalized = String(value || "") .trim() .toLowerCase() .replace(/-/g, "_"); if (normalized === "implementation" || normalized === "implement" || normalized === "implementer" || normalized === "fix") { return "implementation"; } if (normalized === "review" || normalized === "reviewer") return "review"; if (normalized === "research" || normalized === "researcher") return "research"; return null; }

function parseSpawnRequestMessage(content: string): SwarmSpawnRequest | null { let parsed: unknown; try { parsed = JSON.parse(String(content || "")); } catch { return null; } if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null; const record = parsed as Record<string, unknown>; if (record.v !== 1 || record.kind !== "spawn_request") return null; const taskId = String(record.taskId || record.task_id || "").trim(); const role = normalizeSpawnRequestRole(record.role); if (!taskId || !role) return null; const reason = String(record.reason || "").trim().slice(0, 500); const rawPriority = Number(record.priority); return { v: 1, kind: "spawn_request", taskId, role, reason, priority: Number.isFinite(rawPriority) ? Math.max(-100, Math.min(100, Math.floor(rawPriority))) : null }; }

/**

  • Long-running subscription that watches swarm activity for tasks Clanky
  • dispatched and surfaces progress/terminal events back to the originating
  • Discord context. Lives across reply turns — progress that arrives after
  • the orchestrator's turn ends still gets delivered.
  • The bridge polls per-scope rather than keeping a websocket open against
  • swarm-mcp's event stream. Polling keeps the implementation simple and
  • matches the existing pattern in swarmTaskWaiter. Cadence is per-scope,
  • not per-task, so a scope with N tasks polls once per interval and reads
  • all of them in a single pass.
  • On status="cancelled", the bridge also kills the backing worker via
  • cancelSpawnedWorkerForTask (closes its swarm-server PTY when path A,
  • SIGTERMs the direct child otherwise) — that's how the orchestrator's
  • update_task(cancelled) and the keyword cancel handler in bot.ts
  • actually stop the running process. */ export class SwarmActivityBridge { private readonly peers: Map<string, ClankyPeer> = new Map(); private readonly contexts: Map<string, CodeTaskDispatchContext> = new Map(); private readonly seenProgressIds: Map<string, Set> = new Map(); private readonly controllerScopes: Set = new Set(); private readonly seenSpawnRequestKeys: Map<string, number> = new Map(); private readonly spawnRequestSenderHits: Map<string, number[]> = new Map(); private readonly polling: Map<string, ReturnType> = new Map(); private readonly pollIntervalMs: number; private readonly spawnRequestDedupMs: number; private readonly spawnRequestRateLimitPerMinute: number; private readonly onProgress?: SwarmActivityBridgeOptions["onProgress"]; private readonly onTerminal?: SwarmActivityBridgeOptions["onTerminal"]; private readonly onSpawnRequest?: SwarmActivityBridgeOptions["onSpawnRequest"]; private readonly cancelWorker: NonNullable<SwarmActivityBridgeOptions["cancelWorker"]>; private readonly logAction?: SwarmActivityBridgeOptions["logAction"]; private closed = false;

constructor(opts: SwarmActivityBridgeOptions = {}) { this.pollIntervalMs = Math.max(250, Math.floor(Number(opts.pollIntervalMs) || 1500)); this.spawnRequestDedupMs = Math.max(1_000, Math.floor(Number(opts.spawnRequestDedupMs) || 60_000)); this.spawnRequestRateLimitPerMinute = Math.max(1, Math.floor(Number(opts.spawnRequestRateLimitPerMinute) || 5)); this.onProgress = opts.onProgress; this.onTerminal = opts.onTerminal; this.onSpawnRequest = opts.onSpawnRequest; this.cancelWorker = opts.cancelWorker ?? ((taskId, reason) => cancelSpawnedWorkerForTask(taskId, reason)); this.logAction = opts.logAction; }

/** Watch this Clanky controller peer's inbox for planner spawn_request escalations. */ watchControllerPeer(peer: ClankyPeer, context: { scope: string }): void { if (this.closed) return; const scope = String(context.scope || peer.scope || "").trim(); if (!scope) return; this.peers.set(scope, peer); this.controllerScopes.add(scope); if (!this.polling.has(scope)) { this.startPolling(scope); } }

/** Register a freshly-dispatched task. The peer is the controller peer for the scope. */ trackTask(peer: ClankyPeer, context: CodeTaskDispatchContext): void { if (this.closed) return; const taskId = String(context.taskId || "").trim(); if (!taskId) return; this.peers.set(context.scope, peer); this.contexts.set(taskId, { ...context, taskId }); this.seenProgressIds.set(taskId, new Set()); if (!this.polling.has(context.scope)) { this.startPolling(context.scope); } }

/** Stop tracking a task without firing terminal — used when a caller takes over. */ forgetTask(taskId: string): void { const id = String(taskId || "").trim(); if (!id) return; this.contexts.delete(id); this.seenProgressIds.delete(id); }

/** Active dispatch contexts for a (guildId, channelId) scope. Used by cancel handlers. */ contextsForScope(filter: { guildId?: string | null; channelId?: string | null }): CodeTaskDispatchContext[] { const guildId = filter.guildId ? String(filter.guildId) : null; const channelId = filter.channelId ? String(filter.channelId) : null; const matches: CodeTaskDispatchContext[] = []; for (const ctx of this.contexts.values()) { if (guildId && ctx.guildId !== guildId) continue; if (channelId && ctx.channelId !== channelId) continue; matches.push(ctx); } return matches; }

/** Number of tasks currently tracked. */ size(): number { return this.contexts.size; }

shutdown(): void { if (this.closed) return; this.closed = true; for (const handle of this.polling.values()) clearInterval(handle); this.polling.clear(); this.peers.clear(); this.contexts.clear(); this.seenProgressIds.clear(); this.controllerScopes.clear(); this.seenSpawnRequestKeys.clear(); this.spawnRequestSenderHits.clear(); }

/** Force a poll cycle now. Exposed for tests. */ async pollOnce(scope?: string): Promise { if (this.closed) return; const scopes = scope ? [scope] : [...this.peers.keys()]; for (const s of scopes) { await this.pollScope(s); } }

private startPolling(scope: string) { const handle = setInterval(() => { void this.pollScope(scope).catch(() => {}); }, this.pollIntervalMs); this.polling.set(scope, handle); }

private stopPollingIfEmpty(scope: string) { const stillTracked = [...this.contexts.values()].some((ctx) => ctx.scope === scope); if (stillTracked || this.controllerScopes.has(scope)) return; const handle = this.polling.get(scope); if (handle) clearInterval(handle); this.polling.delete(scope); this.peers.delete(scope); }

private async pollScope(scope: string): Promise { if (this.closed) return; const peer = this.peers.get(scope); if (!peer) return; if (this.controllerScopes.has(scope)) { await this.processControllerInbox(peer, scope).catch((error) => { this.logAction?.({ kind: "swarm_activity_bridge_error", content: "controller_inbox_poll_failure", metadata: { scope, error: String((error as Error)?.message || error) } }); }); } const tasks = [...this.contexts.values()].filter((ctx) => ctx.scope === scope); if (tasks.length === 0) { this.stopPollingIfEmpty(scope); return; } for (const ctx of tasks) { await this.processTask(peer, ctx).catch((error) => { this.logAction?.({ kind: "swarm_activity_bridge_error", content: "poll_failure", metadata: { taskId: ctx.taskId, scope: ctx.scope, error: String((error as Error)?.message || error) } }); }); } }

private async processControllerInbox(peer: ClankyPeer, scope: string): Promise { if (!this.onSpawnRequest) return; const messages = await peer.pollMessages(50); for (const message of messages) { const request = parseSpawnRequestMessage(message.content); if (!request) continue; if (this.isDuplicateSpawnRequest(message, request)) continue; if (!this.acceptSpawnRequestRate(message, request, scope)) continue; await this.onSpawnRequest({ scope, controllerPeer: peer, message, request }); } }

private isDuplicateSpawnRequest(message: SwarmMessage, request: SwarmSpawnRequest): boolean { const now = Date.now(); const cutoff = now - this.spawnRequestDedupMs; for (const [key, seenAt] of this.seenSpawnRequestKeys.entries()) { if (seenAt < cutoff) this.seenSpawnRequestKeys.delete(key); } const key = ${message.sender}:${request.kind}:${request.taskId}; const seenAt = this.seenSpawnRequestKeys.get(key); if (seenAt && seenAt >= cutoff) return true; this.seenSpawnRequestKeys.set(key, now); return false; }

private acceptSpawnRequestRate(message: SwarmMessage, request: SwarmSpawnRequest, scope: string): boolean { const now = Date.now(); const cutoff = now - 60_000; const sender = String(message.sender || "").trim() || "unknown"; const hits = (this.spawnRequestSenderHits.get(sender) || []).filter((stamp) => stamp >= cutoff); if (hits.length >= this.spawnRequestRateLimitPerMinute) { this.spawnRequestSenderHits.set(sender, hits); this.logAction?.({ kind: "swarm_spawn_request_rate_limited", content: "spawn_request_rate_limited", metadata: { scope, sender, taskId: request.taskId, limitPerMinute: this.spawnRequestRateLimitPerMinute } }); return false; } hits.push(now); this.spawnRequestSenderHits.set(sender, hits); return true; }

private async processTask(peer: ClankyPeer, ctx: CodeTaskDispatchContext): Promise { const task = await peer.getTask(ctx.taskId); if (!task) return;

// Progress: emit one event per new `kind="progress"` annotation.
const annotations = await peer.checkFile(ctx.taskId).catch(() => [] as SwarmContextEntry[]);
const seen = this.seenProgressIds.get(ctx.taskId);
if (seen) {
  for (const ann of annotations) {
    if (ann.type !== "progress") continue;
    if (seen.has(ann.id)) continue;
    seen.add(ann.id);
    if (this.onProgress) {
      await this.onProgress({
        context: ctx,
        annotationId: ann.id,
        summary: String(ann.content || "").trim(),
        createdAt: Number(ann.createdAt) || Date.now()
      });
    }
  }
}

if (task.status === "done" || task.status === "failed" || task.status === "cancelled") {
  const status = task.status;
  const result = String(task.result || "").trim();
  let terminalError: unknown = null;
  try {
    if (this.onTerminal) {
      await this.onTerminal({ context: ctx, status, result });
    }
  } catch (error) {
    terminalError = error;
  } finally {
    this.contexts.delete(ctx.taskId);
    this.seenProgressIds.delete(ctx.taskId);
  }
  if (status === "cancelled") {
    // SIGTERM the worker if it's still running. Idempotent — returns false
    // if the worker has already exited or was never tracked.
    await this.cancelWorker(ctx.taskId, "swarm task cancelled").catch(() => false);
  }
  this.stopPollingIfEmpty(ctx.scope);
  if (terminalError) throw terminalError;
}

} }