integrations/_shared/swarm_hook_core.py

"""Runtime-agnostic hook core for swarm subprocess-based plugins.

The Claude Code and Codex CLI plugins both use the same shape of integration: short-lived subprocess hooks that receive a JSON payload on stdin, optionally print one back on stdout, and shell out to the swarm-mcp CLI for coordination side-effects. This module contains the runtime-agnostic core they share. Each plugin's _common.py instantiates HookCore with a RuntimeConfig that captures the per-runtime differences:

  • Which tool name(s) constitute a "write" event.
  • How to extract file paths from a write tool's input (Claude Code uses tool_input.file_path; codex parses an apply_patch envelope).
  • The label token, env-var prefix, and scratch directory namespace that distinguish one runtime's sessions from another's in the same coordination scope.

Hermes does not use this module — it integrates with hermes-agent's in-process plugin API directly, where stdin/stdout subprocess machinery would be the wrong shape. """

from future import annotations

import json import os import shlex import shutil import subprocess import tempfile from dataclasses import dataclass from pathlib import Path from typing import Callable, Optional

try: import swarm_adapter_contract as contract except ModuleNotFoundError: # pragma: no cover - package import path for tests from . import swarm_adapter_contract as contract

try: import herdr_agent_report except ModuleNotFoundError: # pragma: no cover - package import path for tests from . import herdr_agent_report

_WARNED_SKIP_REGISTRATION: set[str] = set()

def _warn_skip_registration_once(env_prefix: str) -> None: if env_prefix in _WARNED_SKIP_REGISTRATION: return _WARNED_SKIP_REGISTRATION.add(env_prefix) import sys

runtime_hint = {
    "CC": "claude-code",
    "CODEX": "codex",
}.get(env_prefix, f"runtime with env prefix {env_prefix}")
print(
    f"[swarm-mcp] {runtime_hint}: session has no AGENT_IDENTITY / "
    f"SWARM_{env_prefix}_IDENTITY / SWARM_IDENTITY env set; swarm "
    f"registration skipped. Raw binaries don't join the swarm — launch "
    f"via a per-profile worker/gateway alias from "
    f"swarm-mcp/env/launchers.zsh.example (or your own copy) to "
    f"coordinate. Set SWARM_MCP_ALLOW_UNLABELED=1 to opt back in to "
    f"unlabeled registration from a trusted shell.",
    file=sys.stderr,
)

@dataclass(frozen=True) class RuntimeConfig: """Per-runtime knobs that parameterize HookCore.

Attributes:
    runtime_name: Token used in derived labels, e.g. ``"claude-code"``.
    env_prefix: Used to build ``SWARM_<prefix>_*`` env-var aliases.
    scratch_dir_name: Top-level dir under ``$TMPDIR`` for per-session
        scratch (e.g. ``"swarm-cc"``).
    write_tools: Set of tool names that should trigger the pre-tool lock
        check.
    extract_paths: Function ``(tool_name, tool_input) -> [absolute_path]``.
        Receives ``tool_input`` exactly as it appears in the hook payload.
    soul_path: Optional path to a runtime-specific ``SOUL.md`` identity doc.
        When set and the session is registered as a gateway/lead, the file's
        contents are appended to the SessionStart ``additionalContext`` so
        the agent boots with its gateway-mode identity attached. Workers
        don't see it.
"""

runtime_name: str
env_prefix: str
scratch_dir_name: str
write_tools: frozenset[str]
extract_paths: Callable[[str, object], list[str]]
soul_path: Optional[Path] = None

class HookCore: """Runtime-agnostic hook implementation.

Public methods come in two flavors:

1. **Building blocks** (``swarm_cmd``, ``run_swarm``, ``derived_label``,
   ``list_locks``, ``find_peer_lock_conflict``, …) — for hook scripts
   that need fine-grained control.
2. **Top-level hook entry points** (``run_session_start_hook``,
   ``run_pre_tool_use_hook``, ``run_post_tool_use_hook``,
   ``run_session_end_hook``) — drive the full hook lifecycle from a
   stdin file object. Plugin entry scripts can be three-line stubs that
   just call these.

The pre-tool hook is check-only: it inspects existing locks and denies
when a peer holds the target file. It does not acquire on behalf of the
write tool, so ``run_post_tool_use_hook`` is a no-op kept as a stable
entry point for plugin configs that still wire ``PostToolUse``.
"""

