from future import annotations
import io import json import os import shutil import tempfile import unittest from contextlib import redirect_stdout from pathlib import Path from unittest import mock from uuid import uuid4
from integrations._shared import herdr_agent_report from integrations._shared.swarm_hook_core import HookCore, RuntimeConfig
def _extract_paths(tool_name: str, tool_input: object) -> list[str]: if tool_name != "Write" or not isinstance(tool_input, dict): return [] value = tool_input.get("file_path") return [value] if isinstance(value, str) else []
class HookCoreLifecycleTests(unittest.TestCase): def setUp(self) -> None: # Default to a launched-via-launcher session so the identity gate in # run_session_start_hook passes; no-identity tests clear this explicitly. self.env = mock.patch.dict( os.environ, {"AGENT_IDENTITY": "test"}, clear=True ) self.env.start() # Reset the per-prefix warn-once latch so tests can re-trigger it. from integrations._shared import swarm_hook_core as _shc _shc._WARNED_SKIP_REGISTRATION.clear() self.scratch_name = f"swarm-test-{uuid4().hex}" self.core = HookCore( RuntimeConfig( runtime_name="test-runtime", env_prefix="TEST", scratch_dir_name=self.scratch_name, write_tools=frozenset({"Write"}), extract_paths=_extract_paths, ) )
def tearDown(self) -> None:
self.env.stop()
shutil.rmtree(Path(tempfile.gettempdir()) / self.scratch_name, ignore_errors=True)
def run_hook(self, payload: dict) -> str:
out = io.StringIO()
with redirect_stdout(out):
self.core.run_session_start_hook(io.StringIO(json.dumps(payload)))
return out.getvalue()
def test_session_start_registers_and_publishes_herdr_identity(self) -> None:
os.environ["HERDR_PANE_ID"] = "pane-1"
os.environ["HERDR_SOCKET_PATH"] = "/tmp/herdr.sock"
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
if args[0] == "register":
return (
0,
json.dumps({"id": "inst-1", "scope": "/repo", "file_root": "/repo"}),
"",
)
if args[:2] == ["kv", "set"]:
return 0, "", ""
return 1, "", "unexpected"
pane_payload = {
"result": {
"pane": {
"pane_id": "workspace-1",
"workspace_id": "workspace",
"tab_id": "workspace:1",
}
}
}
with (
mock.patch.object(self.core, "run_swarm", side_effect=fake_run),
mock.patch("integrations._shared.swarm_hook_core.shutil.which", return_value="herdr"),
mock.patch(
"integrations._shared.swarm_hook_core.subprocess.run",
return_value=mock.Mock(
returncode=0,
stdout=json.dumps(pane_payload),
stderr="",
),
),
):
output = self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
meta = self.core.read_session_meta("abc-123")
self.assertEqual(meta["instance_id"], "inst-1")
self.assertEqual(meta["scope"], "/repo")
self.assertTrue(meta["herdr_identity_published"])
self.assertEqual(calls[0][:3], ["register", "/repo", "--label"])
self.assertIn("test-runtime", calls[0][3])
self.assertEqual(calls[1][:3], ["kv", "set", "identity/workspace/herdr/inst-1"])
self.assertEqual(calls[2][:3], ["kv", "set", "identity/herdr/inst-1"])
identity = json.loads(calls[1][3])
self.assertEqual(identity["backend"], "herdr")
self.assertEqual(identity["handle_kind"], "pane")
self.assertEqual(identity["handle"], "workspace-1")
self.assertEqual(identity["handle_aliases"], ["pane-1"])
self.assertEqual(identity["pane_id"], "workspace-1")
self.assertEqual(identity["pane_aliases"], ["pane-1"])
self.assertEqual(identity["workspace_id"], "workspace")
self.assertEqual(identity["tab_id"], "workspace:1")
rendered = json.loads(output)["hookSpecificOutput"]["additionalContext"]
self.assertIn("swarm coordination is active", rendered)
self.assertIn("Instance `inst-1`", rendered)
self.assertIn("`bootstrap`", rendered)
self.assertIn('adopt_instance_id="inst-1"', rendered)
self.assertNotIn("Call the `register` tool", rendered)
def test_session_start_personal_gateway_loads_herdr_socket_from_profile_env(self) -> None:
profile_dir = tempfile.mkdtemp(prefix="swarm-test-profile-")
self.addCleanup(shutil.rmtree, profile_dir, ignore_errors=True)
expected_socket = "/run/herdr-personal-test.sock"
with open(os.path.join(profile_dir, "personal.env"), "w") as handle:
handle.write(f"HERDR_SOCKET_PATH={expected_socket}
")
os.environ["SWARM_TEST_IDENTITY"] = "personal"
os.environ["SWARM_TEST_ROLE"] = "gateway"
os.environ["SWARM_MCP_PROFILE_DIR"] = profile_dir
os.environ["HERDR_PANE_ID"] = "pane-1"
calls: list[list[str]] = []
herdr_envs: list[dict[str, str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
if args[0] == "register":
return (
0,
json.dumps({"id": "inst-1", "scope": "/repo", "file_root": "/repo"}),
"",
)
if args[:2] == ["kv", "set"]:
return 0, "", ""
return 1, "", "unexpected"
pane_payload = {
"result": {
"pane": {
"pane_id": "workspace-1",
"workspace_id": "workspace",
"tab_id": "workspace:1",
}
}
}
def fake_subprocess_run(*args, **kwargs):
herdr_envs.append(kwargs.get("env", {}))
return mock.Mock(returncode=0, stdout=json.dumps(pane_payload), stderr="")
with (
mock.patch.object(self.core, "run_swarm", side_effect=fake_run),
mock.patch("integrations._shared.swarm_hook_core.shutil.which", return_value="herdr"),
mock.patch(
"integrations._shared.swarm_hook_core.subprocess.run",
side_effect=fake_subprocess_run,
),
):
self.run_hook({"session_id": "abc-123", "cwd": "/repo", "source": "startup"})
self.assertEqual(herdr_envs[0]["HERDR_SOCKET_PATH"], expected_socket)
identity = json.loads(calls[1][3])
self.assertEqual(identity["socket_path"], expected_socket)
self.assertEqual(identity["backend"], "herdr")
def test_session_start_publishes_configured_work_tracker(self) -> None:
os.environ["SWARM_TEST_IDENTITY"] = "personal"
os.environ["SWARM_TEST_WORK_TRACKER"] = json.dumps(
{
"provider": "github_issues",
"mcp": "github_personal",
"repo": "Volpestyle/swarm-mcp",
}
)
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
if args[0] == "register":
return (
0,
json.dumps({"id": "inst-1", "scope": "/repo", "file_root": "/repo"}),
"",
)
if args[:2] == ["kv", "set"]:
return 0, "", ""
return 1, "", "unexpected"
with mock.patch.object(self.core, "run_swarm", side_effect=fake_run):
output = self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
self.assertEqual(calls[1][:3], ["kv", "set", "config/work_tracker/personal"])
tracker = json.loads(calls[1][3])
self.assertEqual(tracker["identity"], "personal")
self.assertEqual(tracker["provider"], "github_issues")
self.assertEqual(tracker["mcp"], "github_personal")
self.assertEqual(tracker["repo"], "Volpestyle/swarm-mcp")
self.assertEqual(tracker["source"], "env:SWARM_TEST_WORK_TRACKER")
meta = self.core.read_session_meta("abc-123")
self.assertTrue(meta["work_tracker_published"])
rendered = json.loads(output)["hookSpecificOutput"]["additionalContext"]
self.assertIn("Configured work tracker published", rendered)
self.assertIn("MCP `github_personal`", rendered)
def test_session_start_reports_herdr_agent_status_when_env_present(self) -> None:
os.environ["HERDR_PANE_ID"] = "pane-1"
os.environ["HERDR_SOCKET_PATH"] = "/tmp/herdr.sock"
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
if args[0] == "register":
return (
0,
json.dumps({"id": "inst-1", "scope": "/repo", "file_root": "/repo"}),
"",
)
return 0, "{}", ""
with (
mock.patch.object(self.core, "run_swarm", side_effect=fake_run),
mock.patch.object(self.core, "_publish_herdr_identity", return_value=True),
mock.patch(
"integrations._shared.swarm_hook_core.herdr_agent_report.report_agent",
return_value=True,
) as report_agent,
):
output = self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
report_agent.assert_called_once_with(
agent="test-runtime",
state="idle",
source="swarm-mcp:test-runtime:inst-1",
message="swarm session registered",
)
meta = self.core.read_session_meta("abc-123")
self.assertTrue(meta["herdr_agent_reported"])
rendered = json.loads(output)["hookSpecificOutput"]["additionalContext"]
self.assertIn("Instance `inst-1`", rendered)
def test_session_start_ignores_herdr_agent_report_exceptions(self) -> None:
os.environ["HERDR_PANE_ID"] = "pane-1"
os.environ["HERDR_SOCKET_PATH"] = "/tmp/herdr.sock"
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
if args[0] == "register":
return (
0,
json.dumps({"id": "inst-1", "scope": "/repo", "file_root": "/repo"}),
"",
)
return 0, "{}", ""
with (
mock.patch.object(self.core, "run_swarm", side_effect=fake_run),
mock.patch.object(self.core, "_publish_herdr_identity", return_value=True),
mock.patch(
"integrations._shared.swarm_hook_core.herdr_agent_report.report_agent",
side_effect=RuntimeError("socket unavailable"),
),
):
output = self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
meta = self.core.read_session_meta("abc-123")
self.assertEqual(meta["instance_id"], "inst-1")
self.assertFalse(meta["herdr_agent_reported"])
rendered = json.loads(output)["hookSpecificOutput"]["additionalContext"]
self.assertIn("swarm coordination is active", rendered)
def test_session_start_override_label_preserves_session_token(self) -> None:
os.environ["SWARM_TEST_LABEL"] = "identity:personal role:researcher topic:debug"
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
if args[0] == "register":
return (
0,
json.dumps({"id": "inst-override", "scope": "/repo", "file_root": "/repo"}),
"",
)
return 0, "{}", ""
with mock.patch.object(self.core, "run_swarm", side_effect=fake_run):
output = self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
self.assertEqual(calls[0][:3], ["register", "/repo", "--label"])
self.assertIn("identity:personal role:researcher topic:debug", calls[0][3])
self.assertIn("session:abc123", calls[0][3])
meta = self.core.read_session_meta("abc-123")
self.assertIn("session:abc123", meta["label"])
rendered = json.loads(output)["hookSpecificOutput"]["additionalContext"]
self.assertIn('adopt_instance_id="inst-override"', rendered)
def test_session_start_skips_register_when_no_identity_signal(self) -> None:
# Raw binaries (no AGENT_IDENTITY) must not auto-register; they would
# otherwise land as identity:unknown ghost peers.
os.environ.pop("AGENT_IDENTITY", None)
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
return 0, "", ""
with mock.patch.object(self.core, "run_swarm", side_effect=fake_run):
output = self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
self.assertEqual(calls, [])
self.assertEqual(output, "")
meta = self.core.read_session_meta("abc-123")
self.assertNotIn("instance_id", meta)
def test_session_start_allow_unlabeled_opt_back_in(self) -> None:
# SWARM_MCP_ALLOW_UNLABELED=1 restores legacy unlabeled registration.
os.environ.pop("AGENT_IDENTITY", None)
os.environ["SWARM_MCP_ALLOW_UNLABELED"] = "1"
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
if args[0] == "register":
return (
0,
json.dumps({"id": "inst-raw", "scope": "/repo", "file_root": "/repo"}),
"",
)
return 0, "", ""
with mock.patch.object(self.core, "run_swarm", side_effect=fake_run):
self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
self.assertEqual(calls[0][0], "register")
self.assertNotIn("identity:", calls[0][3])
def test_session_start_falls_back_to_manual_context_when_register_fails(self) -> None:
with mock.patch.object(
self.core,
"run_swarm",
return_value=(127, "", "swarm-mcp CLI not found"),
):
output = self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
meta = self.core.read_session_meta("abc-123")
self.assertNotIn("instance_id", meta)
rendered = json.loads(output)["hookSpecificOutput"]["additionalContext"]
self.assertIn("could not auto-register", rendered)
self.assertIn("Call the `register` tool", rendered)
def test_gateway_session_start_surfaces_mode_and_planner_label(self) -> None:
os.environ["SWARM_TEST_ROLE"] = "gateway"
os.environ["SWARM_MCP_BIN"] = "bun run /repo/src/cli.ts"
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
if args[0] == "register":
return (
0,
json.dumps({"id": "inst-1", "scope": "/repo", "file_root": "/repo"}),
"",
)
return 0, "{}", ""
with mock.patch.object(self.core, "run_swarm", side_effect=fake_run):
output = self.run_hook(
{"session_id": "abc-123", "cwd": "/repo", "source": "startup"}
)
self.assertIn("mode:gateway", calls[0][3])
self.assertIn("role:planner", calls[0][3])
rendered = json.loads(output)["hookSpecificOutput"]["additionalContext"]
self.assertIn("mode `gateway`", rendered)
self.assertIn("Gateway/lead mode", rendered)
self.assertIn("bootstrap.work_tracker", rendered)
self.assertIn("`dispatch`", rendered)
self.assertIn("`swarm-mcp` skill", rendered)
def test_gateway_session_start_appends_configured_soul(self) -> None:
soul_dir = tempfile.mkdtemp(prefix="swarm-test-soul-")
self.addCleanup(shutil.rmtree, soul_dir, ignore_errors=True)
soul_path = Path(soul_dir) / "SOUL.md"
soul_path.write_text("runtime identity prompt
") core = HookCore( RuntimeConfig( runtime_name="test-runtime", env_prefix="TEST", scratch_dir_name=f"swarm-test-{uuid4().hex}", write_tools=frozenset({"Write"}), extract_paths=_extract_paths, soul_path=soul_path, ) ) self.addCleanup( shutil.rmtree, Path(tempfile.gettempdir()) / core.config.scratch_dir_name, True, ) os.environ["SWARM_TEST_ROLE"] = "gateway"
with mock.patch.object(
core,
"run_swarm",
return_value=(0, json.dumps({"id": "inst-1", "scope": "/repo"}), ""),
):
out = io.StringIO()
with redirect_stdout(out):
core.run_session_start_hook(
io.StringIO(json.dumps({"session_id": "abc-123", "cwd": "/repo"}))
)
rendered = json.loads(out.getvalue())["hookSpecificOutput"]["additionalContext"]
self.assertIn("## identity / SOUL", rendered)
self.assertIn("runtime identity prompt", rendered)
def test_session_end_deletes_identity_and_deregisters(self) -> None:
self.core.write_session_meta(
"abc-123",
{"instance_id": "inst-1", "scope": "/repo", "label": "test-runtime"},
)
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
return 0, "{}", ""
with mock.patch.object(self.core, "run_swarm", side_effect=fake_run):
self.core.run_session_end_hook(io.StringIO(json.dumps({"session_id": "abc-123"})))
self.assertEqual(calls[0][:3], ["kv", "del", "identity/workspace/herdr/inst-1"])
self.assertEqual(calls[1][:3], ["kv", "del", "identity/herdr/inst-1"])
self.assertEqual(calls[2][:3], ["deregister", "--as", "inst-1"])
self.assertFalse(self.core.session_scratch("abc-123").joinpath("meta.json").exists())
def test_session_end_releases_herdr_agent_when_env_present(self) -> None:
os.environ["HERDR_PANE_ID"] = "pane-1"
os.environ["HERDR_SOCKET_PATH"] = "/tmp/herdr.sock"
self.core.write_session_meta(
"abc-123",
{"instance_id": "inst-1", "scope": "/repo", "label": "test-runtime"},
)
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
return 0, "{}", ""
with (
mock.patch.object(self.core, "run_swarm", side_effect=fake_run),
mock.patch(
"integrations._shared.swarm_hook_core.herdr_agent_report.release_agent",
return_value=True,
) as release_agent,
):
self.core.run_session_end_hook(io.StringIO(json.dumps({"session_id": "abc-123"})))
release_agent.assert_called_once_with(
agent="test-runtime",
source="swarm-mcp:test-runtime:inst-1",
)
self.assertEqual(calls[0][:3], ["kv", "del", "identity/workspace/herdr/inst-1"])
self.assertEqual(calls[1][:3], ["kv", "del", "identity/herdr/inst-1"])
self.assertEqual(calls[2][:3], ["deregister", "--as", "inst-1"])
def test_herdr_report_agent_skips_without_required_env(self) -> None:
os.environ["HERDR_PANE_ID"] = "pane-1"
with mock.patch(
"integrations._shared.herdr_agent_report.socket.socket"
) as socket_factory:
ok = herdr_agent_report.report_agent(
agent="codex",
state="idle",
source="swarm-mcp:codex:inst-1",
)
self.assertFalse(ok)
socket_factory.assert_not_called()
def test_herdr_report_agent_returns_false_on_socket_error(self) -> None:
os.environ["HERDR_PANE_ID"] = "pane-1"
os.environ["HERDR_SOCKET_PATH"] = "/tmp/herdr.sock"
with mock.patch(
"integrations._shared.herdr_agent_report.socket.socket",
side_effect=OSError("connect failed"),
):
ok = herdr_agent_report.report_agent(
agent="codex",
state="idle",
source="swarm-mcp:codex:inst-1",
)
self.assertFalse(ok)
def _run_pre_tool(
self,
payload: dict,
locks_response: list[dict] | None = None,
register_calls: list[list[str]] | None = None,
) -> str:
"""Invoke run_pre_tool_use_hook with a stubbed `swarm-mcp locks` response."""
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
if register_calls is not None:
register_calls.append(args)
if args and args[0] == "locks":
return 0, json.dumps(locks_response or []), ""
return 0, "[]", ""
out = io.StringIO()
with mock.patch.object(self.core, "run_swarm", side_effect=fake_run), redirect_stdout(out):
self.core.run_pre_tool_use_hook(io.StringIO(json.dumps(payload)))
return out.getvalue()
def test_pre_tool_allows_write_when_no_locks_exist(self) -> None:
self.core.write_session_meta(
"abc-123", {"instance_id": "inst-self", "scope": "/repo"}
)
calls: list[list[str]] = []
output = self._run_pre_tool(
{
"session_id": "abc-123",
"tool_name": "Write",
"tool_input": {"file_path": "/repo/file.txt"},
},
locks_response=[],
register_calls=calls,
)
self.assertEqual(output, "")
self.assertEqual(calls[0][:1], ["locks"])
self.assertIn("--scope", calls[0])
self.assertEqual(calls[0][calls[0].index("--scope") + 1], "/repo")
# No acquire / release calls — check-only.
self.assertFalse(any(args[0] == "lock" for args in calls))
self.assertFalse(any(args[0] == "unlock" for args in calls))
def test_pre_tool_blocks_write_when_peer_holds_target(self) -> None:
self.core.write_session_meta(
"abc-123", {"instance_id": "inst-self", "scope": "/repo"}
)
output = self._run_pre_tool(
{
"session_id": "abc-123",
"tool_name": "Write",
"tool_input": {"file_path": "/repo/file.txt"},
},
locks_response=[
{
"id": "lock-1",
"instance_id": "inst-peer-xyz",
"file": "/repo/file.txt",
"type": "lock",
"content": "refactoring auth flow",
"created_at": 0,
}
],
)
decision = json.loads(output)
block = decision["hookSpecificOutput"]
self.assertEqual(block["hookEventName"], "PreToolUse")
self.assertEqual(block["permissionDecision"], "deny")
reason = block["permissionDecisionReason"]
self.assertIn("swarm lock blocked Write", reason)
self.assertIn("/repo/file.txt", reason)
self.assertIn("inst-pee", reason) # 8-char prefix of holder
self.assertIn("refactoring auth flow", reason)
def test_pre_tool_lock_reason_uses_owner_label_and_pane_when_available(self) -> None:
self.core.write_session_meta(
"abc-123", {"instance_id": "inst-self", "scope": "/repo"}
)
output = self._run_pre_tool(
{
"session_id": "abc-123",
"tool_name": "Write",
"tool_input": {"file_path": "/repo/file.txt"},
},
locks_response=[
{
"id": "lock-1",
"instance_id": "inst-peer-xyz",
"owner_label": "role:implementer name:bob",
"pane_id": "pane-3",
"file": "/repo/file.txt",
"type": "lock",
"content": "",
"created_at": 0,
}
],
)
reason = json.loads(output)["hookSpecificOutput"]["permissionDecisionReason"]
self.assertIn("held by role:implementer name:bob (pane pane-3)", reason)
def test_pre_tool_allows_write_when_lock_is_self_held(self) -> None:
"""Re-entrant: an agent that declared a wider critical section keeps editing."""
self.core.write_session_meta(
"abc-123", {"instance_id": "inst-self", "scope": "/repo"}
)
output = self._run_pre_tool(
{
"session_id": "abc-123",
"tool_name": "Write",
"tool_input": {"file_path": "/repo/file.txt"},
},
locks_response=[
{
"id": "lock-1",
"instance_id": "inst-self",
"file": "/repo/file.txt",
"type": "lock",
"content": "agent-declared critical section",
"created_at": 0,
}
],
)
self.assertEqual(output, "")
def test_pre_tool_allows_write_when_peer_holds_unrelated_file(self) -> None:
self.core.write_session_meta(
"abc-123", {"instance_id": "inst-self", "scope": "/repo"}
)
output = self._run_pre_tool(
{
"session_id": "abc-123",
"tool_name": "Write",
"tool_input": {"file_path": "/repo/file.txt"},
},
locks_response=[
{
"id": "lock-1",
"instance_id": "inst-peer-xyz",
"file": "/repo/other.txt",
"type": "lock",
"content": "different file",
"created_at": 0,
}
],
)
self.assertEqual(output, "")
def test_pre_tool_fails_open_when_own_instance_id_unknown(self) -> None:
"""Coordination is opt-in. Without an instance_id we can't distinguish own vs peer locks."""
self.core.write_session_meta("abc-123", {"scope": "/repo"})
calls: list[list[str]] = []
output = self._run_pre_tool(
{
"session_id": "abc-123",
"tool_name": "Write",
"tool_input": {"file_path": "/repo/file.txt"},
},
locks_response=[
{
"id": "lock-1",
"instance_id": "inst-peer-xyz",
"file": "/repo/file.txt",
"type": "lock",
"content": "should not matter — we don't know own id",
"created_at": 0,
}
],
register_calls=calls,
)
self.assertEqual(output, "")
# No locks call should fire when we bail before identity check.
self.assertFalse(any(args[0] == "locks" for args in calls))
def test_post_tool_use_hook_is_a_noop(self) -> None:
"""Check-only model: nothing was acquired, so post-hook does nothing."""
calls: list[list[str]] = []
def fake_run(args: list[str], timeout: float = 8.0) -> tuple[int, str, str]:
calls.append(args)
return 0, "{}", ""
out = io.StringIO()
with mock.patch.object(self.core, "run_swarm", side_effect=fake_run), redirect_stdout(out):
rc = self.core.run_post_tool_use_hook(
io.StringIO(
json.dumps(
{
"session_id": "abc-123",
"tool_name": "Write",
"tool_input": {"file_path": "/repo/file.txt"},
}
)
)
)
self.assertEqual(rc, 0)
self.assertEqual(out.getvalue(), "")
self.assertEqual(calls, [])
if name == "main": unittest.main()
