Compare commits
1 Commits
fix/plugin
...
feat/kanba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
60e75674e7 |
164
gateway/run.py
164
gateway/run.py
@@ -2293,6 +2293,12 @@ class GatewayRunner:
|
||||
# so human-in-the-loop workflows hear back without polling.
|
||||
asyncio.create_task(self._kanban_notifier_watcher())
|
||||
|
||||
# Start background kanban dispatcher — spawns workers for ready
|
||||
# tasks. Gated by `kanban.dispatch_in_gateway` (default True).
|
||||
# When false, users run `hermes kanban daemon` externally or
|
||||
# simply don't use kanban; this loop becomes a no-op.
|
||||
asyncio.create_task(self._kanban_dispatcher_watcher())
|
||||
|
||||
# Start background reconnection watcher for platforms that failed at startup
|
||||
if self._failed_platforms:
|
||||
logger.info(
|
||||
@@ -2703,6 +2709,164 @@ class GatewayRunner:
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
async def _kanban_dispatcher_watcher(self) -> None:
|
||||
"""Embedded kanban dispatcher — one tick every `dispatch_interval_seconds`.
|
||||
|
||||
Gated by `kanban.dispatch_in_gateway` in config.yaml (default True).
|
||||
When true, the gateway hosts the single dispatcher for this profile:
|
||||
no separate `hermes kanban daemon` process needed. When false, the
|
||||
loop exits immediately and an external daemon is expected.
|
||||
|
||||
Each tick calls :func:`kanban_db.dispatch_once` inside
|
||||
``asyncio.to_thread`` so the SQLite WAL lock never blocks the
|
||||
event loop. Failures in one tick don't stop subsequent ticks —
|
||||
same pattern as `_kanban_notifier_watcher`.
|
||||
|
||||
Shutdown: the loop checks ``self._running`` between ticks; gateway
|
||||
stop() flips it to False and cancels pending tasks, and the
|
||||
in-flight ``to_thread`` returns on its own after the current
|
||||
``dispatch_once`` call finishes (typically <1ms on an idle board).
|
||||
"""
|
||||
# Read config once at boot. If the user flips the flag later, they
|
||||
# restart the gateway; same pattern as every other background
|
||||
# watcher here. Honours HERMES_KANBAN_DISPATCH_IN_GATEWAY env var
|
||||
# as an escape hatch (false-y value disables without editing YAML).
|
||||
try:
|
||||
from hermes_cli.config import load_config as _load_config
|
||||
except Exception:
|
||||
logger.warning("kanban dispatcher: config loader unavailable; disabled")
|
||||
return
|
||||
env_override = os.environ.get("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "").strip().lower()
|
||||
if env_override in ("0", "false", "no", "off"):
|
||||
logger.info("kanban dispatcher: disabled via HERMES_KANBAN_DISPATCH_IN_GATEWAY env")
|
||||
return
|
||||
|
||||
try:
|
||||
cfg = _load_config()
|
||||
except Exception as exc:
|
||||
logger.warning("kanban dispatcher: cannot load config (%s); disabled", exc)
|
||||
return
|
||||
kanban_cfg = cfg.get("kanban", {}) if isinstance(cfg, dict) else {}
|
||||
if not kanban_cfg.get("dispatch_in_gateway", True):
|
||||
logger.info(
|
||||
"kanban dispatcher: disabled via config kanban.dispatch_in_gateway=false"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
from hermes_cli import kanban_db as _kb
|
||||
except Exception:
|
||||
logger.warning("kanban dispatcher: kanban_db not importable; dispatcher disabled")
|
||||
return
|
||||
|
||||
interval = float(kanban_cfg.get("dispatch_interval_seconds", 60) or 60)
|
||||
if interval < 1.0:
|
||||
interval = 1.0 # sanity floor — tighter than this is a footgun
|
||||
|
||||
# Initial delay so the gateway finishes wiring adapters before the
|
||||
# dispatcher spawns workers (those workers may hit gateway notify
|
||||
# subscriptions etc.). Matches the notifier watcher's delay.
|
||||
await asyncio.sleep(5)
|
||||
|
||||
# Health telemetry mirrored from `_cmd_daemon`: warn when ready
|
||||
# queue is non-empty but spawns are 0 for N consecutive ticks —
|
||||
# usually means broken PATH, missing venv, or credential loss.
|
||||
HEALTH_WINDOW = 6
|
||||
bad_ticks = 0
|
||||
last_warn_at = 0
|
||||
|
||||
def _tick_once() -> "Optional[object]":
|
||||
"""Run one dispatch_once; return result or None on error.
|
||||
|
||||
Runs in a worker thread via `asyncio.to_thread`."""
|
||||
conn = None
|
||||
try:
|
||||
conn = _kb.connect()
|
||||
try:
|
||||
_kb.init_db() # idempotent, handles first-run
|
||||
except Exception:
|
||||
pass
|
||||
return _kb.dispatch_once(conn)
|
||||
except Exception:
|
||||
logger.exception("kanban dispatcher: tick failed")
|
||||
return None
|
||||
finally:
|
||||
if conn is not None:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _ready_nonempty() -> bool:
|
||||
"""Cheap probe: is there at least one ready+assigned+unclaimed task?"""
|
||||
conn = None
|
||||
try:
|
||||
conn = _kb.connect()
|
||||
row = conn.execute(
|
||||
"SELECT 1 FROM tasks "
|
||||
"WHERE status = 'ready' AND assignee IS NOT NULL "
|
||||
" AND claim_lock IS NULL LIMIT 1"
|
||||
).fetchone()
|
||||
return row is not None
|
||||
except Exception:
|
||||
return False
|
||||
finally:
|
||||
if conn is not None:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
logger.info(
|
||||
"kanban dispatcher: embedded in gateway (interval=%.1fs)", interval
|
||||
)
|
||||
while self._running:
|
||||
try:
|
||||
res = await asyncio.to_thread(_tick_once)
|
||||
if res is not None and getattr(res, "spawned", None):
|
||||
# Quiet by default — only log when something actually
|
||||
# happened, so an idle gateway stays silent.
|
||||
logger.info(
|
||||
"kanban dispatcher: tick spawned=%d reclaimed=%d "
|
||||
"crashed=%d timed_out=%d promoted=%d auto_blocked=%d",
|
||||
len(res.spawned),
|
||||
res.reclaimed,
|
||||
len(res.crashed) if hasattr(res.crashed, "__len__") else 0,
|
||||
len(res.timed_out) if hasattr(res.timed_out, "__len__") else 0,
|
||||
res.promoted,
|
||||
len(res.auto_blocked) if hasattr(res.auto_blocked, "__len__") else 0,
|
||||
)
|
||||
# Health telemetry
|
||||
ready_pending = await asyncio.to_thread(_ready_nonempty)
|
||||
spawned_any = bool(res and getattr(res, "spawned", None))
|
||||
if ready_pending and not spawned_any:
|
||||
bad_ticks += 1
|
||||
else:
|
||||
bad_ticks = 0
|
||||
if bad_ticks >= HEALTH_WINDOW:
|
||||
now = int(time.time())
|
||||
if now - last_warn_at >= 300:
|
||||
logger.warning(
|
||||
"kanban dispatcher stuck: ready queue non-empty for "
|
||||
"%d consecutive ticks but 0 workers spawned. Check "
|
||||
"profile health (venv, PATH, credentials) and "
|
||||
"`hermes kanban list --status ready`.",
|
||||
bad_ticks,
|
||||
)
|
||||
last_warn_at = now
|
||||
except asyncio.CancelledError:
|
||||
logger.debug("kanban dispatcher: cancelled")
|
||||
raise
|
||||
except Exception:
|
||||
logger.exception("kanban dispatcher: unexpected watcher error")
|
||||
|
||||
# Sleep in 1s slices so shutdown is snappy — otherwise a stop()
|
||||
# waits up to `interval` seconds for the current sleep to finish.
|
||||
slept = 0.0
|
||||
while slept < interval and self._running:
|
||||
await asyncio.sleep(min(1.0, interval - slept))
|
||||
slept += 1.0
|
||||
|
||||
async def _platform_reconnect_watcher(self) -> None:
|
||||
"""Background task that periodically retries connecting failed platforms.
|
||||
|
||||
|
||||
@@ -937,6 +937,24 @@ DEFAULT_CONFIG = {
|
||||
"max_parallel_jobs": None,
|
||||
},
|
||||
|
||||
# Kanban multi-agent coordination — controls the dispatcher loop that
|
||||
# spawns workers for ready tasks. The dispatcher ticks every N seconds
|
||||
# (default 60), reclaims stale claims, promotes dependency-satisfied
|
||||
# todos to ready, and fires `hermes -p <assignee> chat -q ...` for
|
||||
# each claimable ready task. One dispatcher per profile is sufficient;
|
||||
# running more than one on the same kanban.db will race for claims.
|
||||
"kanban": {
|
||||
# Run the dispatcher inside the gateway process. On by default —
|
||||
# the cost is ~300µs every `dispatch_interval_seconds` when idle,
|
||||
# and gateway is the supervisor users already have. Set to false
|
||||
# only if you run the dispatcher as a separate systemd unit or
|
||||
# don't want the gateway to spawn workers.
|
||||
"dispatch_in_gateway": True,
|
||||
# Seconds between dispatcher ticks (idle or not). Lower = snappier
|
||||
# pickup of newly-ready tasks; higher = less SQL pressure.
|
||||
"dispatch_interval_seconds": 60,
|
||||
},
|
||||
|
||||
# execute_code settings — controls the tool used for programmatic tool calls.
|
||||
"code_execution": {
|
||||
# Execution mode:
|
||||
|
||||
@@ -95,6 +95,60 @@ def _parse_workspace_flag(value: str) -> tuple[str, Optional[str]]:
|
||||
)
|
||||
|
||||
|
||||
def _check_dispatcher_presence() -> tuple[bool, str]:
|
||||
"""Return ``(running, message)``.
|
||||
|
||||
- ``running=True``: a gateway is alive for this HERMES_HOME and its
|
||||
config has ``kanban.dispatch_in_gateway`` on (default). Message
|
||||
is a short status line.
|
||||
- ``running=False``: either no gateway is running, or the gateway
|
||||
is running but the config flag is off. Message is human guidance
|
||||
explaining the next step.
|
||||
|
||||
Used by ``hermes kanban create`` (and callers) to warn when a task
|
||||
will sit in ``ready`` because nothing is there to pick it up.
|
||||
Defensive against import failures and config-read errors — if the
|
||||
probe itself errors, we return ``(True, "")`` so we don't spam
|
||||
false warnings (better to miss a warning than to cry wolf).
|
||||
"""
|
||||
try:
|
||||
from gateway.status import get_running_pid # type: ignore
|
||||
except Exception:
|
||||
return (True, "") # can't probe — silent
|
||||
try:
|
||||
pid = get_running_pid()
|
||||
except Exception:
|
||||
return (True, "") # probe errored — silent
|
||||
|
||||
# Even if the gateway is up, dispatch_in_gateway may be off.
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
cfg = load_config()
|
||||
dispatch_on = bool(cfg.get("kanban", {}).get("dispatch_in_gateway", True))
|
||||
except Exception:
|
||||
dispatch_on = True # can't tell — assume default
|
||||
|
||||
if pid and dispatch_on:
|
||||
return (True, f"gateway pid={pid}, dispatch enabled")
|
||||
if pid and not dispatch_on:
|
||||
return (
|
||||
False,
|
||||
"Gateway is running but kanban.dispatch_in_gateway=false in "
|
||||
"config.yaml — the task will sit in 'ready' until you flip it "
|
||||
"back on and restart the gateway, OR run the legacy "
|
||||
"standalone daemon (`hermes kanban daemon --force`)."
|
||||
)
|
||||
return (
|
||||
False,
|
||||
"No gateway is running — the task will sit in 'ready' until you "
|
||||
"start it. Run:\n"
|
||||
" hermes gateway start\n"
|
||||
"The gateway hosts an embedded dispatcher (tick interval 60s by "
|
||||
"default); your task will be picked up on the next tick after "
|
||||
"the gateway comes up."
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Argparse builder
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -239,10 +293,10 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu
|
||||
f"(default: {kb.DEFAULT_SPAWN_FAILURE_LIMIT})")
|
||||
p_disp.add_argument("--json", action="store_true")
|
||||
|
||||
# --- daemon ---
|
||||
# --- daemon (deprecated) ---
|
||||
p_daemon = sub.add_parser(
|
||||
"daemon",
|
||||
help="Run the dispatcher continuously until SIGINT/SIGTERM",
|
||||
help="DEPRECATED — dispatcher now runs in the gateway. Use `hermes gateway start`.",
|
||||
)
|
||||
p_daemon.add_argument("--interval", type=float, default=60.0,
|
||||
help="Seconds between dispatch ticks (default: 60)")
|
||||
@@ -254,6 +308,11 @@ def build_parser(parent_subparsers: argparse._SubParsersAction) -> argparse.Argu
|
||||
help="Write the daemon's PID to this file on start")
|
||||
p_daemon.add_argument("--verbose", "-v", action="store_true",
|
||||
help="Log each tick's outcome to stdout")
|
||||
# Undocumented escape hatch for users who truly cannot run the gateway.
|
||||
# Intentionally excluded from --help so nobody discovers it casually and
|
||||
# keeps the old double-dispatcher pattern alive.
|
||||
p_daemon.add_argument("--force", action="store_true",
|
||||
help=argparse.SUPPRESS)
|
||||
|
||||
# --- watch ---
|
||||
p_watch = sub.add_parser(
|
||||
@@ -501,14 +560,14 @@ def _cmd_init(args: argparse.Namespace) -> int:
|
||||
print("No profiles found under ~/.hermes/profiles/.")
|
||||
print("Create one with `hermes -p <name> setup` before assigning tasks.")
|
||||
print()
|
||||
print("Next step: run the dispatcher so ready tasks actually get picked up.")
|
||||
print(" # Foreground (interactive, Ctrl-C to stop):")
|
||||
print(" hermes kanban daemon")
|
||||
print("Next step: start the gateway so ready tasks actually get picked up.")
|
||||
print(" hermes gateway start")
|
||||
print()
|
||||
print(" # As a systemd user unit (persists across logins):")
|
||||
print(" systemctl --user enable --now hermes-kanban-dispatcher.service")
|
||||
print()
|
||||
print("Without a running dispatcher, tasks stay in 'ready' forever.")
|
||||
print(
|
||||
"The gateway hosts an embedded dispatcher that ticks every 60 seconds\n"
|
||||
"by default (config: kanban.dispatch_interval_seconds). Without a\n"
|
||||
"running gateway, tasks stay in 'ready' forever."
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
@@ -570,6 +629,18 @@ def _cmd_create(args: argparse.Namespace) -> int:
|
||||
print(json.dumps(_task_to_dict(task), indent=2, ensure_ascii=False))
|
||||
else:
|
||||
print(f"Created {task_id} ({task.status}, assignee={task.assignee or '-'})")
|
||||
|
||||
# Warn when the task would sit in `ready` because no dispatcher is
|
||||
# present. Only warn on ready+assigned tasks — triage/todo are
|
||||
# expected to sit idle until promoted, and unassigned tasks
|
||||
# can't be dispatched. Skipped in --json mode so the stdout
|
||||
# stream stays strictly machine-parseable for callers (the JSON
|
||||
# response itself carries enough info for them to decide if
|
||||
# they want to check dispatcher presence separately).
|
||||
if task.status == "ready" and task.assignee:
|
||||
running, message = _check_dispatcher_presence()
|
||||
if not running and message:
|
||||
print(f"\n⚠ {message}", file=sys.stderr)
|
||||
return 0
|
||||
|
||||
|
||||
@@ -918,7 +989,42 @@ def _cmd_dispatch(args: argparse.Namespace) -> int:
|
||||
|
||||
|
||||
def _cmd_daemon(args: argparse.Namespace) -> int:
|
||||
"""Run the dispatcher continuously. Foreground-safe, signal-clean."""
|
||||
"""Deprecated — the dispatcher now runs inside the gateway.
|
||||
|
||||
Left in as a stub so users with the old command in scripts/systemd
|
||||
units get a clear migration message instead of a cryptic
|
||||
"no such command" error. A ``--force`` escape hatch keeps the old
|
||||
standalone daemon alive for the rare edge case where someone truly
|
||||
cannot run the gateway (e.g. running on a host that forbids
|
||||
long-lived background services), but the default path exits 2
|
||||
with guidance so nobody accidentally keeps running two dispatchers
|
||||
against the same kanban.db.
|
||||
"""
|
||||
# --force lets power users keep the standalone loop for one more
|
||||
# release cycle. Undocumented in `--help` so nobody discovers it
|
||||
# casually — intentional.
|
||||
if not getattr(args, "force", False):
|
||||
print(
|
||||
"hermes kanban daemon: DEPRECATED — the dispatcher now runs\n"
|
||||
"inside the gateway. To use kanban:\n"
|
||||
"\n"
|
||||
" hermes gateway start # starts the gateway + embedded dispatcher\n"
|
||||
"\n"
|
||||
"Ready tasks will be picked up on the next dispatcher tick\n"
|
||||
"(default: every 60 seconds). Configure via config.yaml:\n"
|
||||
"\n"
|
||||
" kanban:\n"
|
||||
" dispatch_in_gateway: true # default\n"
|
||||
" dispatch_interval_seconds: 60\n"
|
||||
"\n"
|
||||
"Running both the gateway AND this standalone daemon will\n"
|
||||
"race for claims. If you truly need the old standalone\n"
|
||||
"daemon (no gateway available), rerun with --force.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
return 2
|
||||
|
||||
# Legacy path — same logic as before, kept behind --force.
|
||||
# Make sure the DB exists before printing "started" so the user sees the
|
||||
# correct DB path and any init error surfaces immediately.
|
||||
kb.init_db()
|
||||
@@ -932,8 +1038,14 @@ def _cmd_daemon(args: argparse.Namespace) -> int:
|
||||
print(f"warning: could not write pidfile {pidfile}: {exc}", file=sys.stderr)
|
||||
|
||||
verbose = bool(getattr(args, "verbose", False))
|
||||
print(f"Kanban dispatcher running (interval={args.interval}s, pid={os.getpid()}). "
|
||||
f"Ctrl-C to stop.")
|
||||
print(
|
||||
f"Kanban dispatcher running STANDALONE via --force "
|
||||
f"(interval={args.interval}s, pid={os.getpid()}). "
|
||||
f"Ctrl-C to stop. NOTE: if a gateway is also running with "
|
||||
f"dispatch_in_gateway=true (default), you have two dispatchers "
|
||||
f"racing for claims.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
|
||||
# Health telemetry: warn when every tick finds ready work but fails to
|
||||
# spawn any worker. Catches broken profiles, PATH drift, missing venv,
|
||||
|
||||
12
plugins/kanban/dashboard/dist/index.js
vendored
12
plugins/kanban/dashboard/dist/index.js
vendored
@@ -428,7 +428,17 @@
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(body),
|
||||
}).then(loadBoard);
|
||||
}).then(function (res) {
|
||||
// Surface dispatcher-presence warnings (e.g. "no gateway is
|
||||
// running") via the existing error banner channel. Not fatal —
|
||||
// the task was created successfully — but the user should know
|
||||
// their ready task will sit idle until the gateway is up.
|
||||
if (res && res.warning) {
|
||||
setError("Task created, but: " + res.warning);
|
||||
}
|
||||
loadBoard();
|
||||
return res;
|
||||
});
|
||||
}, [loadBoard]);
|
||||
|
||||
const toggleSelected = useCallback(function (id, additive) {
|
||||
|
||||
@@ -331,7 +331,22 @@ def create_task(payload: CreateTaskBody):
|
||||
skills=payload.skills,
|
||||
)
|
||||
task = kanban_db.get_task(conn, task_id)
|
||||
return {"task": _task_dict(task) if task else None}
|
||||
body: dict[str, Any] = {"task": _task_dict(task) if task else None}
|
||||
# Surface a dispatcher-presence warning so the UI can show a
|
||||
# banner when a `ready` task would otherwise sit idle because no
|
||||
# gateway is running (or dispatch_in_gateway=false). Only emit
|
||||
# for ready+assigned tasks; triage/todo are expected to wait,
|
||||
# and unassigned tasks can't be dispatched regardless.
|
||||
if task and task.status == "ready" and task.assignee:
|
||||
try:
|
||||
from hermes_cli.kanban import _check_dispatcher_presence
|
||||
running, message = _check_dispatcher_presence()
|
||||
if not running and message:
|
||||
body["warning"] = message
|
||||
except Exception:
|
||||
# Probe failure must never block the create itself.
|
||||
pass
|
||||
return body
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
finally:
|
||||
|
||||
@@ -1,11 +1,26 @@
|
||||
# DEPRECATED — the kanban dispatcher now runs inside the gateway by
|
||||
# default (config key: kanban.dispatch_in_gateway, default true). To
|
||||
# migrate:
|
||||
#
|
||||
# systemctl --user disable --now hermes-kanban-dispatcher.service
|
||||
# # then make sure a gateway is running; e.g. a systemd user unit
|
||||
# # for `hermes gateway start`. The gateway hosts the dispatcher.
|
||||
#
|
||||
# This unit is kept for users who truly cannot run the gateway (host
|
||||
# policy forbids long-lived services, etc.). It now invokes the
|
||||
# standalone dispatcher via the explicit --force flag, so nobody
|
||||
# accidentally keeps two dispatchers racing against the same
|
||||
# kanban.db. Running this unit AND a gateway with
|
||||
# dispatch_in_gateway=true is NOT supported.
|
||||
|
||||
[Unit]
|
||||
Description=Hermes Kanban dispatcher (hermes kanban daemon)
|
||||
Description=Hermes Kanban dispatcher (DEPRECATED standalone daemon — prefer gateway-embedded dispatch)
|
||||
Documentation=https://hermes-agent.nousresearch.com/docs/user-guide/features/kanban
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
Type=simple
|
||||
ExecStart=/usr/bin/env hermes kanban daemon --interval 60 --pidfile %t/hermes-kanban-dispatcher.pid
|
||||
ExecStart=/usr/bin/env hermes kanban daemon --force --interval 60 --pidfile %t/hermes-kanban-dispatcher.pid
|
||||
Restart=on-failure
|
||||
RestartSec=5
|
||||
# Log to the journal via stdout/stderr; the dispatcher also writes per-task
|
||||
|
||||
@@ -10,6 +10,7 @@ parity across every registered verb.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
@@ -2454,3 +2455,259 @@ def test_legacy_db_without_skills_column_migrates(tmp_path):
|
||||
assert "skills" in keys
|
||||
assert row["skills"] is None
|
||||
conn.close()
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gateway-embedded dispatcher: config, CLI warnings, daemon deprecation stub
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_config_default_dispatch_in_gateway_is_true():
|
||||
"""Default config must enable gateway-embedded dispatch out of the box.
|
||||
Flipping this default to false is a user-visible behaviour change and
|
||||
should require a conscious migration."""
|
||||
from hermes_cli.config import DEFAULT_CONFIG
|
||||
kanban = DEFAULT_CONFIG.get("kanban", {})
|
||||
assert kanban.get("dispatch_in_gateway") is True, (
|
||||
"kanban.dispatch_in_gateway default should be True; got "
|
||||
f"{kanban.get('dispatch_in_gateway')!r}"
|
||||
)
|
||||
interval = kanban.get("dispatch_interval_seconds")
|
||||
assert isinstance(interval, (int, float)) and interval >= 1, (
|
||||
f"dispatch_interval_seconds must be a positive number, got {interval!r}"
|
||||
)
|
||||
|
||||
|
||||
def test_check_dispatcher_presence_silent_when_gateway_running(monkeypatch):
|
||||
from hermes_cli import kanban as kb_cli
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 12345)
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": True}},
|
||||
)
|
||||
running, msg = kb_cli._check_dispatcher_presence()
|
||||
assert running is True
|
||||
# Either empty (if import failed defensively) or includes the pid.
|
||||
assert msg == "" or "12345" in msg
|
||||
|
||||
|
||||
def test_check_dispatcher_presence_warns_when_no_gateway(monkeypatch):
|
||||
from hermes_cli import kanban as kb_cli
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": True}},
|
||||
)
|
||||
running, msg = kb_cli._check_dispatcher_presence()
|
||||
assert running is False
|
||||
assert "hermes gateway start" in msg
|
||||
|
||||
|
||||
def test_check_dispatcher_presence_warns_when_flag_off(monkeypatch):
|
||||
"""Gateway is up but dispatch_in_gateway=false -> warning."""
|
||||
from hermes_cli import kanban as kb_cli
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 999)
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": False}},
|
||||
)
|
||||
running, msg = kb_cli._check_dispatcher_presence()
|
||||
assert running is False
|
||||
assert "dispatch_in_gateway" in msg
|
||||
|
||||
|
||||
def test_check_dispatcher_presence_silent_on_probe_error(monkeypatch):
|
||||
"""If the probe itself errors, we stay silent."""
|
||||
from hermes_cli import kanban as kb_cli
|
||||
def _raise():
|
||||
raise RuntimeError("boom")
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", _raise)
|
||||
running, msg = kb_cli._check_dispatcher_presence()
|
||||
assert running is True
|
||||
assert msg == ""
|
||||
|
||||
|
||||
def _make_create_ns(**overrides):
|
||||
"""Build a Namespace suitable for kb_cli._cmd_create()."""
|
||||
ns = argparse.Namespace(
|
||||
title="x", body=None, assignee="worker",
|
||||
created_by="user", workspace="scratch", tenant=None,
|
||||
priority=0, parent=None, triage=False,
|
||||
idempotency_key=None, max_runtime=None, skills=None,
|
||||
json=False,
|
||||
)
|
||||
for k, v in overrides.items():
|
||||
setattr(ns, k, v)
|
||||
return ns
|
||||
|
||||
|
||||
def test_cli_create_warns_when_no_gateway(kanban_home, monkeypatch, capsys):
|
||||
"""ready+assigned task + no gateway -> warning on stderr."""
|
||||
from hermes_cli import kanban as kb_cli
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": True}},
|
||||
)
|
||||
ns = _make_create_ns(title="warn-me", assignee="worker")
|
||||
assert kb_cli._cmd_create(ns) == 0
|
||||
captured = capsys.readouterr()
|
||||
# Stderr has the warning prefix + guidance.
|
||||
assert "hermes gateway start" in captured.err
|
||||
|
||||
|
||||
def test_cli_create_silent_when_gateway_up(kanban_home, monkeypatch, capsys):
|
||||
"""gateway running + dispatch enabled -> no warning."""
|
||||
from hermes_cli import kanban as kb_cli
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: 4242)
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": True}},
|
||||
)
|
||||
ns = _make_create_ns(title="silent", assignee="worker")
|
||||
assert kb_cli._cmd_create(ns) == 0
|
||||
captured = capsys.readouterr()
|
||||
assert "hermes gateway start" not in captured.err
|
||||
|
||||
|
||||
def test_cli_create_no_warn_on_triage(kanban_home, monkeypatch, capsys):
|
||||
"""Triage tasks can't be dispatched -> no warning."""
|
||||
from hermes_cli import kanban as kb_cli
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": True}},
|
||||
)
|
||||
ns = _make_create_ns(title="triage-task", assignee=None, triage=True)
|
||||
assert kb_cli._cmd_create(ns) == 0
|
||||
err = capsys.readouterr().err
|
||||
assert "hermes gateway start" not in err
|
||||
|
||||
|
||||
def test_cli_create_no_warn_unassigned(kanban_home, monkeypatch, capsys):
|
||||
"""Unassigned tasks can't be dispatched -> no warning."""
|
||||
from hermes_cli import kanban as kb_cli
|
||||
monkeypatch.setattr("gateway.status.get_running_pid", lambda: None)
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.config.load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": True}},
|
||||
)
|
||||
ns = _make_create_ns(title="nobody", assignee=None)
|
||||
assert kb_cli._cmd_create(ns) == 0
|
||||
err = capsys.readouterr().err
|
||||
assert "hermes gateway start" not in err
|
||||
|
||||
|
||||
def test_cli_daemon_without_force_prints_deprecation_exits_2(kanban_home, capsys):
|
||||
"""`hermes kanban daemon` (no --force) is a deprecation stub."""
|
||||
from hermes_cli import kanban as kb_cli
|
||||
ns = argparse.Namespace(
|
||||
force=False, interval=60.0, max=None, failure_limit=3,
|
||||
pidfile=None, verbose=False,
|
||||
)
|
||||
rc = kb_cli._cmd_daemon(ns)
|
||||
assert rc == 2
|
||||
err = capsys.readouterr().err
|
||||
assert "DEPRECATED" in err
|
||||
assert "hermes gateway start" in err
|
||||
|
||||
|
||||
def test_cli_daemon_help_marks_deprecated():
|
||||
"""The argparse help string on `daemon` mentions deprecation so users
|
||||
scanning `--help` see the migration before running the stub."""
|
||||
import argparse as _ap
|
||||
from hermes_cli import kanban as kb_cli
|
||||
root = _ap.ArgumentParser()
|
||||
subs = root.add_subparsers()
|
||||
kb_cli.build_parser(subs)
|
||||
# Walk the subparser tree to find the daemon action.
|
||||
daemon_help = None
|
||||
for action in root._actions:
|
||||
if isinstance(action, _ap._SubParsersAction):
|
||||
for name, parser in action.choices.items():
|
||||
if name == "kanban":
|
||||
for sub_action in parser._actions:
|
||||
if isinstance(sub_action, _ap._SubParsersAction):
|
||||
for sname, _ in sub_action.choices.items():
|
||||
if sname == "daemon":
|
||||
daemon_help = sub_action._choices_actions
|
||||
break
|
||||
# _choices_actions is a list of _ChoicesPseudoAction-like objects with .help
|
||||
found_deprecation = False
|
||||
if daemon_help:
|
||||
for act in daemon_help:
|
||||
if getattr(act, "dest", "") == "daemon":
|
||||
if "DEPRECATED" in (act.help or ""):
|
||||
found_deprecation = True
|
||||
break
|
||||
assert found_deprecation, (
|
||||
"daemon subparser help should be marked DEPRECATED so users see "
|
||||
"the migration guidance in `hermes kanban --help` output"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gateway embedded dispatcher watcher
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_gateway_dispatcher_watcher_respects_config_flag_off(monkeypatch):
|
||||
"""dispatch_in_gateway=false -> watcher exits fast, no loop."""
|
||||
import asyncio
|
||||
from gateway.run import GatewayRunner
|
||||
import hermes_cli.config as _cfg_mod
|
||||
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner._running = True
|
||||
|
||||
monkeypatch.setattr(
|
||||
_cfg_mod, "load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": False}},
|
||||
)
|
||||
asyncio.run(
|
||||
asyncio.wait_for(
|
||||
runner._kanban_dispatcher_watcher(),
|
||||
timeout=3.0,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_gateway_dispatcher_watcher_respects_env_override(monkeypatch):
|
||||
"""HERMES_KANBAN_DISPATCH_IN_GATEWAY=0 disables without touching config."""
|
||||
import asyncio
|
||||
from gateway.run import GatewayRunner
|
||||
monkeypatch.setenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "0")
|
||||
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner._running = True
|
||||
asyncio.run(
|
||||
asyncio.wait_for(
|
||||
runner._kanban_dispatcher_watcher(),
|
||||
timeout=3.0,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def test_gateway_dispatcher_watcher_env_truthy_uses_config(monkeypatch):
|
||||
"""Truthy env value doesn't force-enable — config still decides.
|
||||
(We only treat explicit falses as an override; unset or truthy
|
||||
defers to config.)"""
|
||||
import asyncio
|
||||
from gateway.run import GatewayRunner
|
||||
import hermes_cli.config as _cfg_mod
|
||||
|
||||
monkeypatch.setenv("HERMES_KANBAN_DISPATCH_IN_GATEWAY", "yes")
|
||||
monkeypatch.setattr(
|
||||
_cfg_mod, "load_config",
|
||||
lambda: {"kanban": {"dispatch_in_gateway": False}},
|
||||
)
|
||||
|
||||
runner = object.__new__(GatewayRunner)
|
||||
runner._running = True
|
||||
# config says false, env is truthy — watcher should still exit
|
||||
# (because config is authoritative when env isn't a falsey override).
|
||||
asyncio.run(
|
||||
asyncio.wait_for(
|
||||
runner._kanban_dispatcher_watcher(),
|
||||
timeout=3.0,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -820,3 +820,70 @@ def test_create_task_without_skills_defaults_to_empty_list(client):
|
||||
# dataclasses.asdict which keeps it None. The drawer's
|
||||
# `t.skills && t.skills.length > 0` guard handles both null and [].
|
||||
assert task.get("skills") in (None, [])
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatcher-presence warning in POST /tasks response
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_create_task_includes_warning_when_no_dispatcher(client, monkeypatch):
|
||||
"""ready+assigned task + no gateway -> response has `warning` field
|
||||
so the dashboard UI can surface a banner."""
|
||||
# Force the dispatcher probe to report "not running".
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.kanban._check_dispatcher_presence",
|
||||
lambda: (False, "No gateway is running — start `hermes gateway start`."),
|
||||
)
|
||||
r = client.post(
|
||||
"/api/plugins/kanban/tasks",
|
||||
json={"title": "warn-me", "assignee": "worker"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
data = r.json()
|
||||
assert data.get("warning")
|
||||
assert "gateway" in data["warning"].lower()
|
||||
|
||||
|
||||
def test_create_task_no_warning_when_dispatcher_up(client, monkeypatch):
|
||||
"""Dispatcher running -> no `warning` field in the response."""
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.kanban._check_dispatcher_presence",
|
||||
lambda: (True, ""),
|
||||
)
|
||||
r = client.post(
|
||||
"/api/plugins/kanban/tasks",
|
||||
json={"title": "silent", "assignee": "worker"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert "warning" not in r.json() or not r.json()["warning"]
|
||||
|
||||
|
||||
def test_create_task_no_warning_on_triage(client, monkeypatch):
|
||||
"""Triage tasks never get the warning (they can't be dispatched
|
||||
anyway until promoted)."""
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.kanban._check_dispatcher_presence",
|
||||
lambda: (False, "oh no"),
|
||||
)
|
||||
r = client.post(
|
||||
"/api/plugins/kanban/tasks",
|
||||
json={"title": "triage-task", "assignee": "worker", "triage": True},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert "warning" not in r.json() or not r.json()["warning"]
|
||||
|
||||
|
||||
def test_create_task_probe_error_does_not_break_create(client, monkeypatch):
|
||||
"""Probe failure must never break task creation."""
|
||||
def _raise():
|
||||
raise RuntimeError("probe crashed")
|
||||
monkeypatch.setattr(
|
||||
"hermes_cli.kanban._check_dispatcher_presence", _raise,
|
||||
)
|
||||
r = client.post(
|
||||
"/api/plugins/kanban/tasks",
|
||||
json={"title": "resilient", "assignee": "worker"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert r.json()["task"]["title"] == "resilient"
|
||||
|
||||
@@ -44,7 +44,7 @@ def _check_kanban_mode() -> bool:
|
||||
set in its env, which the dispatcher sets when spawning a worker.
|
||||
|
||||
Humans running ``hermes chat`` see zero kanban tools. Workers spawned
|
||||
by ``hermes kanban daemon`` see all seven.
|
||||
by the kanban dispatcher (gateway-embedded by default) see all seven.
|
||||
"""
|
||||
return bool(os.environ.get("HERMES_KANBAN_TASK"))
|
||||
|
||||
|
||||
10
toolsets.py
10
toolsets.py
@@ -210,10 +210,12 @@ TOOLSETS = {
|
||||
"kanban": {
|
||||
"description": (
|
||||
"Kanban multi-agent coordination — only active when the agent "
|
||||
"is spawned by `hermes kanban daemon` (HERMES_KANBAN_TASK env "
|
||||
"set). Lets workers mark tasks done with structured handoffs, "
|
||||
"block for human input, heartbeat during long ops, comment "
|
||||
"on threads, and (for orchestrators) fan out into child tasks."
|
||||
"is spawned by the kanban dispatcher (HERMES_KANBAN_TASK env "
|
||||
"set). The dispatcher runs inside the gateway by default; see "
|
||||
"`kanban.dispatch_in_gateway` in config.yaml. Lets workers mark "
|
||||
"tasks done with structured handoffs, block for human input, "
|
||||
"heartbeat during long ops, comment on threads, and (for "
|
||||
"orchestrators) fan out into child tasks."
|
||||
),
|
||||
"tools": [
|
||||
"kanban_show", "kanban_complete", "kanban_block",
|
||||
|
||||
@@ -111,12 +111,12 @@ for sku in 1001 1002 1003 1004; do
|
||||
done
|
||||
```
|
||||
|
||||
Start the daemon and walk away:
|
||||
Start the gateway and walk away — it hosts the embedded dispatcher
|
||||
that picks up all three specialist profiles' tasks on the same
|
||||
kanban.db:
|
||||
|
||||
```bash
|
||||
hermes kanban daemon --assignee translator &
|
||||
hermes kanban daemon --assignee transcriber &
|
||||
hermes kanban daemon --assignee copywriter &
|
||||
hermes gateway start
|
||||
```
|
||||
|
||||
Now filter the board to `content-ops` (or just search for "Transcribe") and you get this:
|
||||
|
||||
@@ -52,7 +52,7 @@ They coexist: a kanban worker may call `delegate_task` internally during its run
|
||||
- `scratch` (default) — fresh tmp dir under `~/.hermes/kanban/workspaces/<id>/`.
|
||||
- `dir:<path>` — an existing shared directory (Obsidian vault, mail ops dir, per-account folder). **Must be an absolute path.** Relative paths like `dir:../tenants/foo/` are rejected at dispatch because they'd resolve against whatever CWD the dispatcher happens to be in, which is ambiguous and a confused-deputy escape vector. The path is otherwise trusted — it's your box, your filesystem, the worker runs with your uid. This is the trusted-local-user threat model; kanban is single-host by design.
|
||||
- `worktree` — a git worktree under `.worktrees/<id>/` for coding tasks. Worker-side `git worktree add` creates it.
|
||||
- **Dispatcher** — a long-lived loop that, every N seconds (default 60): reclaims stale claims, reclaims crashed workers (PID gone but TTL not yet expired), promotes ready tasks, atomically claims, spawns assigned profiles. Runs as `hermes kanban daemon` (foreground) or as a systemd user service. After ~5 consecutive spawn failures on the same task the dispatcher auto-blocks it with the last error as the reason — prevents thrashing on tasks whose profile doesn't exist, workspace can't mount, etc.
|
||||
- **Dispatcher** — a long-lived loop that, every N seconds (default 60): reclaims stale claims, reclaims crashed workers (PID gone but TTL not yet expired), promotes ready tasks, atomically claims, spawns assigned profiles. Runs **inside the gateway** by default (`kanban.dispatch_in_gateway: true`). After ~5 consecutive spawn failures on the same task the dispatcher auto-blocks it with the last error as the reason — prevents thrashing on tasks whose profile doesn't exist, workspace can't mount, etc.
|
||||
- **Tenant** — optional string namespace. One specialist fleet can serve multiple businesses (`--tenant business-a`) with data isolation by workspace path and memory key prefix.
|
||||
|
||||
## Quick start
|
||||
@@ -61,8 +61,8 @@ They coexist: a kanban worker may call `delegate_task` internally during its run
|
||||
# 1. Create the board
|
||||
hermes kanban init
|
||||
|
||||
# 2. Start the dispatcher (foreground; Ctrl-C to stop)
|
||||
hermes kanban daemon &
|
||||
# 2. Start the gateway (hosts the embedded dispatcher)
|
||||
hermes gateway start
|
||||
|
||||
# 3. Create a task
|
||||
hermes kanban create "research AI funding landscape" --assignee researcher
|
||||
@@ -75,22 +75,32 @@ hermes kanban list
|
||||
hermes kanban stats
|
||||
```
|
||||
|
||||
### Running the dispatcher as a service
|
||||
### Gateway-embedded dispatcher (default)
|
||||
|
||||
For production, install the systemd user unit shipped at
|
||||
`plugins/kanban/systemd/hermes-kanban-dispatcher.service`:
|
||||
The dispatcher runs inside the gateway process. Nothing to install, no
|
||||
separate service to manage — if the gateway is up, ready tasks get picked
|
||||
up on the next tick (60s by default).
|
||||
|
||||
```bash
|
||||
mkdir -p ~/.config/systemd/user
|
||||
cp plugins/kanban/systemd/hermes-kanban-dispatcher.service \
|
||||
~/.config/systemd/user/
|
||||
systemctl --user daemon-reload
|
||||
systemctl --user enable --now hermes-kanban-dispatcher.service
|
||||
systemctl --user status hermes-kanban-dispatcher
|
||||
journalctl --user -u hermes-kanban-dispatcher -f # follow logs
|
||||
```yaml
|
||||
# config.yaml
|
||||
kanban:
|
||||
dispatch_in_gateway: true # default
|
||||
dispatch_interval_seconds: 60 # default
|
||||
```
|
||||
|
||||
Without a running dispatcher `ready` tasks stay where they are — `hermes kanban init` will remind you of this on first run.
|
||||
Override the config flag at runtime via `HERMES_KANBAN_DISPATCH_IN_GATEWAY=0`
|
||||
for debugging. Standard gateway supervision applies: run `hermes gateway
|
||||
start` directly, or wire the gateway up as a systemd user unit (see the
|
||||
gateway docs). Without a running gateway, `ready` tasks stay where they are
|
||||
until one comes up — `hermes kanban create` warns about this at creation
|
||||
time.
|
||||
|
||||
Running `hermes kanban daemon` as a separate process is **deprecated**;
|
||||
use the gateway. If you truly cannot run the gateway (headless host
|
||||
policy forbids long-lived services, etc.) a `--force` escape hatch keeps
|
||||
the old standalone daemon alive for one release cycle, but running both
|
||||
a gateway-embedded dispatcher AND a standalone daemon against the same
|
||||
`kanban.db` causes claim races and is not supported.
|
||||
|
||||
### Idempotent create (for automation / webhooks)
|
||||
|
||||
@@ -345,7 +355,7 @@ hermes kanban runs <id> [--json] # attempt history (one ro
|
||||
hermes kanban assignees [--json] # profiles on disk + per-assignee task counts
|
||||
hermes kanban dispatch [--dry-run] [--max N] # one-shot pass
|
||||
[--failure-limit N] [--json]
|
||||
hermes kanban daemon [--interval SECS] [--max N] # long-lived loop
|
||||
hermes kanban daemon --force # DEPRECATED — standalone dispatcher (use `hermes gateway start` instead)
|
||||
[--failure-limit N] [--pidfile PATH] [-v]
|
||||
hermes kanban stats [--json] # per-status + per-assignee counts
|
||||
hermes kanban log <id> [--tail BYTES] # worker log from ~/.hermes/kanban/logs/
|
||||
|
||||
Reference in New Issue
Block a user