def __init__(self, config: RuntimeConfig):
    self.config = config

# -- swarm-mcp CLI resolution -------------------------------------------

def swarm_cmd(self) -> Optional[list[str]]:
    explicit = os.environ.get("SWARM_MCP_BIN")
    if explicit:
        cmd = self._command_from_explicit(explicit)
        if cmd:
            return cmd

    bin_ = shutil.which("swarm-mcp")
    if bin_:
        return [bin_]

    # Walk up from this file to the swarm-mcp repo root. _shared lives at
    # <repo>/integrations/_shared/, so parents[2] = <repo>.
    repo_root = Path(__file__).resolve().parents[2]
    src_cli = repo_root / "src" / "cli.ts"
    if src_cli.exists() and shutil.which("bun"):
        return ["bun", "run", str(src_cli)]

    dist_cli = repo_root / "dist" / "cli.js"
    if dist_cli.exists() and shutil.which("node"):
        return ["node", str(dist_cli)]

    return None

@staticmethod
def _command_from_explicit(value: str) -> Optional[list[str]]:
    parts = shlex.split(value)
    if not parts:
        return None
    if shutil.which(parts[0]):
        return parts
    target = Path(parts[0]).expanduser()
    if target.exists() and target.suffix == ".ts" and shutil.which("bun"):
        return ["bun", "run", str(target), *parts[1:]]
    if target.exists() and target.suffix == ".js" and shutil.which("node"):
        return ["node", str(target), *parts[1:]]
    return None

def run_swarm(self, args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
    cmd = self.swarm_cmd()
    if not cmd:
        return 127, "", "swarm-mcp CLI not found"
    try:
        proc = subprocess.run(
            [*cmd, *args],
            capture_output=True,
            text=True,
            timeout=timeout,
            cwd=self.session_cwd(),
        )
        return proc.returncode, proc.stdout, proc.stderr
    except subprocess.TimeoutExpired:
        return 124, "", "swarm-mcp CLI timed out"
    except Exception as exc:  # pragma: no cover -- surface in stderr
        return 1, "", f"swarm-mcp CLI failed: {exc}"

# -- Identity / scope / label -------------------------------------------

@staticmethod
def session_cwd() -> str:
    return (
        os.environ.get("SWARM_MCP_DIRECTORY")
        or os.environ.get("TERMINAL_CWD")
        or os.getcwd()
    )

def _env(self, suffix: str) -> Optional[str]:
    return os.environ.get(f"SWARM_{self.config.env_prefix}_{suffix}")

@staticmethod
def _truthy(value: Optional[str]) -> bool:
    return contract.truthy(value)

def plugin_role(self) -> str:
    """Return the adapter behavior role, not the swarm-visible skill role."""
    raw = (self._env("ROLE") or os.environ.get("SWARM_ROLE") or "worker").strip().lower()
    return raw if raw in {"worker", "gateway"} else "worker"

def lease_seconds(self) -> int:
    raw = self._env("LEASE_SECONDS") or os.environ.get("SWARM_LEASE_SECONDS")
    if not raw:
        return 86400
    try:
        value = int(raw)
    except ValueError:
        return 86400
    return max(0, value)

def scope_arg(self) -> Optional[str]:
    return (
        self._env("SCOPE")
        or os.environ.get("SWARM_HERMES_SCOPE")
        or os.environ.get("SWARM_MCP_SCOPE")
    )

def file_root_arg(self) -> Optional[str]:
    return (
        self._env("FILE_ROOT")
        or os.environ.get("SWARM_HERMES_FILE_ROOT")
        or os.environ.get("SWARM_MCP_FILE_ROOT")
    )

def _raw_identity(self) -> str:
    return (
        self._env("IDENTITY")
        or os.environ.get("SWARM_HERMES_IDENTITY")
        or os.environ.get("AGENT_IDENTITY")
        or os.environ.get("SWARM_IDENTITY")
        or ""
    ).strip()

def resolved_identity_name(self) -> str:
    """Identity name from env, or empty string when none is configured.

    Returns the cleaned identity name (e.g. ``work``) when the launcher
    exported ``AGENT_IDENTITY`` / ``SWARM_<prefix>_IDENTITY`` /
    ``SWARM_IDENTITY``. Returns ``""`` when no identity signal is present;
    callers that mint labels accept the empty string and just omit the
    ``identity:`` token, and ``run_session_start_hook`` uses
    ``has_identity_signal()`` to skip registration entirely in that case
    rather than registering an unlabeled instance that would defeat the
    cross-identity boundary.
    """
    return contract.identity_name(self._raw_identity())

def has_identity_signal(self) -> bool:
    """True when the launcher exported an identity env we can register under.

    Used by ``run_session_start_hook`` to gate the auto-register call.
    Raw binaries (``claude``/``codex``/``hermes`` without the per-profile
    launcher alias) miss all of these and stay out of the swarm.
    """
    return bool(self.resolved_identity_name())

def identity_token(self) -> str:
    return contract.identity_token(self.resolved_identity_name())

def identity_name(self) -> str:
    return self.resolved_identity_name()

def work_tracker_config(self, cwd: str = "", scope: Optional[str] = None) -> dict:
    return contract.work_tracker_config(
        self.config.env_prefix,
        cwd or self.session_cwd(),
        scope if scope is not None else self.scope_arg(),
        self.identity_name(),
    )

def agent_role_token(self) -> str:
    """Return ``role:<name>`` if the agent has a swarm-visible skill role.

    Sources, in order:
    1. ``SWARM_<prefix>_AGENT_ROLE`` env var (e.g. ``SWARM_CC_AGENT_ROLE``).
    2. ``.swarm-role`` file walking up from cwd to the coordination scope.
    3. ``SWARM_AGENT_ROLE`` (un-prefixed shared fallback).
    4. ``role:planner`` when the plugin behavior role is ``gateway``.

    Plugin-mode knobs (``SWARM_<prefix>_ROLE`` carrying ``worker``/``gateway``)
    otherwise drive behavior, not routing. Gateway mode is the exception:
    a gateway is planner-shaped unless the operator explicitly set a more
    specific agent role.
    """
    raw = (self._env("AGENT_ROLE") or "").strip()
    if not raw:
        raw = self._role_from_file()
    if not raw:
        raw = (os.environ.get("SWARM_AGENT_ROLE") or "").strip()
    if not raw and self.plugin_role() == "gateway":
        raw = "planner"
    return contract.role_token(raw)

@staticmethod
def _normalize_role(value: str) -> str:
    return contract.role_token(value)

def _role_doctrine_lines(self, role_token: str) -> list[str]:
    """Resolve ``role:<name>`` to a pointer at the matching skill reference.

    The bundled swarm-mcp skill keeps role doctrine in
    ``skills/swarm-mcp/references/<role>.md``. We hand the agent the
    absolute path so it can read the doctrine on turn 1 regardless of
    whether the skill itself has been auto-loaded yet.

    ``generalist`` (and any role without a dedicated reference) falls
    back to ``roles-and-teams.md`` which surveys all roles.
    """
    if not role_token.startswith("role:"):
        return []
    role_name = role_token.split(":", 1)[1].strip().lower()
    if not role_name:
        return []

    # _shared lives at <repo>/integrations/_shared/, so parents[2] = <repo>.
    repo_root = Path(__file__).resolve().parents[2]
    refs = repo_root / "skills" / "swarm-mcp" / "references"
    target = refs / f"{role_name}.md"
    if not target.is_file():
        target = refs / "roles-and-teams.md"
    if not target.is_file():
        return []

    return [
        "",
        f"You're registered as `{role_token}`. Before your first work decision,",
        f"read `{target}` — it contains the role-specific doctrine (when to",
        "delegate via `request_task` vs. do hands-on, how to claim/update tasks,",
        "what peers to coordinate with). The bundled `swarm-mcp` skill is the",
        "broader source of truth.",
    ]

def _role_from_file(self) -> str:
    """Walk up from cwd to scope (or filesystem root) looking for ``.swarm-role``.

    First match wins. File contents are a single role token on the first
    non-blank, non-comment line. Use this for repo-wide defaults — drop
    ``echo implementer > .swarm-role`` at the repo root and every adapter
    in that scope picks it up without env-var ceremony.
    """
    return contract.role_from_file(self.session_cwd(), self.scope_arg())

@staticmethod
def session_short(session_id: str) -> str:
    return contract.session_short(session_id)

def derived_label(self, session_id: str) -> str:
    return contract.build_label(
        contract.LabelConfig(
            runtime_name=self.config.runtime_name,
            env_prefix=self.config.env_prefix,
            plugin_role=self.plugin_role(),
            session_id=session_id,
            override_label=self._env("LABEL") or os.environ.get("SWARM_HERMES_LABEL") or "",
            identity=self.resolved_identity_name(),
            agent_role=(self._env("AGENT_ROLE") or self._role_from_file() or os.environ.get("SWARM_AGENT_ROLE") or ""),
        )
    )

def _with_session_token(self, label: str, session_id: str) -> str:
    return contract.with_session_token(label, session_id)

# -- Per-session scratch dir --------------------------------------------

def _scratch_root(self) -> Path:
    return Path(tempfile.gettempdir()) / self.config.scratch_dir_name

def session_scratch(self, session_id: str) -> Path:
    p = self._scratch_root() / (session_id or "default")
    p.mkdir(parents=True, exist_ok=True)
    return p

def write_session_meta(self, session_id: str, meta: dict) -> None:
    if not session_id:
        return
    (self.session_scratch(session_id) / "meta.json").write_text(json.dumps(meta))

def read_session_meta(self, session_id: str) -> dict:
    if not session_id:
        return {}
    path = self.session_scratch(session_id) / "meta.json"
    if not path.exists():
        return {}
    try:
        return json.loads(path.read_text())
    except Exception:
        return {}

# -- Hook IO ------------------------------------------------------------

@staticmethod
def read_hook_input(stdin) -> dict:
    try:
        raw = stdin.read()
    except Exception:
        return {}
    if not raw:
        return {}
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return {}

def write_paths_for_tool(self, tool_name: str, tool_input: object) -> list[str]:
    if tool_name not in self.config.write_tools:
        return []
    return self.config.extract_paths(tool_name, tool_input)

@staticmethod
def emit_block(reason: str) -> None:
    payload = {
        "hookSpecificOutput": {
            "hookEventName": "PreToolUse",
            "permissionDecision": "deny",
            "permissionDecisionReason": reason,
        }
    }
    print(json.dumps(payload))

@staticmethod
def parse_instances_json(stdout: str) -> list[dict]:
    try:
        data = json.loads(stdout)
    except json.JSONDecodeError:
        return []
    if isinstance(data, list):
        return [item for item in data if isinstance(item, dict)]
    if isinstance(data, dict):
        for key in ("instances", "result", "data"):
            value = data.get(key)
            if isinstance(value, list):
                return [item for item in value if isinstance(item, dict)]
    return []

def list_locks(self, scope: Optional[str]) -> list[dict]:
    """Read-only enumeration of every active lock in scope.

    Returns the parsed ``swarm-mcp locks --json`` array, or an empty list
    on any CLI failure. The CLI's startup ``prune()`` clears stale
    instances' locks before this returns, so callers see live locks only.
    """
    args = ["locks", "--json"]
    if scope:
        args.extend(["--scope", scope])
    rc, out, _ = self.run_swarm(args, timeout=4.0)
    if rc != 0:
        return []
    try:
        data = json.loads(out)
    except json.JSONDecodeError:
        return []
    return [row for row in data if isinstance(row, dict)] if isinstance(data, list) else []

def find_peer_lock_conflict(
    self,
    scope: Optional[str],
    own_instance_id: str,
    paths: list[str],
) -> Optional[dict]:
    """Return the first lock row blocking ``paths``, or ``None``.

    "Blocking" means: held by an instance other than ``own_instance_id``,
    on a file in ``paths``. Same-instance locks (the agent declared a
    wider critical section earlier in the session) are not conflicts —
    the agent is free to keep editing its own reservation.
    """
    if not paths or not own_instance_id:
        return None
    target_set = set(paths)
    for row in self.list_locks(scope):
        if row.get("type") != "lock":
            continue
        if row.get("file") not in target_set:
            continue
        holder = row.get("instance_id")
        if isinstance(holder, str) and holder and holder != own_instance_id:
            return row
    return None

# -- High-level hook entry points ---------------------------------------

def _register_args(self, session_id: str, cwd: str) -> tuple[list[str], dict[str, str]]:
    label = self.derived_label(session_id)
    directory = cwd or self.session_cwd()
    scope = self.scope_arg()
    file_root = self.file_root_arg()

    cli_args = ["register", directory, "--label", label, "--json"]
    pretty_args: dict[str, str] = {
        "directory": directory,
        "label": label,
    }
    if scope:
        cli_args.extend(["--scope", scope])
        pretty_args["scope"] = scope
    if file_root:
        cli_args.extend(["--file-root", file_root])
        pretty_args["file_root"] = file_root

    lease = self.lease_seconds()
    if lease > 0:
        cli_args.extend(["--lease-seconds", str(lease)])
        pretty_args["lease_seconds"] = str(lease)
    return cli_args, pretty_args

@staticmethod
def _parse_json_object(text: str) -> dict:
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        return {}
    return parsed if isinstance(parsed, dict) else {}

def _herdr_identity(self) -> dict[str, object]:
    herdr_pane = os.environ.get("HERDR_PANE_ID") or os.environ.get("HERDR_PANE")
    if not herdr_pane:
        return {}
    payload: dict[str, object] = {
        "schema_version": 1,
        "backend": "herdr",
        "handle_kind": "pane",
        "handle": herdr_pane,
        "pane_id": herdr_pane,
    }
    socket_path = contract.resolved_herdr_socket_path(self.identity_name())
    if socket_path:
        payload["socket_path"] = socket_path
    workspace_id = os.environ.get("HERDR_WORKSPACE_ID")
    if workspace_id:
        payload["workspace_id"] = workspace_id
    return self._canonicalize_herdr_identity(payload, herdr_pane)

def _canonicalize_herdr_identity(
    self, identity: dict[str, object], requested_pane_id: str
) -> dict[str, object]:
    herdr_bin = os.environ.get("SWARM_HERDR_BIN") or shutil.which("herdr")
    if not herdr_bin:
        return identity

    env = os.environ.copy()
    socket_path = identity.get("socket_path")
    if isinstance(socket_path, str) and socket_path:
        env["HERDR_SOCKET_PATH"] = socket_path
    try:
        proc = subprocess.run(
            [herdr_bin, "pane", "get", requested_pane_id],
            capture_output=True,
            text=True,
            timeout=5,
            env=env,
        )
    except Exception:
        return identity
    if proc.returncode != 0:
        return identity

    try:
        payload = json.loads(proc.stdout or "{}")
    except json.JSONDecodeError:
        return identity
    pane = payload.get("result", {}).get("pane") if isinstance(payload, dict) else None
    if not isinstance(pane, dict):
        return identity

    canonical = pane.get("pane_id") or pane.get("id") or requested_pane_id
    if not isinstance(canonical, str) or not canonical:
        return identity

    next_identity = dict(identity)
    prior_handle = next_identity.get("handle")
    prior_pane = next_identity.get("pane_id")
    raw_handle_aliases = next_identity.get("handle_aliases")
    raw_pane_aliases = next_identity.get("pane_aliases")
    existing_aliases = []
    if isinstance(raw_handle_aliases, list):
        existing_aliases.extend(raw_handle_aliases)
    if isinstance(raw_pane_aliases, list):
        existing_aliases.extend(raw_pane_aliases)
    aliases = [
        item
        for item in [
            *existing_aliases,
            prior_handle if prior_handle != canonical else None,
            prior_pane if prior_pane != canonical else None,
            requested_pane_id if requested_pane_id != canonical else None,
        ]
        if isinstance(item, str) and item
    ]
    next_identity["backend"] = "herdr"
    next_identity["handle_kind"] = "pane"
    next_identity["handle"] = canonical
    next_identity["pane_id"] = canonical
    if aliases:
        unique_aliases = sorted(set(aliases))
        next_identity["handle_aliases"] = unique_aliases
        next_identity["pane_aliases"] = unique_aliases
    workspace_id = pane.get("workspace_id")
    if isinstance(workspace_id, str) and workspace_id:
        next_identity["workspace_id"] = workspace_id
    tab_id = pane.get("tab_id")
    if isinstance(tab_id, str) and tab_id:
        next_identity["tab_id"] = tab_id
    return next_identity

def _publish_herdr_identity(self, instance_id: str, scope: str) -> bool:
    identity = self._herdr_identity()
    if not identity:
        return False
    blob = json.dumps(identity, separators=(",", ":"))
    ok = True
    for key in [f"identity/workspace/herdr/{instance_id}", f"identity/herdr/{instance_id}"]:
        args = ["kv", "set", key, blob, "--as", instance_id]
        if scope:
            args.extend(["--scope", scope])
        rc, _, _ = self.run_swarm(args, timeout=4.0)
        ok = ok and rc == 0
    return ok

def _publish_work_tracker_config(
    self,
    instance_id: str,
    scope: str,
    tracker: dict,
) -> bool:
    if not tracker:
        return False
    key = contract.work_tracker_key(str(tracker.get("identity") or self.identity_name()))
    blob = json.dumps(tracker, separators=(",", ":"))
    args = ["kv", "set", key, blob, "--as", instance_id]
    if scope:
        args.extend(["--scope", scope])
    rc, _, _ = self.run_swarm(args, timeout=4.0)
    return rc == 0

def _delete_herdr_identity(self, instance_id: str, scope: str) -> None:
    for key in [f"identity/workspace/herdr/{instance_id}", f"identity/herdr/{instance_id}"]:
        args = ["kv", "del", key, "--as", instance_id]
        if scope:
            args.extend(["--scope", scope])
        self.run_swarm(args, timeout=4.0)

def _herdr_agent_source(self, instance_id: str) -> str:
    return herdr_agent_report.agent_source(self.config.runtime_name, instance_id)

def _report_herdr_agent(self, instance_id: str, state: str, message: str = "") -> bool:
    try:
        return herdr_agent_report.report_agent(
            agent=self.config.runtime_name,
            state=state,
            source=self._herdr_agent_source(instance_id),
            message=message or None,
        )
    except Exception:
        return False

def _release_herdr_agent(self, instance_id: str) -> bool:
    try:
        return herdr_agent_report.release_agent(
            agent=self.config.runtime_name,
            source=self._herdr_agent_source(instance_id),
        )
    except Exception:
        return False

def _deregister_instance(self, instance_id: str, scope: str) -> None:
    args = ["deregister", "--as", instance_id, "--json"]
    if scope:
        args.extend(["--scope", scope])
    self.run_swarm(args, timeout=4.0)

def build_session_start_context(
    self,
    session_id: str,
    cwd: str,
    meta: dict,
    registration_error: str = "",
) -> str:
    instance_id = str(meta.get("instance_id") or "")
    plugin_role = str(meta.get("plugin_role") or self.plugin_role())
    label = str(meta.get("label") or self.derived_label(session_id))
    scope = str(meta.get("scope") or self.scope_arg() or "")
    role_token = self.agent_role_token()
    tracker = meta.get("work_tracker") if isinstance(meta.get("work_tracker"), dict) else {}

    if instance_id:
        lines = [
            "## swarm coordination is active",
            "",
            f"Instance `{instance_id}` · scope `{scope}` · mode `{plugin_role}`",
            f"Label: `{label}`",
            "",
            "Call the swarm `bootstrap` tool to rehydrate state, then act on any pending work.",
            "See the `swarm-mcp` skill for the coordination protocol.",
            "If bootstrap says you are not registered, retry bootstrap with "
            f"`adopt_instance_id=\"{instance_id}\"` or call register with the "
            "same `adopt_instance_id`; this is the SessionStart lease id "
            "that already owns any published workspace identity.",
        ]
        if meta.get("herdr_identity_published"):
            lines.append(
                f"Workspace identity published at `identity/workspace/herdr/{instance_id}` for peer wakeups."
            )
        if tracker:
            key = contract.work_tracker_key(str(tracker.get("identity") or ""))
            verb = "published" if meta.get("work_tracker_published") else "detected"
            lines.append(
                f"Configured work tracker {verb} at `{key}`: "
                f"provider `{tracker.get('provider')}`, MCP `{tracker.get('mcp')}`. "
                "Use only that same-identity tracker for durable tracker updates."
            )
    else:
        _, register_args = self._register_args(session_id, cwd)
        pretty_args = json.dumps(register_args, indent=2)
        lines = [
            "## swarm coordination is available",
            "",
            f"SessionStart hook could not auto-register this session (mode: `{plugin_role}`).",
        ]
        if registration_error:
            lines.append(f"Hook error: `{registration_error[:240]}`")
        lines.extend(
            [
                "",
                "Call the `register` tool with these exact args, then call `bootstrap`",
                "and follow the `swarm-mcp` skill:",
                "",
                "```json",
                pretty_args,
                "```",
            ]
        )
        if tracker:
            key = contract.work_tracker_key(str(tracker.get("identity") or ""))
            blob = json.dumps(tracker, separators=(",", ":"))
            lines.extend(
                [
                    "",
                    f"After `register` succeeds, publish the configured work tracker at `{key}`:",
                    f'`kv_set key="{key}" value=\'{blob}\'`',
                ]
            )

    if plugin_role == "gateway":
        lines.append("")
        lines.append(
            "Gateway/lead mode: make trivial, low-risk edits locally when that is fastest; "
            "route medium or large implementation work through the swarm `dispatch` tool. "
            "For non-trivial human-trackable work, read `bootstrap.work_tracker`; if configured "
            "and the matching MCP is available, create/link the tracker item before `dispatch`, "
            "include the tracker URL/ID in the swarm task, and update the tracker with the final "
            "durable outcome if the worker did not. "
            "Do not fall back to native subagents. See the `swarm-mcp` skill (planner "
            "reference) for the full conductor protocol."
        )

        soul_path = self.config.soul_path
        if soul_path is not None:
            try:
                soul_text = soul_path.read_text().rstrip()
            except OSError:
                soul_text = ""
            if soul_text:
                lines.extend(["", "## identity / SOUL", "", soul_text])

    if role_token:
        doctrine_lines = self._role_doctrine_lines(role_token)
        if doctrine_lines:
            lines.extend(doctrine_lines)
    else:
        env_var = f"SWARM_{self.config.env_prefix}_AGENT_ROLE"
        lines.append("")
        lines.append(
            f"No `role:` detected. Set `{env_var}` or drop a `.swarm-role` "
            "file in the repo to add a routing role to this session's label."
        )

    identity = self._herdr_identity()
    if identity and not instance_id:
        blob = json.dumps(identity, separators=(",", ":"))
        lines.extend(
            [
                "",
                "After `register` returns an instance id, also publish your workspace handle:",
                f'`kv_set key="identity/workspace/herdr/<id>" value=\'{blob}\'`',
            ]
        )

    return "

".join(lines)

def run_session_start_hook(self, stdin) -> int:
    payload = self.read_hook_input(stdin)
    session_id = str(payload.get("session_id") or "")
    cwd = str(payload.get("cwd") or self.session_cwd())
    source = str(payload.get("source") or "startup")

    label = self.derived_label(session_id)
    scope = self.scope_arg() or ""
    tracker = self.work_tracker_config(cwd, scope)
    meta = {
        "label": label,
        "session_short": self.session_short(session_id),
        "scope": scope,
        "file_root": self.file_root_arg() or "",
        "cwd": cwd,
        "plugin_role": self.plugin_role(),
        "herdr_pane_id": os.environ.get("HERDR_PANE_ID")
        or os.environ.get("HERDR_PANE")
        or "",
    }
    if tracker:
        meta["work_tracker"] = tracker
    self.write_session_meta(session_id, meta)

    if source not in {"startup", "resume"} or not session_id:
        return 0

    # Gate auto-registration on an explicit identity signal so raw
    # binaries (no profile launcher) stay out of the swarm. The
    # SWARM_MCP_ALLOW_UNLABELED escape hatch keeps the old behavior
    # available from trusted shells.
    if not self.has_identity_signal() and not contract.truthy(
        os.environ.get("SWARM_MCP_ALLOW_UNLABELED")
    ):
        _warn_skip_registration_once(self.config.env_prefix)
        return 0

    registration_error = ""
    register_cli_args, _ = self._register_args(session_id, cwd)
    rc, out, err = self.run_swarm(register_cli_args, timeout=8.0)
    if rc == 0:
        registered = self._parse_json_object(out)
        instance_id = str(registered.get("id") or registered.get("instance_id") or "")
        if instance_id:
            scope = str(registered.get("scope") or meta.get("scope") or "")
            meta.update(
                {
                    "instance_id": instance_id,
                    "scope": scope,
                    "file_root": str(registered.get("file_root") or meta.get("file_root") or ""),
                    "herdr_identity_published": self._publish_herdr_identity(
                        instance_id,
                        scope,
                    ),
                    "herdr_agent_reported": self._report_herdr_agent(
                        instance_id,
                        "idle",
                        "swarm session registered",
                    ),
                    "work_tracker_published": self._publish_work_tracker_config(
                        instance_id,
                        scope,
                        tracker,
                    ) if tracker else False,
                }
            )
            self.write_session_meta(session_id, meta)
        else:
            registration_error = "register returned no instance id"
    else:
        registration_error = (err.strip() or out.strip() or f"swarm-mcp register exited {rc}")

    context = self.build_session_start_context(
        session_id,
        cwd,
        meta,
        registration_error,
    )
    out = {
        "hookSpecificOutput": {
            "hookEventName": "SessionStart",
            "additionalContext": context,
        }
    }
    print(json.dumps(out))
    return 0

def run_pre_tool_use_hook(self, stdin) -> int:
    """Check-only enforcement: deny when a peer holds a lock on a write target.

    The hook never acquires a lock itself. Per-edit serialization isn't the
    hazard worth catching (the race window is sub-millisecond and the
    write tool's own anchor checks defend logical conflicts); what matters
    is that a peer-declared critical section (``lock_file`` with a note)
    actually blocks other peers' writes. This hook is what enforces that.

    Same-instance locks are *not* conflicts — if this agent already
    declared a wider critical section, its own subsequent writes pass
    through normally.
    """
    payload = self.read_hook_input(stdin)
    tool_name = str(payload.get("tool_name") or "")
    if tool_name not in self.config.write_tools:
        return 0

    tool_input = payload.get("tool_input")
    paths = self.write_paths_for_tool(tool_name, tool_input)
    if not paths:
        return 0

    session_id = str(payload.get("session_id") or "")
    meta = self.read_session_meta(session_id)
    scope = meta.get("scope") or self.scope_arg()
    own_instance_id = str(meta.get("instance_id") or "")
    if not own_instance_id:
        # Without a known own-id we can't distinguish own vs peer locks.
        # Coordination is opt-in; fail open.
        return 0

    conflict = self.find_peer_lock_conflict(scope, own_instance_id, paths)
    if conflict is None:
        return 0

    file = str(conflict.get("file") or "")
    holder = str(conflict.get("instance_id") or "")
    holder_short = holder[:8] if holder else "peer"
    owner = conflict.get("owner")
    owner_label = ""
    if isinstance(owner, dict):
        raw_label = owner.get("label")
        if isinstance(raw_label, str):
            owner_label = raw_label.strip()
    raw_label = conflict.get("owner_label")
    if isinstance(raw_label, str) and raw_label.strip():
        owner_label = raw_label.strip()
    workspace = conflict.get("workspace")
    pane_id = ""
    if isinstance(workspace, dict):
        raw_pane = workspace.get("pane_id")
        if isinstance(raw_pane, str):
            pane_id = raw_pane.strip()
    raw_pane = conflict.get("pane_id")
    if isinstance(raw_pane, str) and raw_pane.strip():
        pane_id = raw_pane.strip()
    holder_display = owner_label or holder_short
    if pane_id:
        holder_display = f"{holder_display} (pane {pane_id})"
    note = str(conflict.get("content") or "").strip()
    reason = (
        f"swarm lock blocked {tool_name} for {file}: held by {holder_display}"
        + (f" ({note})" if note else "")
    )
    self.emit_block(reason)
    return 0

def run_post_tool_use_hook(self, stdin) -> int:
    """No-op under check-only enforcement.

    The pre-tool hook never acquires a lock, so there is nothing to
    release. Kept as a stable entry point so plugin configs that still
    wire ``PostToolUse`` to this core continue to load cleanly; new
    plugin configs should omit the hook entirely.
    """
    return 0

def _resolve_instance_id(
    self, session_id: str, scope: Optional[str]
) -> Optional[str]:
    short = self.session_short(session_id)
    if not short:
        return None
    args = ["instances", "--json"]
    if scope:
        args.extend(["--scope", scope])
    rc, out, _ = self.run_swarm(args, timeout=4.0)
    if rc != 0:
        return None
    marker = f"session:{short}"
    for inst in self.parse_instances_json(out):
        label = " ".join(
            str(inst.get(k) or "") for k in ("label", "labels", "role", "name")
        )
        if marker in label:
            inst_id = inst.get("id") or inst.get("instance_id")
            if isinstance(inst_id, str) and inst_id:
                return inst_id
    return None

def run_session_end_hook(self, stdin) -> int:
    payload = self.read_hook_input(stdin)
    session_id = str(payload.get("session_id") or "")
    meta = self.read_session_meta(session_id)
    scope = meta.get("scope") or self.scope_arg()

    instance_id = str(meta.get("instance_id") or "") or self._resolve_instance_id(session_id, scope)
    if instance_id:
        self._release_herdr_agent(instance_id)
        self._delete_herdr_identity(instance_id, scope)
        self._deregister_instance(instance_id, scope)

    if session_id:
        try:
            shutil.rmtree(self.session_scratch(session_id), ignore_errors=True)
        except Exception:
            pass
    return 0