Compare commits

...

25 Commits

Author SHA1 Message Date
alt-glitch 4d810d9591 style(comfyui): format SKILL.md — table alignment, YAML tags, onboarding hint 2026-04-29 13:50:17 +05:30
alt-glitch fff37dd79e feat(skills): add comfyui skill — CLI-driven image/video/audio generation
Agent-friendly ComfyUI skill using the comfyui-skill CLI (invoked via uvx,
no persistent install). The CLI wraps ComfyUI's REST API into named 'skills'
with parameter schemas — the agent works with friendly args like
{"prompt": "a cat"} instead of raw node graphs.

- SKILL.md: setup/onboarding, core workflow, decision tree, multi-server,
  image upload, model/node discovery, queue management, pitfalls
- references/cli-reference.md: complete command map (27 leaf commands)
- references/api-notes.md: underlying REST endpoints for debugging
- scripts/comfyui_setup.sh: workspace initialization

Based on comfyui-skill-cli by HuangYuChuh, original skill work by kshitijk4poor.
2026-04-29 12:12:09 +05:30
teknium1 fa9383d27b feat(curator): umbrella-first prompt, inherit parent config, unbounded iterations
Based on three live test runs against 346 agent-created skills on the
author's own setup (~6.5 min, opus-4.7, 86 API calls), the curator
prompt needed three sharpenings before it consistently produced real
umbrella consolidation instead of passive audit output:

**Umbrella-first framing.** The original 'decide keep/patch/archive/
consolidate' framing lets opus default to 'keep' whenever two skills
aren't byte-identical. The new prompt explicitly tells the reviewer
that pairwise distinctness is the wrong bar — the right question is
'would a human maintainer write this as N separate skills, or one
skill with N labeled subsections?' Expect 10-25 prefix clusters; merge
each into an umbrella via one of three methods.

**Three concrete consolidation methods.** (a) Merge into an existing
umbrella (patch the broadest skill, archive siblings); (b) Create a
new umbrella SKILL.md (skill_manage action=create); (c) Demote
session-specific detail into references/, templates/, or scripts/
under the umbrella via skill_manage action=write_file, then archive
the narrow sibling. This matches the support-file vocabulary the
review-prompt side already uses (PR #17213).

**Two observed bailouts pre-empted:** 'usage counters are zero so I
can't judge' (rule 4: judge on content, not use_count) and 'each has
a distinct trigger' (rule 5: pairwise distinctness is the wrong bar).

**Config-aware parent inheritance.** _run_llm_review() was building
AIAgent() without explicit provider/model, hitting an auto-resolve
path that returned empty credentials → HTTP 400 'No models provided'
against OpenRouter. Fork now inherits the user's main provider and
model (via load_config + resolve_runtime_provider) before spawning —
runs on whatever the user is currently on, OAuth-backed or
pool-backed included.

**Unbounded iteration ceiling.** max_iterations=8 was way too low for
an umbrella-build pass over hundreds of skills. A live pass takes
50-100 API calls (scanning, clustering, skill_view'ing candidates,
patching umbrellas, mv'ing siblings). Raised to 9999 — the natural
stopping criterion is 'no more clusters worth processing', not an
arbitrary tool-call budget.

**Tests updated:** test_curator_review_prompt_has_invariants accepts
DO NOT / MUST NOT and drops 'keep' from the required-verb set (the
umbrella-first prompt correctly deemphasizes 'keep' as a first-class
decision label since passive keep-everything is the failure mode
being prevented). Added test_curator_review_prompt_is_umbrella_first
asserting the umbrella framing, class-level thinking, references/
+ templates/ + scripts/ support-file mentions, and the 'use_count
is not evidence of value' pre-emption. Added
test_curator_review_prompt_offers_support_file_actions asserting
skill_manage action=create and action=write_file are both named.

**Live validation on author's setup:**
- Run 1 (old prompt): 3 archives, stopped after surveying — typical passive outcome
- Run 2 (consolidation prompt): 44 archives, 3 patches, surfaced the 50-skill mlops reorg duplicate bug but didn't umbrella
- Run 3 (this prompt): 249 archives + 18 new class-level umbrellas created, reducing agent-created skills from 346 → 118 with every archived skill's content preserved as references/ under its umbrella. Pinned skill untouched. Full report in PR description.
2026-04-28 22:33:33 -07:00
Teknium 019d4c1c3f feat(curator): hook into the gateway's cron-ticker thread
Long-running gateways need the curator to fire on cadence without
restarts. Piggy-back on the existing cron ticker thread (which already
runs image/document cache cleanup every hour on the same pattern)
instead of spawning a dedicated timer thread.

- New CURATOR_EVERY = 60 ticks (poll hourly at default 60s interval).
  The inner config.interval_hours gate controls the real cadence, so
  60 of these 60 hourly pokes are cheap no-ops and one runs the review.
- Removed the boot-time call added in the prior commit — the ticker
  covers boot + every hour thereafter. Avoids double-running.

Handles the weekly-default-on-24/7-gateway gap flagged in review.
2026-04-28 22:33:33 -07:00
Teknium a12f7aa8bb fix(curator): default cycle is every 7 days, not 24 hours
Weekly is closer to how skill churn actually works — most agent-created
skills don't change multiple times per day, so a daily review is pure
cost without benefit. Bumping the default to 7 days reduces aux-model
spend while still catching drift and staleness on the timescales that
matter (30d stale, 90d archive).

Changes:
- DEFAULT_INTERVAL_HOURS: 24 -> 168 (7 days)
- config.yaml default: interval_hours: 24 -> 24 * 7
- CLI status line renders as '7d' when interval is a whole-day multiple
- Test `test_old_run_eligible` decoupled from the exact default: it now
  uses 2 * get_interval_hours() so future tweaks don't break it
2026-04-28 22:33:33 -07:00
Teknium 0d31864e3b fix(curator): defense-in-depth gates against bundled/hub skills
Previous invariants only gated the primary entry points
(apply_automatic_transitions, archive_skill, CLI pin). Several paths
were unprotected:

  - bump_view / bump_use / bump_patch / set_state / set_pinned wrote
    usage records unconditionally, which is confusing noise in
    .usage.json even though the review list filtered them out
  - restore_skill did not check whether a bundled skill now shadows
    the archived name
  - CLI unpin was asymmetric with CLI pin — it had no gate

Fixes:
  - _mutate() (the shared counter / state writer) now drops silently
    when the skill is not agent-created. .usage.json never gains a
    record for a bundled or hub-installed skill.
  - restore_skill() refuses to restore under a name that is now
    bundled or hub-installed (would shadow upstream).
  - CLI unpin gate matches CLI pin.

New tests:
  - 5 provenance-guard tests on skill_usage (one per mutator)
  - 1 end-to-end test that hammers every mutator at a bundled skill
    and a hub skill, asserts both are untouched on disk, and asserts
    the sidecar stays clean
  - 2 CLI tests proving pin/unpin refuse bundled skills symmetrically

64/64 tests passing (29 skill_usage + 27 curator + 8 new guards).
2026-04-28 22:33:33 -07:00
Teknium c8b7e7268a refactor(curator): point review prompt at existing tools
The LLM review prompt mentioned bespoke `archive_skill` and `pin_skill`
tools that are not registered as model tools. Swap the prompt to rely
on the real surface:

  - skill_manage action=patch  — for patching and consolidation
  - terminal                   — to `mv` skill dirs into .archive/

Also drop `pin` from the model's decision list — pinning is a user
opt-out for `hermes curator pin <skill>`, not something the model
should do autonomously.

Decision list is now: keep / patch / consolidate / archive.

Tests updated: prompt-invariant test now asserts the existing tools
are referenced and that bespoke tool names do NOT appear. New test
prevents `pin` from being re-added as a model decision.
2026-04-28 22:33:33 -07:00
Teknium bc79e227e6 feat(curator): background skill maintenance (issue #7816)
Adds the Curator — an auxiliary-model background task that periodically
reviews AGENT-CREATED skills and keeps the collection tidy: tracks usage,
transitions unused skills through active → stale → archived, and spawns
a forked AIAgent to consolidate overlaps and patch drift.

Default: enabled, inactivity-triggered (no cron daemon). Runs on CLI
startup and gateway boot when the last run is older than interval_hours
(default 24) AND the agent has been idle for min_idle_hours (default 2).

Invariants (all load-bearing):
- Never touches bundled or hub-installed skills (.bundled_manifest +
  .hub/lock.json double-filter)
- Never auto-deletes — archive only. Archives are recoverable
  via `hermes curator restore <skill>`
- Pinned skills bypass all auto-transitions
- Uses the aux client; never touches the main session's prompt cache

New files:
- tools/skill_usage.py — sidecar .usage.json telemetry, atomic writes,
  provenance filter
- agent/curator.py — orchestrator: config, idle gating, state-machine
  transitions (pure, no LLM), forked-agent review prompt
- hermes_cli/curator.py — `hermes curator {status,run,pause,resume,
  pin,unpin,restore}` subcommand
- tests/tools/test_skill_usage.py — 29 tests
- tests/agent/test_curator.py — 25 tests

Modified files (surgical patches):
- tools/skills_tool.py — bump view_count on successful skill_view
- tools/skill_manager_tool.py — bump patch_count on skill_manage
  patch/edit/write_file/remove_file; forget record on delete
- hermes_cli/config.py — add curator: section to DEFAULT_CONFIG
- hermes_cli/commands.py — add /curator CommandDef with subcommands
- hermes_cli/main.py — register `hermes curator` subparser via
  register_cli() from hermes_cli.curator
- cli.py — /curator slash-command dispatch + startup hook
- gateway/run.py — gateway-boot hook (mirrors CLI)

Validation:
- 54 new tests across skill_usage + curator, all passing in 3s
- 346 tests across all touched files' neighbors green
- 2783 tests across hermes_cli/ + gateway/test_run_progress_topics.py green
- CLI smoke: `hermes curator status/pause/resume` work end-to-end

Companion to PR #16026 (class-first skill review prompt) — together
they form a loop: the review prompt stops near-duplicate skill creation
at the source, and the curator prunes/consolidates what still accumulates.

Refs #7816.
2026-04-28 22:33:33 -07:00
Mil Wang (from Dev Box) 88602376d4 fix: resolve external_dirs relative to HERMES_HOME instead of cwd (#9949)
Relative entries in skills.external_dirs were resolved against the
process cwd via Path.resolve(), making them silently fail when Hermes
was launched from a different directory.

Resolve relative paths against get_hermes_home() for consistent
behavior across CLI, gateway, and cron contexts. Absolute paths
and env-var/tilde expansion are unchanged.
2026-04-28 22:29:09 -07:00
teknium1 ded12f0968 chore(release): map LyleLengyel@gmail.com -> mcndjxlefnd 2026-04-28 22:26:09 -07:00
Lyle Lengyel 80e474f11f fix(gateway,terminal): expand shell tilde in terminal.cwd before subprocess
Commit 3c42064e made config.yaml the single source of truth for
TERMINAL_CWD, but the config bridge passes cwd values verbatim to
os.environ. When a user sets terminal.cwd: ~/ in config.yaml, the
literal string '~/'' reaches subprocess.Popen, which the kernel
rejects because it does not expand shell tilde syntax.

This patch adds three defensive layers:

1. gateway/run.py — expanduser at config bridge time so TERMINAL_CWD
   is always an absolute path.

2. tools/terminal_tool.py — expanduser when reading TERMINAL_CWD in
   _get_env_config(), guarding against stale or manually-set env vars.

3. tools/environments/local.py — expanduser in LocalEnvironment before
   passing cwd to subprocess.Popen, the final safety net.

Includes regression tests in test_config_cwd_bridge.py for nested
terminal.cwd, top-level cwd alias, and precedence ordering.

Refs: 3c42064e
2026-04-28 22:26:09 -07:00
JackJin 88e07c42b4 fix(cli): prevent .env sanitizer from splitting GLM_API_KEY by LM_API_KEY suffix
The known-key splitter in `_sanitize_env_lines` used substring matching
to find concatenated KEY=VALUE pairs. When a registered key was a suffix
of another (LM_API_KEY is a suffix of GLM_API_KEY), the shorter key's
needle would match inside the longer one, causing the sanitizer to
rewrite `GLM_API_KEY=...` as `G\nLM_API_KEY=...` and silently break
Z.AI/GLM auth (and similarly `GLM_BASE_URL` -> `G\nLM_BASE_URL`).

Drop matches whose needle range is fully contained within a longer
overlapping match. Two regression tests cover the suffix-collision case
and confirm a real concatenation that happens to start with the longer
key still splits where it should.

Fixes #17138
2026-04-28 22:22:45 -07:00
Brooklyn Nicholson 97a2474b39 review(copilot): point reload.env docstring at hermes_cli.config.reload_env 2026-04-28 22:22:30 -07:00
Brooklyn Nicholson 6b4ef00a2c review(copilot): keep /reload cli_only since gateway has no handler 2026-04-28 22:22:30 -07:00
Brooklyn Nicholson 4858e26eaa feat(tui): port classic CLI /reload (.env hot-reload) to TUI
Classic CLI exposes ``/reload`` (re-reads ~/.hermes/.env into
``os.environ`` via ``hermes_cli.config.reload_env``) so newly added API
keys take effect without restarting the session.  The TUI was missing
the parity command, so users had to Ctrl+C out and ``hermes --tui``
again whenever they added or rotated a credential.

Three small wires:

* New ``reload.env`` JSON-RPC method in ``tui_gateway/server.py`` that
  delegates to ``hermes_cli.config.reload_env`` and returns the count
  of vars updated.
* New ``/reload`` slash command in ``ui-tui/src/app/slash/commands/ops.ts``
  matching the existing ``/reload-mcp`` pattern (native RPC, no slash
  worker).
* Drop ``cli_only=True`` from the ``reload`` ``CommandDef`` in
  ``hermes_cli/commands.py`` so help/menus surface it in the TUI too.
  ``reload_env`` itself is environment-agnostic.

Same caveat as classic CLI: the *currently constructed* agent's
credential pool / provider routing does not auto-rebuild.  Users who
want a brand-new credential resolution should follow with ``/new``.

Tests:
* New ``test_reload_env_rpc_calls_hermes_cli_reload_env`` confirms
  RPC delegates and reports the count.
* New ``test_reload_env_rpc_surfaces_errors`` confirms exceptions are
  rendered as JSON-RPC errors.
* ``createSlashHandler.test.ts`` slash-parity matrix extended with
  ``['/reload', 'reload.env', {}]`` so we can't regress the routing.

Validation:
  scripts/run_tests.sh tests/test_tui_gateway_server.py — 92/92.
  scripts/run_tests.sh tests/hermes_cli/test_commands.py — 128/128.
  cd ui-tui && npm run type-check — clean; npm test --run — 390/390.
2026-04-28 22:22:30 -07:00
Teknium dcd7b717f8 fix(gateway): linearize tool-progress bubbles with content messages (#17280)
After PR #7885 (97b0cd51e) added content-side segment breaks for
natural mid-turn assistant messages, the tool-progress task in
gateway/run.py was not updated to match. progress_msg_id and
progress_lines persisted for the whole run, so after a tool batch
produced bubble B1 followed by content bubble C1, the next tool.started
kept editing the OLD bubble B1 above C1 — making the chat appear out
of order on Telegram, Discord, and Slack.

Add on_new_message callback to GatewayStreamConsumer, fired at the
four sites where a fresh content bubble lands on the platform:
  - _send_or_edit first-send branch (NOT edits)
  - _send_commentary
  - _send_new_chunk (overflow split)
  - each successful chunk of _send_fallback_final

Gateway supplies a lambda that enqueues ('__reset__',) into the
progress_queue. send_progress_messages() handles the marker in both
the main loop and the CancelledError drain path, clearing
progress_msg_id, progress_lines, and the dedup state so the next
tool.started opens a fresh bubble below the new content.

Result: each tool batch appears in chronological order below the
preceding content. When no content appears between tool batches,
tools still group in one bubble (CLI-style compactness).

Co-authored-by: teknium1 <teknium@users.noreply.github.com>
2026-04-28 22:17:33 -07:00
Tranquil-Flow ac855bba0e fix(cli): respect terminal.cwd config in local terminal backend
init_session() runs a login shell bootstrap that sources profile scripts
(.bashrc, .bash_profile, etc.) before capturing pwd. If any profile
script changes the working directory, the captured cwd overwrites the
configured terminal.cwd value — so terminal commands run in the wrong
directory despite the TUI banner showing the configured path.

Add an explicit 'builtin cd' to the configured cwd in the bootstrap
script, after profile sourcing but before pwd capture, ensuring the
configured terminal.cwd is always what gets recorded.

Fixes #14044
2026-04-28 22:16:08 -07:00
Brooklyn Nicholson f95c34f415 fix(browser): address Copilot round-4 on /browser connect
* Reject unsupported schemes (anything outside http/https/ws/wss) in
  cli.py /browser connect before probing or persisting, matching the
  gateway's existing 4015 path.
* Defend gateway browser.manage against `{"url": null}` and
  non-string urls: empty/null falls back to DEFAULT_BROWSER_CDP_URL,
  non-string returns a 4015 instead of slipping into the generic
  5031 catch via TypeError on `"://" in url`.
* Add regression tests for both null-url fallback and non-string
  rejection.
2026-04-28 22:11:10 -07:00
Brooklyn Nicholson 679a27498d fix(browser): address Copilot round-3 on /browser connect
* Gate `browser.progress` emit on truthy `session_id`. The TUI
  prints `messages` from the response when there's no session, so
  emitting events too would double-render. Now: with a session →
  events stream live; without one → bundled messages only.
* Resolve `system = platform.system()` once in `_browser_connect`
  and thread it through `try_launch_chrome_debug` and
  `_failure_messages` → `manual_chrome_debug_command`, so the
  generated hint is consistent (and tests are deterministic) on
  any host.
* Add `test_browser_manage_connect_no_session_skips_progress_events`
  to lock in the gating behavior.
2026-04-28 22:11:10 -07:00
Brooklyn Nicholson d1ee4915f3 fix(browser): address Copilot review on /browser connect
Fixes from Copilot's two passes on PR #17238:

* Validate parsed URL once: reject missing host, invalid port, and
  unsupported scheme up front so malformed inputs (e.g. http://:9222
  or http://localhost:abc) don't fall through to a generic 5031.
* Tighten _is_default_local_cdp to require a discovery-style path so
  ws://127.0.0.1:9222/devtools/browser/<id> is not collapsed to bare
  http://127.0.0.1:9222 (which would lose the path and break the
  connect).
* Move browser.manage into _LONG_HANDLERS so the up-to-10s
  launch-and-retry loop runs on the RPC pool instead of blocking the
  main dispatcher.
* try_launch_chrome_debug uses Windows-appropriate detach kwargs
  (creationflags=DETACHED_PROCESS|CREATE_NEW_PROCESS_GROUP) instead
  of POSIX-only start_new_session=True.
* manual_chrome_debug_command uses subprocess.list2cmdline on
  Windows so the printed instruction is cmd.exe-compatible.
* Mirror host/port validation in cli.py /browser connect so the
  classic CLI never persists an invalid BROWSER_CDP_URL.
2026-04-28 22:11:10 -07:00
Brooklyn Nicholson 26816d1f77 refactor(tui): tighten /browser connect plumbing
Split browser.manage into a small dispatcher with named connect/disconnect
helpers, fold _http_ok / _probe_urls / _normalize_cdp_url out of the nested
probe loop, collapse the failure-message scaffolding, and DRY the chrome
candidate path tables. Behaviour and event shape unchanged.
2026-04-28 22:11:10 -07:00
Brooklyn Nicholson e750829015 fix(tui): stream /browser connect progress as gateway events
Emit browser.progress JSON-RPC notifications during the connect work and render them in the TUI as system transcript lines, so users see the same step-by-step status the base CLI prints instead of nothing for ~1m followed by a final result.
2026-04-28 22:11:10 -07:00
Brooklyn Nicholson 7d39a45749 fix(tui): show /browser connect progress like CLI
Return CLI-style browser connect status messages from the gateway and render them in the TUI so local Chrome launch attempts are visible instead of ending in a silent delayed failure.
2026-04-28 22:11:10 -07:00
Brooklyn Nicholson 69ff114ee2 fix(browser): avoid bogus Chrome launch fallback
Detect an actual Chrome/Chromium executable before printing a manual CDP launch command, including common WSL-mounted Windows browser paths, so /browser connect does not suggest google-chrome when it is unavailable.
2026-04-28 22:11:10 -07:00
Brooklyn Nicholson f10a3df632 fix(tui): align /browser connect local CDP handling
Share Chrome CDP launch helpers between the classic CLI and TUI so default /browser connect uses loopback consistently, retries local Chrome launch, and reports a copyable manual-start command instead of claiming a dead connection.
2026-04-28 22:11:10 -07:00
35 changed files with 4402 additions and 270 deletions
+561
View File
@@ -0,0 +1,561 @@
"""Curator — background skill maintenance orchestrator.
The curator is an auxiliary-model task that periodically reviews agent-created
skills and maintains the collection. It runs inactivity-triggered (no cron
daemon): when the agent is idle and the last curator run was longer than
``interval_hours`` ago, ``maybe_run_curator()`` spawns a forked AIAgent to do
the review.
Responsibilities:
- Auto-transition lifecycle states based on last_used_at timestamps
- Spawn a background review agent that can pin / archive / consolidate /
patch agent-created skills via skill_manage
- Persist curator state (last_run_at, paused, etc.) in .curator_state
Strict invariants:
- Only touches agent-created skills (see tools/skill_usage.is_agent_created)
- Never auto-deletes — only archives. Archive is recoverable.
- Pinned skills bypass all auto-transitions
- Uses the auxiliary client; never touches the main session's prompt cache
"""
from __future__ import annotations
import json
import logging
import os
import tempfile
import threading
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, Dict, Optional
from hermes_constants import get_hermes_home
from tools import skill_usage
logger = logging.getLogger(__name__)
DEFAULT_INTERVAL_HOURS = 24 * 7 # 7 days
DEFAULT_MIN_IDLE_HOURS = 2
DEFAULT_STALE_AFTER_DAYS = 30
DEFAULT_ARCHIVE_AFTER_DAYS = 90
# ---------------------------------------------------------------------------
# .curator_state — persistent scheduler + status
# ---------------------------------------------------------------------------
def _state_file() -> Path:
return get_hermes_home() / "skills" / ".curator_state"
def _default_state() -> Dict[str, Any]:
return {
"last_run_at": None,
"last_run_duration_seconds": None,
"last_run_summary": None,
"paused": False,
"run_count": 0,
}
def load_state() -> Dict[str, Any]:
path = _state_file()
if not path.exists():
return _default_state()
try:
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, dict):
base = _default_state()
base.update({k: v for k, v in data.items() if k in base or k.startswith("_")})
return base
except (OSError, json.JSONDecodeError) as e:
logger.debug("Failed to read curator state: %s", e)
return _default_state()
def save_state(data: Dict[str, Any]) -> None:
path = _state_file()
try:
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=str(path.parent), prefix=".curator_state_", suffix=".tmp")
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=True, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
except BaseException:
try:
os.unlink(tmp)
except OSError:
pass
raise
except Exception as e:
logger.debug("Failed to save curator state: %s", e, exc_info=True)
def set_paused(paused: bool) -> None:
state = load_state()
state["paused"] = bool(paused)
save_state(state)
def is_paused() -> bool:
return bool(load_state().get("paused"))
# ---------------------------------------------------------------------------
# Config access
# ---------------------------------------------------------------------------
def _load_config() -> Dict[str, Any]:
"""Read curator.* config from ~/.hermes/config.yaml. Tolerates missing file."""
try:
from hermes_cli.config import load_config
cfg = load_config()
except Exception as e:
logger.debug("Failed to load config for curator: %s", e)
return {}
if not isinstance(cfg, dict):
return {}
cur = cfg.get("curator") or {}
if not isinstance(cur, dict):
return {}
return cur
def is_enabled() -> bool:
"""Default ON when no config says otherwise."""
cfg = _load_config()
return bool(cfg.get("enabled", True))
def get_interval_hours() -> int:
cfg = _load_config()
try:
return int(cfg.get("interval_hours", DEFAULT_INTERVAL_HOURS))
except (TypeError, ValueError):
return DEFAULT_INTERVAL_HOURS
def get_min_idle_hours() -> float:
cfg = _load_config()
try:
return float(cfg.get("min_idle_hours", DEFAULT_MIN_IDLE_HOURS))
except (TypeError, ValueError):
return DEFAULT_MIN_IDLE_HOURS
def get_stale_after_days() -> int:
cfg = _load_config()
try:
return int(cfg.get("stale_after_days", DEFAULT_STALE_AFTER_DAYS))
except (TypeError, ValueError):
return DEFAULT_STALE_AFTER_DAYS
def get_archive_after_days() -> int:
cfg = _load_config()
try:
return int(cfg.get("archive_after_days", DEFAULT_ARCHIVE_AFTER_DAYS))
except (TypeError, ValueError):
return DEFAULT_ARCHIVE_AFTER_DAYS
# ---------------------------------------------------------------------------
# Idle / interval check
# ---------------------------------------------------------------------------
def _parse_iso(ts: Optional[str]) -> Optional[datetime]:
if not ts:
return None
try:
return datetime.fromisoformat(ts)
except (TypeError, ValueError):
return None
def should_run_now(now: Optional[datetime] = None) -> bool:
"""Return True if the curator should run immediately.
Gates:
- curator.enabled == True
- not paused
- last_run_at missing, OR older than interval_hours
The idle check (min_idle_hours) is applied at the call site where we know
whether an agent is actively running — here we only enforce the static
gates.
"""
if not is_enabled():
return False
if is_paused():
return False
state = load_state()
last = _parse_iso(state.get("last_run_at"))
if last is None:
return True
if now is None:
now = datetime.now(timezone.utc)
if last.tzinfo is None:
last = last.replace(tzinfo=timezone.utc)
interval = timedelta(hours=get_interval_hours())
return (now - last) >= interval
# ---------------------------------------------------------------------------
# Automatic state transitions (pure function, no LLM)
# ---------------------------------------------------------------------------
def apply_automatic_transitions(now: Optional[datetime] = None) -> Dict[str, int]:
"""Walk every agent-created skill and move active/stale/archived based on
last_used_at. Pinned skills are never touched. Returns a counter dict
describing what changed."""
from tools import skill_usage as _u
if now is None:
now = datetime.now(timezone.utc)
stale_cutoff = now - timedelta(days=get_stale_after_days())
archive_cutoff = now - timedelta(days=get_archive_after_days())
counts = {"marked_stale": 0, "archived": 0, "reactivated": 0, "checked": 0}
for row in _u.agent_created_report():
counts["checked"] += 1
name = row["name"]
if row.get("pinned"):
continue
last_used = _parse_iso(row.get("last_used_at"))
# If never used, treat as using created_at as the anchor so new skills
# don't immediately archive themselves.
anchor = last_used or _parse_iso(row.get("created_at")) or now
if anchor.tzinfo is None:
anchor = anchor.replace(tzinfo=timezone.utc)
current = row.get("state", _u.STATE_ACTIVE)
if anchor <= archive_cutoff and current != _u.STATE_ARCHIVED:
ok, _msg = _u.archive_skill(name)
if ok:
counts["archived"] += 1
elif anchor <= stale_cutoff and current == _u.STATE_ACTIVE:
_u.set_state(name, _u.STATE_STALE)
counts["marked_stale"] += 1
elif anchor > stale_cutoff and current == _u.STATE_STALE:
# Skill got used again after being marked stale — reactivate.
_u.set_state(name, _u.STATE_ACTIVE)
counts["reactivated"] += 1
return counts
# ---------------------------------------------------------------------------
# Review prompt for the forked agent
# ---------------------------------------------------------------------------
CURATOR_REVIEW_PROMPT = (
"You are running as Hermes' background skill CURATOR. This is an "
"UMBRELLA-BUILDING consolidation pass, not a passive audit and not a "
"duplicate-finder.\n\n"
"The goal of the skill collection is a LIBRARY OF CLASS-LEVEL "
"INSTRUCTIONS AND EXPERIENTIAL KNOWLEDGE. A collection of hundreds of "
"narrow skills where each one captures one session's specific bug is "
"a FAILURE of the library — not a feature. An agent searching skills "
"matches on descriptions, not on exact names; one broad umbrella "
"skill with labeled subsections beats five narrow siblings for "
"discoverability, not the other way around.\n\n"
"The right target shape is CLASS-LEVEL skills with rich SKILL.md "
"bodies + `references/`, `templates/`, and `scripts/` subfiles for "
"session-specific detail — not one-session-one-skill micro-entries.\n\n"
"Hard rules — do not violate:\n"
"1. DO NOT touch bundled or hub-installed skills. The candidate list "
"below is already filtered to agent-created skills only.\n"
"2. DO NOT delete any skill. Archiving (moving the skill's directory "
"into ~/.hermes/skills/.archive/) is the maximum destructive action. "
"Archives are recoverable; deletion is not.\n"
"3. DO NOT touch skills shown as pinned=yes. Skip them entirely.\n"
"4. DO NOT use usage counters as a reason to skip consolidation. The "
"counters are new and often mostly zero. Judge overlap on CONTENT, "
"not on use_count. 'use=0' is not evidence a skill is valuable; it's "
"absence of evidence either way.\n"
"5. DO NOT reject consolidation on the grounds that 'each skill has "
"a distinct trigger'. Pairwise distinctness is the wrong bar. The "
"right bar is: 'would a human maintainer write this as N separate "
"skills, or as one skill with N labeled subsections?' When the "
"answer is the latter, merge.\n\n"
"How to work — not optional:\n"
"1. Scan the full candidate list. Identify PREFIX CLUSTERS (skills "
"sharing a first word or domain keyword). Examples you are likely "
"to find: hermes-config-*, hermes-dashboard-*, gateway-*, codex-*, "
"ollama-*, anthropic-*, gemini-*, mcp-*, salvage-*, pr-*, "
"competitor-*, python-*, security-*, etc. Expect 10-25 clusters.\n"
"2. For each cluster with 2+ members, do NOT ask 'are these pairs "
"overlapping?' — ask 'what is the UMBRELLA CLASS these skills all "
"serve? Would a maintainer name that class and write one skill for "
"it?' If yes, pick (or create) the umbrella and absorb the siblings "
"into it.\n"
"3. Three ways to consolidate — use the right one per cluster:\n"
" a. MERGE INTO EXISTING UMBRELLA — one skill in the cluster is "
"already broad enough to be the umbrella (example: `pr-triage-"
"salvage` for the PR review cluster). Patch it to add a labeled "
"section for each sibling's unique insight, then archive the "
"siblings.\n"
" b. CREATE A NEW UMBRELLA SKILL.md — no existing member is broad "
"enough. Use skill_manage action=create to write a new class-level "
"skill whose SKILL.md covers the shared workflow and has short "
"labeled subsections. Archive the now-absorbed narrow siblings.\n"
" c. DEMOTE TO REFERENCES/TEMPLATES/SCRIPTS — a sibling has "
"narrow-but-valuable session-specific content. Move it into the "
"umbrella's appropriate support directory:\n"
" • `references/<topic>.md` for session-specific detail OR "
"condensed knowledge banks (quoted research, API docs excerpts, "
"domain notes, provider quirks, reproduction recipes)\n"
" • `templates/<name>.<ext>` for starter files meant to be "
"copied and modified\n"
" • `scripts/<name>.<ext>` for statically re-runnable actions "
"(verification scripts, fixture generators, probes)\n"
" Then archive the old sibling. Use `terminal` with `mkdir -p "
"~/.hermes/skills/<umbrella>/references/ && mv ... <umbrella>/"
"references/<topic>.md` (or templates/ / scripts/).\n"
"4. Also flag skills whose NAME is too narrow (contains a PR number, "
"a feature codename, a specific error string, an 'audit' / "
"'diagnosis' / 'salvage' session artifact). These almost always "
"belong as a subsection or support file under a class-level umbrella.\n"
"5. Iterate. After one consolidation round, scan the remaining set "
"and look for the NEXT umbrella opportunity. Don't stop after 3 "
"merges.\n\n"
"Your toolset:\n"
" - skills_list, skill_view — read the current landscape\n"
" - skill_manage action=patch — add sections to the umbrella\n"
" - skill_manage action=create — create a new umbrella SKILL.md\n"
" - skill_manage action=write_file — add a references/, templates/, "
"or scripts/ file under an existing skill (the skill must already "
"exist)\n"
" - terminal — mv a sibling into the archive "
"OR move its content into a support subfile\n\n"
"'keep' is a legitimate decision ONLY when the skill is already a "
"class-level umbrella and none of the proposed merges would improve "
"discoverability. 'This is narrow but distinct from its siblings' "
"is NOT a reason to keep — it's a reason to move it under an "
"umbrella as a subsection or support file.\n\n"
"Expected output: real umbrella-ification. Process every obvious "
"cluster. If you end the pass with fewer than 10 archives, you "
"stopped too early — go back and look at the clusters you left "
"alone.\n\n"
"When done, write a summary with: clusters processed, skills "
"patched/absorbed, skills demoted to references/templates/scripts, "
"skills archived, new umbrellas created, and clusters you "
"deliberately left alone with one line each."
)
# ---------------------------------------------------------------------------
# Orchestrator — spawn a forked AIAgent for the LLM review pass
# ---------------------------------------------------------------------------
def _render_candidate_list() -> str:
"""Human/agent-readable list of agent-created skills with usage stats."""
rows = skill_usage.agent_created_report()
if not rows:
return "No agent-created skills to review."
lines = [f"Agent-created skills ({len(rows)}):\n"]
for r in rows:
lines.append(
f"- {r['name']} "
f"state={r['state']} "
f"pinned={'yes' if r.get('pinned') else 'no'} "
f"use={r.get('use_count', 0)} "
f"view={r.get('view_count', 0)} "
f"patches={r.get('patch_count', 0)} "
f"last_used={r.get('last_used_at') or 'never'}"
)
return "\n".join(lines)
def run_curator_review(
on_summary: Optional[Callable[[str], None]] = None,
synchronous: bool = False,
) -> Dict[str, Any]:
"""Execute a single curator review pass.
Steps:
1. Apply automatic state transitions (pure, no LLM).
2. If there are agent-created skills, spawn a forked AIAgent that runs
the LLM review prompt against the current candidate list.
3. Update .curator_state with last_run_at and a one-line summary.
4. Invoke *on_summary* with a user-visible description.
If *synchronous* is True, the LLM review runs in the calling thread; the
default is to spawn a daemon thread so the caller returns immediately.
"""
start = datetime.now(timezone.utc)
counts = apply_automatic_transitions(now=start)
auto_summary_parts = []
if counts["marked_stale"]:
auto_summary_parts.append(f"{counts['marked_stale']} marked stale")
if counts["archived"]:
auto_summary_parts.append(f"{counts['archived']} archived")
if counts["reactivated"]:
auto_summary_parts.append(f"{counts['reactivated']} reactivated")
auto_summary = ", ".join(auto_summary_parts) if auto_summary_parts else "no changes"
# Persist state before the LLM pass so a crash mid-review still records
# the run and doesn't immediately re-trigger.
state = load_state()
state["last_run_at"] = start.isoformat()
state["run_count"] = int(state.get("run_count", 0)) + 1
state["last_run_summary"] = f"auto: {auto_summary}"
save_state(state)
def _llm_pass():
nonlocal auto_summary
try:
candidate_list = _render_candidate_list()
if "No agent-created skills" in candidate_list:
final_summary = f"auto: {auto_summary}; llm: skipped (no candidates)"
else:
prompt = f"{CURATOR_REVIEW_PROMPT}\n\n{candidate_list}"
llm_summary = _run_llm_review(prompt)
final_summary = f"auto: {auto_summary}; llm: {llm_summary}"
except Exception as e:
logger.debug("Curator LLM pass failed: %s", e, exc_info=True)
final_summary = f"auto: {auto_summary}; llm: error ({e})"
elapsed = (datetime.now(timezone.utc) - start).total_seconds()
state2 = load_state()
state2["last_run_duration_seconds"] = elapsed
state2["last_run_summary"] = final_summary
save_state(state2)
if on_summary:
try:
on_summary(f"curator: {final_summary}")
except Exception:
pass
if synchronous:
_llm_pass()
else:
t = threading.Thread(target=_llm_pass, daemon=True, name="curator-review")
t.start()
return {
"started_at": start.isoformat(),
"auto_transitions": counts,
"summary_so_far": auto_summary,
}
def _run_llm_review(prompt: str) -> str:
"""Spawn an AIAgent fork to run the curator review prompt. Returns a short
summary of what the model said in its final response."""
import contextlib
try:
from run_agent import AIAgent
except Exception as e:
return f"AIAgent import failed: {e}"
# Resolve provider + model the same way the CLI does, so the curator
# fork inherits the user's active main config rather than falling
# through to an empty provider/model pair (which sends HTTP 400
# "No models provided"). AIAgent() without explicit provider/model
# arguments hits an auto-resolution path that fails for OAuth-only
# providers and for pool-backed credentials.
_api_key = None
_base_url = None
_api_mode = None
_resolved_provider = None
_model_name = ""
try:
from hermes_cli.config import load_config
from hermes_cli.runtime_provider import resolve_runtime_provider
_cfg = load_config()
_m = _cfg.get("model", {}) if isinstance(_cfg.get("model"), dict) else {}
_provider = _m.get("provider") or "auto"
_model_name = _m.get("default") or _m.get("model") or ""
_rp = resolve_runtime_provider(
requested=_provider, target_model=_model_name
)
_api_key = _rp.get("api_key")
_base_url = _rp.get("base_url")
_api_mode = _rp.get("api_mode")
_resolved_provider = _rp.get("provider") or _provider
except Exception as e:
logger.debug("Curator provider resolution failed: %s", e, exc_info=True)
review_agent = None
try:
review_agent = AIAgent(
model=_model_name,
provider=_resolved_provider,
api_key=_api_key,
base_url=_base_url,
api_mode=_api_mode,
# Umbrella-building over a large skill collection is worth a
# high iteration ceiling — the pass typically takes 50-100
# API calls against hundreds of candidate skills. The
# single-session review path caps itself at a much smaller
# number because it's not doing a curation sweep.
max_iterations=9999,
quiet_mode=True,
platform="curator",
skip_context_files=True,
skip_memory=True,
)
# Disable recursive nudges — the curator must never spawn its own review.
review_agent._memory_nudge_interval = 0
review_agent._skill_nudge_interval = 0
# Redirect the forked agent's stdout/stderr to /dev/null while it
# runs so its tool-call chatter doesn't pollute the foreground
# terminal. The background-thread runner also hides it; this
# belt-and-suspenders path matters when a caller invokes
# run_curator_review(synchronous=True) from the CLI.
with open(os.devnull, "w") as _devnull, \
contextlib.redirect_stdout(_devnull), \
contextlib.redirect_stderr(_devnull):
result = review_agent.run_conversation(user_message=prompt)
final = ""
if isinstance(result, dict):
final = str(result.get("final_response") or "").strip()
return (final[:240] + "") if len(final) > 240 else (final or "no change")
except Exception as e:
return f"error: {e}"
finally:
if review_agent is not None:
try:
review_agent.close()
except Exception:
pass
# ---------------------------------------------------------------------------
# Public entrypoint for the session-start hook
# ---------------------------------------------------------------------------
def maybe_run_curator(
*,
idle_for_seconds: Optional[float] = None,
on_summary: Optional[Callable[[str], None]] = None,
) -> Optional[Dict[str, Any]]:
"""Best-effort: run a curator pass if all gates pass. Returns the result
dict if a pass was started, else None. Never raises."""
try:
if not should_run_now():
return None
# Idle gating: only enforce when the caller provided a measurement.
if idle_for_seconds is not None:
min_idle_s = get_min_idle_hours() * 3600.0
if idle_for_seconds < min_idle_s:
return None
return run_curator_review(on_summary=on_summary)
except Exception as e:
logger.debug("maybe_run_curator failed: %s", e, exc_info=True)
return None
+9 -1
View File
@@ -200,6 +200,9 @@ def get_external_skills_dirs() -> List[Path]:
if not isinstance(raw_dirs, list):
return []
from hermes_constants import get_hermes_home
hermes_home = get_hermes_home()
local_skills = get_skills_dir().resolve()
seen: Set[Path] = set()
result: List[Path] = []
@@ -210,7 +213,12 @@ def get_external_skills_dirs() -> List[Path]:
continue
# Expand ~ and environment variables
expanded = os.path.expanduser(os.path.expandvars(entry))
p = Path(expanded).resolve()
p = Path(expanded)
# Resolve relative paths against HERMES_HOME, not cwd
if not p.is_absolute():
p = (hermes_home / p).resolve()
else:
p = p.resolve()
if p == local_skills:
continue
if p in seen:
+91 -120
View File
@@ -80,6 +80,11 @@ _COMMAND_SPINNER_FRAMES = ("⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
# User-managed env files should override stale shell exports on restart.
from hermes_constants import get_hermes_home, display_hermes_home
from hermes_cli.browser_connect import (
DEFAULT_BROWSER_CDP_URL,
manual_chrome_debug_command,
try_launch_chrome_debug,
)
from hermes_cli.env_loader import load_hermes_dotenv
from utils import base_url_host_matches
@@ -240,65 +245,6 @@ def _parse_service_tier_config(raw: str) -> str | None:
logger.warning("Unknown service_tier '%s', ignoring", raw)
return None
def _get_chrome_debug_candidates(system: str) -> list[str]:
"""Return likely browser executables for local CDP auto-launch."""
candidates: list[str] = []
seen: set[str] = set()
def _add_candidate(path: str | None) -> None:
if not path:
return
normalized = os.path.normcase(os.path.normpath(path))
if normalized in seen:
return
if os.path.isfile(path):
candidates.append(path)
seen.add(normalized)
def _add_from_path(*names: str) -> None:
for name in names:
_add_candidate(shutil.which(name))
if system == "Darwin":
for app in (
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
"/Applications/Brave Browser.app/Contents/MacOS/Brave Browser",
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
):
_add_candidate(app)
elif system == "Windows":
_add_from_path(
"chrome.exe", "msedge.exe", "brave.exe", "chromium.exe",
"chrome", "msedge", "brave", "chromium",
)
for base in (
os.environ.get("ProgramFiles"),
os.environ.get("ProgramFiles(x86)"),
os.environ.get("LOCALAPPDATA"),
):
if not base:
continue
for parts in (
("Google", "Chrome", "Application", "chrome.exe"),
("Chromium", "Application", "chrome.exe"),
("Chromium", "Application", "chromium.exe"),
("BraveSoftware", "Brave-Browser", "Application", "brave.exe"),
("Microsoft", "Edge", "Application", "msedge.exe"),
):
_add_candidate(os.path.join(base, *parts))
else:
_add_from_path(
"google-chrome", "google-chrome-stable", "chromium-browser",
"chromium", "brave-browser", "microsoft-edge",
)
return candidates
def load_cli_config() -> Dict[str, Any]:
"""
Load CLI configuration from config files.
@@ -5979,7 +5925,29 @@ class HermesCLI:
print(f"(._.) Unknown cron command: {subcommand}")
print(" Available: list, add, edit, pause, resume, run, remove")
def _handle_curator_command(self, cmd: str):
"""Handle /curator slash command.
Delegates to hermes_cli.curator so the CLI and the `hermes curator`
subcommand share the same handler set.
"""
import shlex
tokens = shlex.split(cmd)[1:] if cmd else []
if not tokens:
tokens = ["status"]
try:
from hermes_cli.curator import cli_main
cli_main(tokens)
except SystemExit:
# argparse calls sys.exit() on --help or errors; swallow so we
# don't kill the interactive session.
pass
except Exception as exc:
print(f"(._.) curator: {exc}")
def _handle_skills_command(self, cmd: str):
"""Handle /skills slash command — delegates to hermes_cli.skills_hub."""
from hermes_cli.skills_hub import handle_skills_slash
@@ -6223,6 +6191,8 @@ class HermesCLI:
self.save_conversation()
elif canonical == "cron":
self._handle_cron_command(cmd_original)
elif canonical == "curator":
self._handle_curator_command(cmd_original)
elif canonical == "skills":
with self._busy_command(self._slow_command_status(cmd_original)):
self._handle_skills_command(cmd_original)
@@ -6606,34 +6576,7 @@ class HermesCLI:
Returns True if a launch command was executed (doesn't guarantee success).
"""
import subprocess as _sp
candidates = _get_chrome_debug_candidates(system)
if not candidates:
return False
# Dedicated profile dir so debug Chrome won't collide with normal Chrome
data_dir = str(_hermes_home / "chrome-debug")
os.makedirs(data_dir, exist_ok=True)
chrome = candidates[0]
try:
_sp.Popen(
[
chrome,
f"--remote-debugging-port={port}",
f"--user-data-dir={data_dir}",
"--no-first-run",
"--no-default-browser-check",
],
stdout=_sp.DEVNULL,
stderr=_sp.DEVNULL,
start_new_session=True, # detach from terminal
)
return True
except Exception:
return False
return try_launch_chrome_debug(port, system)
def _handle_browser_command(self, cmd: str):
"""Handle /browser connect|disconnect|status — manage live Chrome CDP connection."""
@@ -6642,13 +6585,44 @@ class HermesCLI:
parts = cmd.strip().split(None, 1)
sub = parts[1].lower().strip() if len(parts) > 1 else "status"
_DEFAULT_CDP = "http://127.0.0.1:9222"
_DEFAULT_CDP = DEFAULT_BROWSER_CDP_URL
current = os.environ.get("BROWSER_CDP_URL", "").strip()
if sub.startswith("connect"):
# Optionally accept a custom CDP URL: /browser connect ws://host:port
connect_parts = cmd.strip().split(None, 2) # ["/browser", "connect", "ws://..."]
cdp_url = connect_parts[2].strip() if len(connect_parts) > 2 else _DEFAULT_CDP
parsed_cdp = urlparse(cdp_url if "://" in cdp_url else f"http://{cdp_url}")
if parsed_cdp.scheme not in {"http", "https", "ws", "wss"}:
print()
print(
f" ⚠ Unsupported browser url scheme: {parsed_cdp.scheme or '(missing)'} "
"(expected one of: http, https, ws, wss)"
)
print()
return
try:
_port = parsed_cdp.port or (443 if parsed_cdp.scheme in {"https", "wss"} else 80)
except ValueError:
print()
print(f" ⚠ Invalid port in browser url: {cdp_url}")
print()
return
if not parsed_cdp.hostname:
print()
print(f" ⚠ Missing host in browser url: {cdp_url}")
print()
return
_host = parsed_cdp.hostname
if parsed_cdp.path.startswith("/devtools/browser/"):
cdp_url = parsed_cdp.geturl()
else:
cdp_url = parsed_cdp._replace(
path="",
params="",
query="",
fragment="",
).geturl()
# Clear any existing browser sessions so the next tool call uses the new backend
try:
@@ -6659,20 +6633,13 @@ class HermesCLI:
print()
# Extract port for connectivity checks
_port = 9222
try:
_port = int(cdp_url.rsplit(":", 1)[-1].split("/")[0])
except (ValueError, IndexError):
pass
# Check if Chrome is already listening on the debug port
import socket
_already_open = False
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
s.connect(("127.0.0.1", _port))
s.connect((_host, _port))
s.close()
_already_open = True
except (OSError, socket.timeout):
@@ -6690,7 +6657,7 @@ class HermesCLI:
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(1)
s.connect(("127.0.0.1", _port))
s.connect((_host, _port))
s.close()
_already_open = True
break
@@ -6703,33 +6670,22 @@ class HermesCLI:
print(" Try again in a few seconds — the debug instance may still be starting")
else:
print(" ⚠ Could not auto-launch Chrome")
# Show manual instructions as fallback
_data_dir = str(_hermes_home / "chrome-debug")
sys_name = _plat.system()
if sys_name == "Darwin":
chrome_cmd = (
'open -a "Google Chrome" --args'
f" --remote-debugging-port=9222"
f' --user-data-dir="{_data_dir}"'
" --no-first-run --no-default-browser-check"
)
elif sys_name == "Windows":
chrome_cmd = (
f'chrome.exe --remote-debugging-port=9222'
f' --user-data-dir="{_data_dir}"'
f" --no-first-run --no-default-browser-check"
)
chrome_cmd = manual_chrome_debug_command(_port, sys_name)
if chrome_cmd:
print(f" Launch Chrome manually:")
print(f" {chrome_cmd}")
else:
chrome_cmd = (
f"google-chrome --remote-debugging-port=9222"
f' --user-data-dir="{_data_dir}"'
f" --no-first-run --no-default-browser-check"
)
print(f" Launch Chrome manually:")
print(f" {chrome_cmd}")
print(" No Chrome/Chromium executable found in this environment")
else:
print(f" ⚠ Port {_port} is not reachable at {cdp_url}")
if not _already_open:
print()
print("Browser not connected — start Chrome with remote debugging and retry /browser connect")
print()
return
os.environ["BROWSER_CDP_URL"] = cdp_url
# Eagerly start the CDP supervisor so pending_dialogs + frame_tree
# show up in the next browser_snapshot. No-op if already started.
@@ -9344,6 +9300,21 @@ class HermesCLI:
self._console_print(f"[dim {_tip_color}]✦ Tip: {_tip}[/]")
except Exception:
pass # Tips are non-critical — never break startup
# Curator — kick off a background skill-maintenance pass on startup
# if the schedule says we're due. Runs in a daemon thread so it
# never blocks the interactive loop. Best-effort; any failure is
# swallowed to avoid breaking session startup.
try:
from agent.curator import maybe_run_curator
maybe_run_curator(
idle_for_seconds=float("inf"), # CLI startup = fully idle
on_summary=lambda msg: self._console_print(
f"[dim #6b7684]💾 {msg}[/]"
),
)
except Exception:
pass
if self.preloaded_skills and not self._startup_skills_line_shown:
skills_label = ", ".join(self.preloaded_skills)
self._console_print(
+58
View File
@@ -286,6 +286,10 @@ if _config_path.exists():
# Only bridge explicit absolute paths from config.yaml.
if _cfg_key == "cwd" and str(_val) in (".", "auto", "cwd"):
continue
# Expand shell tilde in cwd so subprocess.Popen never
# receives a literal "~/" which the kernel rejects.
if _cfg_key == "cwd" and isinstance(_val, str):
_val = os.path.expanduser(_val)
if isinstance(_val, list):
os.environ[_env_var] = json.dumps(_val)
else:
@@ -2378,6 +2382,7 @@ class GatewayRunner:
# Discover and load event hooks
self.hooks.discover_and_load()
# Recover background processes from checkpoint (crash recovery)
try:
@@ -10221,6 +10226,20 @@ class GatewayRunner:
if progress_lines:
progress_lines[-1] = f"{base_msg} (×{count + 1})"
msg = progress_lines[-1] if progress_lines else base_msg
elif isinstance(raw, tuple) and len(raw) >= 1 and raw[0] == "__reset__":
# Content bubble just landed on the platform — close off
# the current tool-progress bubble so the next tool
# starts a fresh bubble below the content. Without this,
# tool lines keep editing the ORIGINAL progress message
# above the new content, making the chat appear out of
# order. Mirrors GatewayStreamConsumer.on_segment_break
# on the content side. (Issue: tool + content
# linearization regression after PR #7885.)
progress_msg_id = None
progress_lines = []
last_progress_msg[0] = None
repeat_count[0] = 0
continue
else:
msg = raw
progress_lines.append(msg)
@@ -10290,6 +10309,24 @@ class GatewayRunner:
_, base_msg, count = raw
if progress_lines:
progress_lines[-1] = f"{base_msg} (×{count + 1})"
elif isinstance(raw, tuple) and len(raw) >= 1 and raw[0] == "__reset__":
# Content-bubble marker during drain: close off
# the current progress bubble and start a fresh
# one for any tool lines that arrived after.
if can_edit and progress_lines and progress_msg_id:
_pending_text = "\n".join(progress_lines)
try:
await adapter.edit_message(
chat_id=source.chat_id,
message_id=progress_msg_id,
content=_pending_text,
)
except Exception:
pass
progress_msg_id = None
progress_lines = []
last_progress_msg[0] = None
repeat_count[0] = 0
else:
progress_lines.append(raw)
except Exception:
@@ -10495,6 +10532,11 @@ class GatewayRunner:
chat_id=source.chat_id,
config=_consumer_cfg,
metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None,
on_new_message=(
(lambda: progress_queue.put(("__reset__",)))
if progress_queue is not None
else None
),
)
if _want_stream_deltas:
def _stream_delta_cb(text: str) -> None:
@@ -11702,6 +11744,7 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
IMAGE_CACHE_EVERY = 60 # ticks — once per hour at default 60s interval
CHANNEL_DIR_EVERY = 5 # ticks — every 5 minutes
PASTE_SWEEP_EVERY = 60 # ticks — once per hour
CURATOR_EVERY = 60 # ticks — poll hourly (inner gate handles the real cadence)
logger.info("Cron ticker started (interval=%ds)", interval)
tick_count = 0
@@ -11753,6 +11796,21 @@ def _start_cron_ticker(stop_event: threading.Event, adapters=None, loop=None, in
except Exception as e:
logger.debug("Paste sweep error: %s", e)
# Curator — piggy-back on the existing cron ticker so long-running
# gateways get weekly skill maintenance without needing restarts.
# maybe_run_curator() is internally gated by config.interval_hours
# (7 days by default), so CURATOR_EVERY is just the poll rate — the
# real work only fires once per config interval.
if tick_count % CURATOR_EVERY == 0:
try:
from agent.curator import maybe_run_curator
maybe_run_curator(
idle_for_seconds=float("inf"),
on_summary=lambda msg: logger.info("curator: %s", msg),
)
except Exception as e:
logger.debug("Curator tick error: %s", e)
stop_event.wait(timeout=interval)
logger.info("Cron ticker stopped")
+35
View File
@@ -91,11 +91,20 @@ class GatewayStreamConsumer:
chat_id: str,
config: Optional[StreamConsumerConfig] = None,
metadata: Optional[dict] = None,
on_new_message: Optional[callable] = None,
):
self.adapter = adapter
self.chat_id = chat_id
self.cfg = config or StreamConsumerConfig()
self.metadata = metadata
# Fired whenever a fresh content bubble is created on the platform
# (first-send of a new message, commentary, overflow chunk, or
# fallback continuation). The gateway uses this to linearize the
# tool-progress bubble: when content resumes after a tool batch,
# the next tool.started should open a NEW progress bubble below
# the content, not edit the old bubble above it.
# Called with no arguments. Exceptions are swallowed.
self._on_new_message = on_new_message
self._queue: queue.Queue = queue.Queue()
self._accumulated = ""
self._message_id: Optional[str] = None
@@ -146,6 +155,16 @@ class GatewayStreamConsumer:
if text:
self._queue.put((_COMMENTARY, text))
def _notify_new_message(self) -> None:
"""Fire the on_new_message callback, swallowing any errors."""
cb = self._on_new_message
if cb is None:
return
try:
cb()
except Exception:
logger.debug("on_new_message callback error", exc_info=True)
def _reset_segment_state(self, *, preserve_no_edit: bool = False) -> None:
if preserve_no_edit and self._message_id == "__no_edit__":
return
@@ -529,6 +548,9 @@ class GatewayStreamConsumer:
self._message_id = str(result.message_id)
self._already_sent = True
self._last_sent_text = text
# Fresh content bubble — close off any stale tool bubble
# above so the next tool starts a new bubble below.
self._notify_new_message()
return str(result.message_id)
else:
self._edit_supported = False
@@ -661,6 +683,9 @@ class GatewayStreamConsumer:
sent_any_chunk = True
last_successful_chunk = chunk
last_message_id = result.message_id or last_message_id
# Each fallback chunk is a fresh platform message — notify
# so any stale tool-progress bubble gets closed off.
self._notify_new_message()
self._message_id = last_message_id
self._already_sent = True
@@ -744,6 +769,11 @@ class GatewayStreamConsumer:
# tool..."), not the final response. Setting already_sent would cause
# the final response to be incorrectly suppressed when there are
# multiple tool calls. See: https://github.com/NousResearch/hermes-agent/issues/10454
if result.success:
# Commentary counts as fresh content — close off any
# stale tool bubble above it so the next tool starts a
# new bubble below.
self._notify_new_message()
return result.success
except Exception as e:
logger.error("Commentary send error: %s", e)
@@ -973,6 +1003,11 @@ class GatewayStreamConsumer:
# every delta/tool boundary when platforms accept a
# message but do not return an editable message id.
self._message_id = "__no_edit__"
# Notify the gateway that a fresh content bubble was
# created so any accumulated tool-progress bubble above
# gets closed off — the next tool fires into a new
# bubble below, preserving chronological order.
self._notify_new_message()
return True
else:
# Initial send failed — disable streaming for this session
+138
View File
@@ -0,0 +1,138 @@
"""Shared helpers for attaching Hermes to a local Chrome CDP port."""
from __future__ import annotations
import os
import platform
import shlex
import shutil
import subprocess
from hermes_constants import get_hermes_home
DEFAULT_BROWSER_CDP_PORT = 9222
DEFAULT_BROWSER_CDP_URL = f"http://127.0.0.1:{DEFAULT_BROWSER_CDP_PORT}"
_DARWIN_APPS = (
"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
"/Applications/Chromium.app/Contents/MacOS/Chromium",
"/Applications/Brave Browser.app/Contents/MacOS/Brave Browser",
"/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge",
)
_WINDOWS_INSTALL_PARTS = (
("Google", "Chrome", "Application", "chrome.exe"),
("Chromium", "Application", "chrome.exe"),
("Chromium", "Application", "chromium.exe"),
("BraveSoftware", "Brave-Browser", "Application", "brave.exe"),
("Microsoft", "Edge", "Application", "msedge.exe"),
)
_LINUX_BIN_NAMES = (
"google-chrome", "google-chrome-stable", "chromium-browser",
"chromium", "brave-browser", "microsoft-edge",
)
_WINDOWS_BIN_NAMES = (
"chrome.exe", "msedge.exe", "brave.exe", "chromium.exe",
"chrome", "msedge", "brave", "chromium",
)
def get_chrome_debug_candidates(system: str) -> list[str]:
candidates: list[str] = []
seen: set[str] = set()
def add(path: str | None) -> None:
if not path:
return
normalized = os.path.normcase(os.path.normpath(path))
if normalized in seen or not os.path.isfile(path):
return
candidates.append(path)
seen.add(normalized)
def add_install_paths(bases: tuple[str | None, ...]) -> None:
for base in filter(None, bases):
for parts in _WINDOWS_INSTALL_PARTS:
add(os.path.join(base, *parts))
if system == "Darwin":
for app in _DARWIN_APPS:
add(app)
return candidates
if system == "Windows":
for name in _WINDOWS_BIN_NAMES:
add(shutil.which(name))
add_install_paths((
os.environ.get("ProgramFiles"),
os.environ.get("ProgramFiles(x86)"),
os.environ.get("LOCALAPPDATA"),
))
return candidates
for name in _LINUX_BIN_NAMES:
add(shutil.which(name))
add_install_paths(("/mnt/c/Program Files", "/mnt/c/Program Files (x86)"))
return candidates
def chrome_debug_data_dir() -> str:
return str(get_hermes_home() / "chrome-debug")
def _chrome_debug_args(port: int) -> list[str]:
return [
f"--remote-debugging-port={port}",
f"--user-data-dir={chrome_debug_data_dir()}",
"--no-first-run",
"--no-default-browser-check",
]
def manual_chrome_debug_command(port: int = DEFAULT_BROWSER_CDP_PORT, system: str | None = None) -> str | None:
system = system or platform.system()
candidates = get_chrome_debug_candidates(system)
if candidates:
argv = [candidates[0], *_chrome_debug_args(port)]
return subprocess.list2cmdline(argv) if system == "Windows" else shlex.join(argv)
if system == "Darwin":
data_dir = chrome_debug_data_dir()
return (
f'open -a "Google Chrome" --args --remote-debugging-port={port} '
f'--user-data-dir="{data_dir}" --no-first-run --no-default-browser-check'
)
return None
def _detach_kwargs(system: str) -> dict:
if system != "Windows":
return {"start_new_session": True}
flags = getattr(subprocess, "DETACHED_PROCESS", 0) | getattr(
subprocess, "CREATE_NEW_PROCESS_GROUP", 0
)
return {"creationflags": flags} if flags else {}
def try_launch_chrome_debug(port: int = DEFAULT_BROWSER_CDP_PORT, system: str | None = None) -> bool:
system = system or platform.system()
candidates = get_chrome_debug_candidates(system)
if not candidates:
return False
os.makedirs(chrome_debug_data_dir(), exist_ok=True)
try:
subprocess.Popen(
[candidates[0], *_chrome_debug_args(port)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
**_detach_kwargs(system),
)
return True
except Exception:
return False
+3
View File
@@ -148,6 +148,9 @@ COMMAND_REGISTRY: list[CommandDef] = [
CommandDef("cron", "Manage scheduled tasks", "Tools & Skills",
cli_only=True, args_hint="[subcommand]",
subcommands=("list", "add", "create", "edit", "pause", "resume", "run", "remove")),
CommandDef("curator", "Background skill maintenance (status, run, pin, archive)",
"Tools & Skills", args_hint="[subcommand]",
subcommands=("status", "run", "pause", "resume", "pin", "unpin", "restore")),
CommandDef("reload", "Reload .env variables into the running session", "Tools & Skills",
cli_only=True),
CommandDef("reload-mcp", "Reload MCP servers from config", "Tools & Skills",
+43 -5
View File
@@ -915,6 +915,35 @@ DEFAULT_CONFIG = {
"guard_agent_created": False,
},
# Curator — background skill maintenance.
#
# Periodically reviews AGENT-CREATED skills (never bundled or
# hub-installed) and keeps the collection tidy: marks long-unused skills
# as stale, archives genuinely obsolete ones (archive only, never
# deletes), and spawns a forked aux-model agent to consolidate overlaps
# and patch drift. Runs inactivity-triggered from session start — no
# cron daemon.
#
# See `hermes curator status` for the last run summary.
"curator": {
"enabled": True,
# How long to wait between curator runs (hours). Default: 7 days.
"interval_hours": 24 * 7,
# Only run when the agent has been idle at least this long (hours).
"min_idle_hours": 2,
# Mark a skill as "stale" after this many days without use.
"stale_after_days": 30,
# Archive a skill (move to skills/.archive/) after this many days
# without use. Archived skills are recoverable — no auto-deletion.
"archive_after_days": 90,
# Optional per-task override for the curator's aux model. Leave null
# to use Hermes' main auxiliary client resolution.
"auxiliary": {
"provider": None,
"model": None,
},
},
# Honcho AI-native memory -- reads ~/.honcho/config.json as single source of truth.
# This section is only needed for hermes-specific overrides; everything else
# (apiKey, workspace, peerName, sessions, enabled) comes from the global config.
@@ -3710,18 +3739,27 @@ def _sanitize_env_lines(lines: list) -> list:
# Detect concatenated KEY=VALUE pairs on one line.
# Search for known KEY= patterns at any position in the line.
split_positions = []
# We collect full needle ranges so we can drop matches that are
# fully contained within a longer overlapping needle. Without this,
# suffix collisions corrupt the file: e.g. LM_API_KEY= inside
# GLM_API_KEY= would otherwise split the line into "G\nLM_API_KEY=...".
match_ranges: list[tuple[int, int]] = []
for key_name in known_keys:
needle = key_name + "="
idx = stripped.find(needle)
while idx >= 0:
split_positions.append(idx)
match_ranges.append((idx, idx + len(needle)))
idx = stripped.find(needle, idx + len(needle))
split_positions = sorted({
s for s, e in match_ranges
if not any(
s2 <= s and e2 >= e and (s2, e2) != (s, e)
for s2, e2 in match_ranges
)
})
if len(split_positions) > 1:
split_positions.sort()
# Deduplicate (shouldn't happen, but be safe)
split_positions = sorted(set(split_positions))
for i, pos in enumerate(split_positions):
end = split_positions[i + 1] if i + 1 < len(split_positions) else len(stripped)
part = stripped[pos:end].strip()
+232
View File
@@ -0,0 +1,232 @@
"""CLI subcommand: `hermes curator <subcommand>`.
Thin shell around agent/curator.py and tools/skill_usage.py. Renders a status
table, triggers a run, pauses/resumes, and pins/unpins skills.
This module intentionally has no side effects at import time — main.py wires
the argparse subparsers on demand.
"""
from __future__ import annotations
import argparse
import sys
from datetime import datetime, timezone
from typing import Optional
def _fmt_ts(ts: Optional[str]) -> str:
if not ts:
return "never"
try:
dt = datetime.fromisoformat(ts)
except (TypeError, ValueError):
return str(ts)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
delta = datetime.now(timezone.utc) - dt
secs = int(delta.total_seconds())
if secs < 60:
return f"{secs}s ago"
if secs < 3600:
return f"{secs // 60}m ago"
if secs < 86400:
return f"{secs // 3600}h ago"
return f"{secs // 86400}d ago"
def _cmd_status(args) -> int:
from agent import curator
from tools import skill_usage
state = curator.load_state()
enabled = curator.is_enabled()
paused = state.get("paused", False)
last_run = state.get("last_run_at")
summary = state.get("last_run_summary") or "(none)"
runs = state.get("run_count", 0)
status_line = (
"ENABLED" if enabled and not paused else
"PAUSED" if paused else
"DISABLED"
)
print(f"curator: {status_line}")
print(f" runs: {runs}")
print(f" last run: {_fmt_ts(last_run)}")
print(f" last summary: {summary}")
_ih = curator.get_interval_hours()
_interval_label = (
f"{_ih // 24}d" if _ih % 24 == 0 and _ih >= 24
else f"{_ih}h"
)
print(f" interval: every {_interval_label}")
print(f" stale after: {curator.get_stale_after_days()}d unused")
print(f" archive after: {curator.get_archive_after_days()}d unused")
rows = skill_usage.agent_created_report()
if not rows:
print("\nno agent-created skills")
return 0
by_state = {"active": [], "stale": [], "archived": []}
pinned = []
for r in rows:
state_name = r.get("state", "active")
by_state.setdefault(state_name, []).append(r)
if r.get("pinned"):
pinned.append(r["name"])
print(f"\nagent-created skills: {len(rows)} total")
for state_name in ("active", "stale", "archived"):
bucket = by_state.get(state_name, [])
print(f" {state_name:10s} {len(bucket)}")
if pinned:
print(f"\npinned ({len(pinned)}): {', '.join(pinned)}")
# Show top 5 least-recently-used active skills
active = sorted(
by_state.get("active", []),
key=lambda r: r.get("last_used_at") or r.get("created_at") or "",
)[:5]
if active:
print("\nleast recently used (top 5):")
for r in active:
last = _fmt_ts(r.get("last_used_at"))
print(f" {r['name']:40s} use={r.get('use_count', 0):3d} last_used={last}")
return 0
def _cmd_run(args) -> int:
from agent import curator
if not curator.is_enabled():
print("curator: disabled via config; enable with `curator.enabled: true`")
return 1
print("curator: running review pass...")
def _on_summary(msg: str) -> None:
print(msg)
result = curator.run_curator_review(
on_summary=_on_summary,
synchronous=bool(args.synchronous),
)
auto = result.get("auto_transitions", {})
if auto:
print(
f"auto: checked={auto.get('checked', 0)} "
f"stale={auto.get('marked_stale', 0)} "
f"archived={auto.get('archived', 0)} "
f"reactivated={auto.get('reactivated', 0)}"
)
if not args.synchronous:
print("llm pass running in background — check `hermes curator status` later")
return 0
def _cmd_pause(args) -> int:
from agent import curator
curator.set_paused(True)
print("curator: paused")
return 0
def _cmd_resume(args) -> int:
from agent import curator
curator.set_paused(False)
print("curator: resumed")
return 0
def _cmd_pin(args) -> int:
from tools import skill_usage
if not skill_usage.is_agent_created(args.skill):
print(
f"curator: '{args.skill}' is bundled or hub-installed — cannot pin "
"(only agent-created skills participate in curation)"
)
return 1
skill_usage.set_pinned(args.skill, True)
print(f"curator: pinned '{args.skill}' (will bypass auto-transitions)")
return 0
def _cmd_unpin(args) -> int:
from tools import skill_usage
if not skill_usage.is_agent_created(args.skill):
print(
f"curator: '{args.skill}' is bundled or hub-installed — "
"there's nothing to unpin (curator only tracks agent-created skills)"
)
return 1
skill_usage.set_pinned(args.skill, False)
print(f"curator: unpinned '{args.skill}'")
return 0
def _cmd_restore(args) -> int:
from tools import skill_usage
ok, msg = skill_usage.restore_skill(args.skill)
print(f"curator: {msg}")
return 0 if ok else 1
# ---------------------------------------------------------------------------
# argparse wiring (called from hermes_cli.main)
# ---------------------------------------------------------------------------
def register_cli(parent: argparse.ArgumentParser) -> None:
"""Attach `curator` subcommands to *parent*.
main.py calls this with the ArgumentParser returned by
``subparsers.add_parser("curator", ...)``.
"""
parent.set_defaults(func=lambda a: (parent.print_help(), 0)[1])
subs = parent.add_subparsers(dest="curator_command")
p_status = subs.add_parser("status", help="Show curator status and skill stats")
p_status.set_defaults(func=_cmd_status)
p_run = subs.add_parser("run", help="Trigger a curator review now")
p_run.add_argument(
"--sync", "--synchronous", dest="synchronous", action="store_true",
help="Wait for the LLM review pass to finish (default: background thread)",
)
p_run.set_defaults(func=_cmd_run)
p_pause = subs.add_parser("pause", help="Pause the curator until resumed")
p_pause.set_defaults(func=_cmd_pause)
p_resume = subs.add_parser("resume", help="Resume a paused curator")
p_resume.set_defaults(func=_cmd_resume)
p_pin = subs.add_parser("pin", help="Pin a skill so the curator never auto-transitions it")
p_pin.add_argument("skill", help="Skill name")
p_pin.set_defaults(func=_cmd_pin)
p_unpin = subs.add_parser("unpin", help="Unpin a skill")
p_unpin.add_argument("skill", help="Skill name")
p_unpin.set_defaults(func=_cmd_unpin)
p_restore = subs.add_parser("restore", help="Restore an archived skill")
p_restore.add_argument("skill", help="Skill name")
p_restore.set_defaults(func=_cmd_restore)
def cli_main(argv=None) -> int:
"""Standalone entry (also usable by hermes_cli.main fallthrough)."""
parser = argparse.ArgumentParser(prog="hermes curator")
register_cli(parser)
args = parser.parse_args(argv)
fn = getattr(args, "func", None)
if fn is None:
parser.print_help()
return 0
return int(fn(args) or 0)
if __name__ == "__main__": # pragma: no cover
sys.exit(cli_main())
+20
View File
@@ -9230,6 +9230,26 @@ Examples:
except Exception as _exc:
logging.getLogger(__name__).debug("Plugin CLI discovery failed: %s", _exc)
# =========================================================================
# curator command — background skill maintenance
# =========================================================================
curator_parser = subparsers.add_parser(
"curator",
help="Background skill maintenance (curator) — status, run, pause, pin",
description=(
"The curator is an auxiliary-model background task that "
"periodically reviews agent-created skills, prunes stale ones, "
"consolidates overlaps, and archives obsolete skills. "
"Bundled and hub-installed skills are never touched. "
"Archives are recoverable; auto-deletion never happens."
),
)
try:
from hermes_cli.curator import register_cli as _register_curator_cli
_register_curator_cli(curator_parser)
except Exception as _exc:
logging.getLogger(__name__).debug("curator CLI wiring failed: %s", _exc)
# =========================================================================
# memory command
# =========================================================================
+346
View File
@@ -0,0 +1,346 @@
---
name: comfyui
description: "Use when generating images/video/audio with ComfyUI — import workflows, run them with friendly parameters, manage models and dependencies. Uses the comfyui-skill CLI over the REST API."
version: 3.0.0
requires: ComfyUI running locally or via Comfy Cloud; comfyui-skill CLI (auto-installed via uvx)
author: kshitijk4poor
license: MIT
platforms: [macos, linux, windows]
prerequisites:
commands: ["uv"]
setup:
help: "CLI auto-runs via uvx. ComfyUI install: https://docs.comfy.org/installation"
metadata:
hermes:
tags:
[
comfyui,
image-generation,
stable-diffusion,
flux,
creative,
generative-ai,
video-generation,
]
related_skills: [stable-diffusion-image-generation, image_gen]
category: creative
---
# ComfyUI
Generate images, video, and audio through ComfyUI using the `comfyui-skill` CLI.
The CLI wraps ComfyUI's REST API into an agent-friendly interface — workflows become
"skills" with named parameters (e.g., `prompt`, `seed`) instead of raw node graphs.
**Reference files in this skill:**
- `references/cli-reference.md` — complete command reference with all subcommands and options
- `references/api-notes.md` — underlying REST API routes (for debugging / advanced use)
- `scripts/comfyui_setup.sh` — workspace initialization script
## When to Use
- User asks to generate images with Stable Diffusion, SDXL, Flux, or other diffusion models
- User wants to run a specific ComfyUI workflow
- User wants to chain generative steps (txt2img → upscale → face restore)
- User needs ControlNet, inpainting, img2img, or other advanced pipelines
- User asks to manage ComfyUI queue, check models, or install custom nodes
- User wants video/audio generation via AnimateDiff, Hunyuan, AudioCraft, etc.
## How It Works
The `comfyui-skill` CLI turns ComfyUI workflows into callable "skills":
1. **Import** a workflow JSON (from editor or API format) → CLI extracts a parameter schema
2. **Run** with friendly args (`--args '{"prompt": "a cat"}'`) → CLI injects values into the right nodes
3. **Retrieve** outputs → CLI downloads generated files locally
The agent never sees raw node IDs or graph wiring. The CLI handles:
- Editor-format → API-format conversion (resolves reroutes, widget ordering via `/object_info`)
- Auto-upload of local images referenced in args
- Dependency checking (missing custom nodes, models)
- WebSocket streaming with polling fallback
- Multi-server routing
- Idempotent execution via `--job-id`
## CLI Invocation
The CLI is invoked via `uvx` (no persistent install needed):
```bash
uvx --from comfyui-skill-cli comfyui-skill [OPTIONS] COMMAND [ARGS]
```
For brevity in all examples below, we alias this:
```bash
# In execute_code / terminal, always use the full uvx form:
COMFY="uvx --from comfyui-skill-cli comfyui-skill"
```
**Always pass `--json` for structured output** the agent can parse:
```bash
$COMFY --json list
$COMFY --json run my-workflow --args '{"prompt": "a cat"}'
```
If `comfyui-skill` is already installed as a `uv tool` (`uv tool install comfyui-skill-cli`),
it's on PATH directly and `uvx` is not needed.
## Setup & Onboarding
### 1. ComfyUI Must Be Running
The CLI talks to a running ComfyUI server. If the user doesn't have one:
- Point them to https://docs.comfy.org/installation. If they ask for help in onboarding, read the docs and help them set things up.
- Supports: NVIDIA (CUDA), AMD (ROCm), Intel Arc, Apple Silicon (MPS), CPU-only
- Desktop app available for Windows/macOS; manual install for Linux
- Comfy Cloud available for users without a GPU (https://platform.comfy.org)
### 2. Initialize a Workspace
The CLI reads `config.json` and `data/` from its working directory. Run the
setup script or initialize manually:
```bash
bash scripts/comfyui_setup.sh
```
Or manually:
```bash
mkdir -p ~/.hermes/comfyui && cd ~/.hermes/comfyui
```
Then add a server:
```bash
$COMFY --json server add --id local --url http://127.0.0.1:8188 --name "Local ComfyUI"
```
For Comfy Cloud:
```bash
$COMFY --json server add --id cloud --url https://cloud.comfy.org \
--name "Comfy Cloud" --api-key "comfyui-xxxxxxxxxxxx"
```
### 3. Verify Connection
```bash
$COMFY --json server status
```
Should return `{"status": "online", ...}`. If offline, user needs to start ComfyUI.
### 4. Import a Workflow
Users typically have workflow JSON files from the ComfyUI editor:
```bash
$COMFY --json workflow import /path/to/workflow.json --name my-workflow
```
The CLI auto-detects format (editor or API), converts if needed, and extracts
a parameter schema. Both formats are accepted.
To import from the ComfyUI server's saved workflows:
```bash
$COMFY --json workflow import --from-server
```
## Core Workflow
### Step 1: List Available Skills
```bash
$COMFY --json list
```
Returns all imported workflows with their parameter schemas. Required params
must be provided; optional params have sensible defaults.
### Step 2: Check Dependencies (First Run)
```bash
$COMFY --json deps check my-workflow
```
Reports missing custom nodes and models. If `is_ready` is false:
```bash
# Install missing nodes (requires ComfyUI Manager)
$COMFY --json deps install my-workflow --all
# Missing models must be downloaded manually — CLI tells you which folder
```
### Step 3: Execute
**Blocking (recommended for most use):**
```bash
$COMFY --json run my-workflow --args '{"prompt": "a beautiful sunset", "seed": 42}'
```
Blocks until done, streams progress, downloads outputs.
**Non-blocking (for long jobs):**
```bash
# Submit
$COMFY --json submit my-workflow --args '{"prompt": "..."}'
# Returns: {"prompt_id": "abc-123"}
# Poll (each poll = separate command, do NOT loop in shell)
$COMFY --json status abc-123
# Returns: {"status": "running", "progress": {"value": 15, "max": 25}}
# When status = "success", outputs are in the response
```
### Step 4: Present Results
On success, the response contains output file paths. Show them to the user.
Images referenced in the output can be displayed via `vision_analyze` or
returned as file paths.
## Quick Decision Tree
| User says | Command |
| --------------------------------- | ---------------------------------------------- |
| "generate an image" / "draw" | `run <skill> --args '{"prompt": "..."}'` |
| "import this workflow" | `workflow import <path>` |
| "use this image" (img2img) | `upload <image>` then `run` with the reference |
| "inpaint this" | `upload <mask> --mask` then `run` |
| "what workflows do I have" | `list` |
| "what models are available" | `models list checkpoints` |
| "check if everything's installed" | `deps check <skill>` |
| "what failed" / "show history" | `history list <skill>` |
| "cancel that" | `cancel <prompt_id>` |
| "free up GPU memory" | `free` |
| "which nodes exist for X" | `nodes search <query>` |
## Multi-Server
Skills are addressed as `server_id/workflow_id`:
```bash
$COMFY --json list # all servers
$COMFY --json run local/txt2img --args '{...}' # specific server
$COMFY --json run cloud/flux --args '{...}' # different server
$COMFY --json server stats --all # VRAM/RAM across all servers
```
If `server_id` is omitted, the default server is used.
## Image Upload (img2img / Inpainting)
```bash
# Upload input image
$COMFY --json upload /path/to/photo.png
# Returns: {"filename": "photo.png", ...}
# Upload mask for inpainting
$COMFY --json upload /path/to/mask.png --mask --original photo.png
# Use in workflow args — if a param has type "image" and value is a local
# file path (starts with /, ./, ../, ~), the CLI auto-uploads it
$COMFY --json run inpaint --args '{"image": "/path/to/photo.png", "mask": "/path/to/mask.png", "prompt": "fill with flowers"}'
```
## Model Discovery
```bash
$COMFY --json models list # all folder types
$COMFY --json models list checkpoints # checkpoint files
$COMFY --json models list loras # LoRA files
$COMFY --json models list controlnet # ControlNet models
```
Model folders: `checkpoints`, `loras`, `vae`, `controlnet`, `clip`, `clip_vision`,
`upscale_models`, `embeddings`, `unet`, `diffusion_models`.
## Node Discovery
```bash
$COMFY --json nodes list # all nodes, grouped by category
$COMFY --json nodes list -c sampling # filter by category
$COMFY --json nodes info KSampler # full details of one node
$COMFY --json nodes search "upscale" # fuzzy search
```
## Queue & System
```bash
$COMFY --json queue list # running + pending jobs
$COMFY --json queue clear # clear pending
$COMFY --json cancel <prompt_id> # cancel specific job
$COMFY --json free # unload models + free VRAM
$COMFY --json server stats # system info (VRAM, RAM, GPU)
```
## Workflow Management
```bash
$COMFY --json workflow import <path> --name <id> # import from file
$COMFY --json workflow import --from-server # import from ComfyUI server
$COMFY --json workflow enable <skill_id> # enable
$COMFY --json workflow disable <skill_id> # disable
$COMFY --json workflow delete <skill_id> # delete
$COMFY --json info <skill_id> # show schema + details
```
## Idempotent Execution
For retries that shouldn't burn extra GPU:
```bash
$COMFY --json run my-workflow --args '{"prompt": "..."}' --job-id "unique-key-123"
```
If `unique-key-123` was already executed, returns the cached result instantly.
## Pitfalls
1. **Working directory matters** — The CLI reads `config.json` and `data/` from CWD.
Always `cd` to the workspace directory before running commands. If `list` returns
empty or `server status` fails, you're in the wrong directory.
2. **Editor format needs a live server** — Importing editor-format workflows requires
a running ComfyUI instance (calls `/object_info` to resolve widget ordering).
API-format imports work offline.
3. **Missing custom nodes** — Always `deps check` before first run of an imported
workflow. "class_type not found" means missing nodes.
4. **JSON args quoting** — Wrap `--args` in single quotes to prevent bash from
eating the double quotes: `--args '{"prompt": "a cat"}'`.
5. **Comfy Cloud differences** — Cloud uses `/api/` prefix and `X-API-Key` auth.
The CLI handles this transparently when configured with `--api-key`.
6. **Model names are exact** — Case-sensitive, includes extension. Use
`models list checkpoints` to discover installed models.
7. **Long generations** — Video and high-step workflows can take minutes. The `run`
command blocks and streams progress. For very long jobs, use `submit` + `status`.
8. **Concurrent limits (Cloud)** — Free/Standard: 1 job. Creator: 3. Pro: 5.
Extra submits queue automatically.
9. **Config portability** — Use `config export` / `config import` to transfer
setups between machines.
## Verification Checklist
- [ ] `uv` or `uvx` available on PATH
- [ ] `comfyui-skill --json server status` returns online
- [ ] Workspace dir has `config.json` and `data/`
- [ ] At least one workflow imported (`list` returns non-empty)
- [ ] `deps check` passes for imported workflows
- [ ] Test run completes and outputs are saved
@@ -0,0 +1,103 @@
# ComfyUI REST API Notes
The `comfyui-skill` CLI wraps these endpoints. This reference is for debugging,
understanding errors, or advanced use when the CLI doesn't cover a specific need.
## Endpoints the CLI Uses
| Endpoint | Method | CLI Command |
|----------|--------|-------------|
| `/system_stats` | GET | `server status`, `server stats` |
| `/prompt` | POST | `run`, `submit` |
| `/history/{prompt_id}` | GET | `status`, `run` (polling) |
| `/history` | GET | `history list --server` |
| `/queue` | GET | `queue list` |
| `/queue` | POST | `queue clear`, `queue delete` |
| `/interrupt` | POST | `cancel` |
| `/free` | POST | `free` |
| `/object_info` | GET | `nodes list`, `workflow import` (schema extraction) |
| `/object_info/{class}` | GET | `nodes info` |
| `/models` | GET | `models list` |
| `/models/{folder}` | GET | `models list <folder>`, `deps check` |
| `/view` | GET | `run` (output download) |
| `/upload/image` | POST | `upload` |
| `/upload/mask` | POST | `upload --mask` |
| `/node_replacements` | GET | `workflow import` (deprecated node detection) |
| `/internal/logs/raw` | GET | `logs show` |
| `/workflow_templates` | GET | `templates list` |
| `/global_subgraphs` | GET | `templates subgraphs` |
| `/v2/userdata` | GET | `workflow import --from-server` |
| `/ws` | WebSocket | `run` (real-time progress) |
### Cloud-specific
| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/api/jobs` | GET | Job listing with filtering |
| `/api/jobs/{id}` | GET | Job details |
### ComfyUI Manager (optional plugin)
| Endpoint | Method | CLI Command |
|----------|--------|-------------|
| `/manager/queue/start` | GET | `deps install` |
| `/manager/queue/install` | POST | `deps install` (custom nodes) |
| `/manager/queue/install_model` | POST | `deps install --models` |
| `/manager/queue/status` | GET | `deps install` (progress) |
## Local vs Cloud Differences
| | Local | Cloud |
|---|---|---|
| Base URL | `http://127.0.0.1:8188` | `https://cloud.comfy.org` |
| Route prefix | none | `/api` |
| Auth | none or bearer token | `X-API-Key` header |
| Job status | Poll `/history/{id}` | `/api/jobs/{id}` |
| Output download | Direct bytes from `/view` | 302 redirect → signed URL |
| WebSocket | `ws://host:port/ws?clientId={uuid}` | `wss://host/ws?clientId={uuid}&token={key}` |
| Concurrent jobs | Sequential | Tier-limited (Free: 1, Creator: 3, Pro: 5) |
The CLI handles all of these differences transparently based on the server config.
## Workflow JSON Format (API Format)
```json
{
"node_id_string": {
"class_type": "NodeClassName",
"inputs": {
"param_name": "value",
"linked_input": ["source_node_id", output_index]
}
}
}
```
- Node IDs are strings (`"3"`, not `3`)
- Links: `["node_id", output_index]` — 0-based int
- `class_type` must match exactly (case-sensitive)
## POST /prompt Payload
```json
{
"prompt": { "<workflow>" },
"client_id": "uuid",
"extra_data": {
"api_key_comfy_org": "key-for-paid-api-nodes"
}
}
```
The CLI constructs this from the imported workflow + injected parameters.
## WebSocket Message Types
| Type | When | Key Fields |
|------|------|------------|
| `execution_start` | Prompt begins | `prompt_id` |
| `executing` | Node running (`null` = done) | `node`, `prompt_id` |
| `progress` | Sampling steps | `node`, `value`, `max` |
| `executed` | Node output ready | `node`, `output` |
| `execution_success` | All nodes done | `prompt_id` |
| `execution_error` | Failure | `exception_type`, `exception_message` |
@@ -0,0 +1,172 @@
# comfyui-skill CLI Reference
Complete command map for `comfyui-skill` v0.2.x.
**Invocation:** `uvx --from comfyui-skill-cli comfyui-skill [OPTIONS] COMMAND [ARGS]`
Or if installed as a tool: `comfyui-skill [OPTIONS] COMMAND [ARGS]`
## Global Options
| Option | Short | Description |
|--------|-------|-------------|
| `--version` | `-V` | Show version |
| `--json` | `-j` | JSON output (always use this for agent parsing) |
| `--output-format` | | `text`, `json`, or `stream-json` (NDJSON events) |
| `--server` | `-s` | Server ID override |
| `--dir` | `-d` | Data directory (default: CWD) |
| `--verbose` | `-v` | Verbose output |
| `--no-update-check` | | Skip CLI update check |
## Standalone Commands
### `list`
List all available skills across all enabled servers.
### `info <SKILL_ID>`
Show skill details and parameter schema. Skill ID format: `server_id/workflow_id` or `workflow_id`.
### `run <SKILL_ID> [OPTIONS]`
Execute a skill (blocking — waits for completion, streams progress).
| Option | Short | Description |
|--------|-------|-------------|
| `--args` | `-a` | JSON parameters (default: `{}`) |
| `--only` | | Comma-separated node IDs for partial execution |
| `--priority` | `-p` | Queue priority (lower = first, negative = jump queue; default: 0) |
| `--validate` | | Validate workflow without executing (dry run) |
| `--job-id` | | Idempotency key — reuse cached result if already executed |
### `submit <SKILL_ID> [OPTIONS]`
Submit a skill (non-blocking — returns `prompt_id` immediately). Same options as `run` except no streaming.
### `status <PROMPT_ID>`
Check execution status. Returns: `queued` (with `position`), `running` (with `progress`), `success` (with `outputs`), or `error`.
### `upload [FILE_PATH] [OPTIONS]`
Upload a file to ComfyUI for use in workflows.
| Option | Description |
|--------|-------------|
| `--from-output` | Reuse output from a previous prompt_id as input |
| `--mask` | Upload as mask (for inpainting) |
| `--original` | Original image filename (for mask upload) |
### `cancel <PROMPT_ID>`
Cancel a running or queued job.
### `free [OPTIONS]`
Release GPU memory.
| Option | Short | Description |
|--------|-------|-------------|
| `--models` | `-m` | Unload all models from VRAM |
| `--memory` | | Free all cached memory |
## Command Groups
### `server` — Manage ComfyUI Servers
| Subcommand | Description |
|------------|-------------|
| `server list` | List all configured servers |
| `server status [SERVER_ID]` | Check if server is online |
| `server stats [SERVER_ID]` | System stats: VRAM, RAM, GPU, versions (`--all` for all servers) |
| `server add` | Add server (`--id`, `--url` required; `--name`, `--output-dir`, `--auth`, `--api-key` optional) |
| `server enable <SERVER_ID>` | Enable a server |
| `server disable <SERVER_ID>` | Disable a server |
| `server remove <SERVER_ID>` | Remove a server |
### `workflow` — Manage Workflows
| Subcommand | Description |
|------------|-------------|
| `workflow import [JSON_PATH]` | Import workflow (`--name`, `--type` image/audio/video, `--from-server`, `--preview`, `--check-deps`) |
| `workflow enable <SKILL_ID>` | Enable a workflow |
| `workflow disable <SKILL_ID>` | Disable a workflow |
| `workflow delete <SKILL_ID>` | Delete a workflow |
### `models` — Discover Models
| Subcommand | Description |
|------------|-------------|
| `models list [FOLDER]` | List models in a folder (checkpoints, loras, vae, controlnet, etc.) |
### `nodes` — Discover Nodes
| Subcommand | Description |
|------------|-------------|
| `nodes list` | List all node classes (`-c` to filter by category) |
| `nodes info <NODE_CLASS>` | Full details of a node type |
| `nodes search <QUERY>` | Fuzzy search across names/categories |
### `deps` — Dependency Management
| Subcommand | Description |
|------------|-------------|
| `deps check <SKILL_ID>` | Check if dependencies are installed (returns `is_ready`) |
| `deps install <SKILL_ID>` | Install missing deps (`--repos` git URLs, `--models`, `--all`) |
### `history` — Execution History
| Subcommand | Description |
|------------|-------------|
| `history list [SKILL_ID]` | List history (`--server`, `--status`, `--limit`, `--sort`) |
| `history show <SKILL_ID> <RUN_ID>` | Show specific run details |
### `queue` — Queue Management
| Subcommand | Description |
|------------|-------------|
| `queue list` | Show running and pending jobs |
| `queue clear` | Clear all pending jobs |
| `queue delete <PROMPT_IDS...>` | Remove specific jobs from queue |
### `logs` — Server Logs
| Subcommand | Description |
|------------|-------------|
| `logs show` | Show recent server logs (`--lines` / `-n`, default: 50) |
### `templates` — Discover Templates
| Subcommand | Description |
|------------|-------------|
| `templates list` | Workflow templates from custom nodes |
| `templates subgraphs` | Reusable subgraph components |
### `config` — Configuration
| Subcommand | Description |
|------------|-------------|
| `config export` | Export config + workflows as bundle (`--output`, `--portable-only`) |
| `config import <INPUT_PATH>` | Import bundle (`--dry-run`, `--apply-environment`, `--no-overwrite`) |
## Config File Format
Located at `<workspace>/config.json`:
```json
{
"default_server": "local",
"servers": [
{
"id": "local",
"name": "Local ComfyUI",
"url": "http://127.0.0.1:8188",
"enabled": true,
"output_dir": "./outputs",
"auth": "",
"comfy_api_key": ""
}
]
}
```
**Server fields:**
- `id` — unique identifier (no spaces/slashes/dots)
- `url` — ComfyUI base URL
- `enabled` — whether server is active
- `output_dir` — where outputs are saved (relative to workspace)
- `auth` — bearer token for authenticated servers
- `comfy_api_key` — Comfy Cloud API key (also sent as `extra_data.api_key_comfy_org` in prompts)
+41
View File
@@ -0,0 +1,41 @@
#!/usr/bin/env bash
# Initialize a comfyui-skill workspace directory.
# Usage: bash scripts/comfyui_setup.sh [WORKSPACE_DIR]
#
# Creates the workspace, adds a default local server config,
# and verifies the connection.
set -euo pipefail
WORKSPACE="${1:-$HOME/.hermes/comfyui}"
COMFY="${COMFY:-uvx --from comfyui-skill-cli comfyui-skill}"
echo "==> Initializing ComfyUI skill workspace at: $WORKSPACE"
mkdir -p "$WORKSPACE"
cd "$WORKSPACE"
# If config.json doesn't exist, create it with a default local server
if [ ! -f config.json ]; then
echo "==> Creating default config (local server at 127.0.0.1:8188)"
$COMFY --json server add --id local --url http://127.0.0.1:8188 --name "Local ComfyUI"
echo "==> Config created: $WORKSPACE/config.json"
else
echo "==> config.json already exists, skipping"
fi
# Verify connection
echo "==> Checking server connection..."
if $COMFY --json server status 2>/dev/null | grep -q '"online"'; then
echo "==> ComfyUI is reachable!"
$COMFY --json server stats 2>/dev/null || true
else
echo "==> ComfyUI is not reachable at the configured URL."
echo " Start ComfyUI first, or update the server URL:"
echo " cd $WORKSPACE && $COMFY server add --id local --url <YOUR_URL>"
echo ""
echo " Install ComfyUI: https://docs.comfy.org/installation"
fi
echo ""
echo "==> Workspace ready: $WORKSPACE"
echo " Always cd here before running comfyui-skill commands."
+2
View File
@@ -261,6 +261,7 @@ AUTHOR_MAP = {
"154585401+LeonSGP43@users.noreply.github.com": "LeonSGP43",
"mgparkprint@gmail.com": "vlwkaos",
"tranquil_flow@protonmail.com": "Tranquil-Flow",
"LyleLengyel@gmail.com": "mcndjxlefnd",
"wangshengyang2004@163.com": "Wangshengyang2004",
"hasan.ali13381@gmail.com": "H-Ali13381",
"xienb@proton.me": "XieNBi",
@@ -412,6 +413,7 @@ AUTHOR_MAP = {
"tesseracttars@gmail.com": "tesseracttars-creator",
"tianliangjay@gmail.com": "xingkongliang",
"tranquil_flow@protonmail.com": "Tranquil-Flow",
"LyleLengyel@gmail.com": "mcndjxlefnd",
"unayung@gmail.com": "Unayung",
"vorvul.danylo@gmail.com": "WorldInnovationsDepartment",
"win4r@outlook.com": "win4r",
+480
View File
@@ -0,0 +1,480 @@
"""Tests for agent/curator.py — orchestrator, idle gating, state transitions.
LLM spawning is never exercised here — `_run_llm_review` is monkeypatched so
tests run fully offline and the curator module doesn't need real credentials.
"""
from __future__ import annotations
import importlib
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
@pytest.fixture
def curator_env(tmp_path, monkeypatch):
"""Isolated HERMES_HOME + freshly reloaded curator + skill_usage modules."""
home = tmp_path / ".hermes"
(home / "skills").mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(home))
import tools.skill_usage as usage
importlib.reload(usage)
import agent.curator as curator
importlib.reload(curator)
# Neutralize the real LLM pass by default — tests opt in per-case.
monkeypatch.setattr(curator, "_run_llm_review", lambda prompt: "llm-stub")
# Default: no config file → curator defaults. Tests can override.
monkeypatch.setattr(curator, "_load_config", lambda: {})
return {"home": home, "curator": curator, "usage": usage}
def _write_skill(skills_dir: Path, name: str):
d = skills_dir / name
d.mkdir(parents=True, exist_ok=True)
(d / "SKILL.md").write_text(
f"---\nname: {name}\ndescription: x\n---\n", encoding="utf-8",
)
return d
# ---------------------------------------------------------------------------
# Config gates
# ---------------------------------------------------------------------------
def test_curator_enabled_default_true(curator_env):
assert curator_env["curator"].is_enabled() is True
def test_curator_disabled_via_config(curator_env, monkeypatch):
c = curator_env["curator"]
monkeypatch.setattr(c, "_load_config", lambda: {"enabled": False})
assert c.is_enabled() is False
assert c.should_run_now() is False
def test_curator_defaults(curator_env):
c = curator_env["curator"]
assert c.get_interval_hours() == 24 * 7 # 7 days
assert c.get_min_idle_hours() == 2
assert c.get_stale_after_days() == 30
assert c.get_archive_after_days() == 90
def test_curator_config_overrides(curator_env, monkeypatch):
c = curator_env["curator"]
monkeypatch.setattr(c, "_load_config", lambda: {
"interval_hours": 12,
"min_idle_hours": 0.5,
"stale_after_days": 7,
"archive_after_days": 60,
})
assert c.get_interval_hours() == 12
assert c.get_min_idle_hours() == 0.5
assert c.get_stale_after_days() == 7
assert c.get_archive_after_days() == 60
# ---------------------------------------------------------------------------
# should_run_now
# ---------------------------------------------------------------------------
def test_first_run_always_eligible(curator_env):
c = curator_env["curator"]
assert c.should_run_now() is True
def test_recent_run_blocks(curator_env):
c = curator_env["curator"]
c.save_state({
"last_run_at": datetime.now(timezone.utc).isoformat(),
"paused": False,
})
assert c.should_run_now() is False
def test_old_run_eligible(curator_env):
"""A run older than the configured interval should re-trigger. Use a
2x-interval cushion so the test doesn't become coupled to the exact
default — bumping DEFAULT_INTERVAL_HOURS shouldn't break it."""
c = curator_env["curator"]
long_ago = datetime.now(timezone.utc) - timedelta(
hours=c.get_interval_hours() * 2
)
c.save_state({"last_run_at": long_ago.isoformat(), "paused": False})
assert c.should_run_now() is True
def test_paused_blocks_even_if_stale(curator_env):
c = curator_env["curator"]
long_ago = datetime.now(timezone.utc) - timedelta(days=30)
c.save_state({"last_run_at": long_ago.isoformat(), "paused": True})
assert c.should_run_now() is False
def test_set_paused_roundtrip(curator_env):
c = curator_env["curator"]
c.set_paused(True)
assert c.is_paused() is True
c.set_paused(False)
assert c.is_paused() is False
# ---------------------------------------------------------------------------
# Automatic state transitions
# ---------------------------------------------------------------------------
def test_unused_skill_transitions_to_stale(curator_env):
c = curator_env["curator"]
u = curator_env["usage"]
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "old-skill")
# Record last-use well past stale_after_days (30 default)
long_ago = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
data = u.load_usage()
data["old-skill"] = u._empty_record()
data["old-skill"]["last_used_at"] = long_ago
data["old-skill"]["created_at"] = long_ago
u.save_usage(data)
counts = c.apply_automatic_transitions()
assert counts["marked_stale"] == 1
assert u.get_record("old-skill")["state"] == "stale"
def test_very_old_skill_gets_archived(curator_env):
c = curator_env["curator"]
u = curator_env["usage"]
skills_dir = curator_env["home"] / "skills"
skill_dir = _write_skill(skills_dir, "ancient")
super_old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
data = u.load_usage()
data["ancient"] = u._empty_record()
data["ancient"]["last_used_at"] = super_old
data["ancient"]["created_at"] = super_old
u.save_usage(data)
counts = c.apply_automatic_transitions()
assert counts["archived"] == 1
assert not skill_dir.exists()
assert (skills_dir / ".archive" / "ancient" / "SKILL.md").exists()
assert u.get_record("ancient")["state"] == "archived"
def test_pinned_skill_is_never_touched(curator_env):
c = curator_env["curator"]
u = curator_env["usage"]
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "precious")
super_old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
data = u.load_usage()
data["precious"] = u._empty_record()
data["precious"]["last_used_at"] = super_old
data["precious"]["created_at"] = super_old
data["precious"]["pinned"] = True
u.save_usage(data)
counts = c.apply_automatic_transitions()
assert counts["archived"] == 0
assert counts["marked_stale"] == 0
rec = u.get_record("precious")
assert rec["state"] == "active" # untouched
assert rec["pinned"] is True
def test_stale_skill_reactivates_on_recent_use(curator_env):
c = curator_env["curator"]
u = curator_env["usage"]
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "revived")
recent = datetime.now(timezone.utc).isoformat()
data = u.load_usage()
data["revived"] = u._empty_record()
data["revived"]["state"] = "stale"
data["revived"]["last_used_at"] = recent
data["revived"]["created_at"] = recent
u.save_usage(data)
counts = c.apply_automatic_transitions()
assert counts["reactivated"] == 1
assert u.get_record("revived")["state"] == "active"
def test_new_skill_without_last_used_not_immediately_archived(curator_env):
"""A freshly-created skill with no use history should not get archived
just because last_used_at is None."""
c = curator_env["curator"]
u = curator_env["usage"]
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "fresh")
# Bump nothing — record doesn't exist yet. Curator should create it
# and fall back to created_at which is ~now.
counts = c.apply_automatic_transitions()
assert counts["archived"] == 0
assert counts["marked_stale"] == 0
assert (skills_dir / "fresh").exists()
def test_bundled_skill_not_touched_by_transitions(curator_env):
c = curator_env["curator"]
u = curator_env["usage"]
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "bundled")
(skills_dir / ".bundled_manifest").write_text(
"bundled:abc\n", encoding="utf-8",
)
super_old = (datetime.now(timezone.utc) - timedelta(days=500)).isoformat()
data = u.load_usage()
data["bundled"] = u._empty_record()
data["bundled"]["last_used_at"] = super_old
u.save_usage(data)
counts = c.apply_automatic_transitions()
# bundled skills are excluded from the agent-created list entirely
assert counts["checked"] == 0
assert (skills_dir / "bundled").exists() # never moved
# ---------------------------------------------------------------------------
# run_curator_review orchestration
# ---------------------------------------------------------------------------
def test_run_review_records_state(curator_env):
c = curator_env["curator"]
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "a")
result = c.run_curator_review(synchronous=True)
assert "started_at" in result
state = c.load_state()
assert state["last_run_at"] is not None
assert state["run_count"] >= 1
assert state["last_run_summary"] is not None
def test_run_review_synchronous_invokes_llm_stub(curator_env, monkeypatch):
c = curator_env["curator"]
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "a")
calls = []
monkeypatch.setattr(
c, "_run_llm_review",
lambda prompt: (calls.append(prompt), "stubbed-summary")[1],
)
captured = []
c.run_curator_review(on_summary=lambda s: captured.append(s), synchronous=True)
assert len(calls) == 1
assert "skill CURATOR" in calls[0] or "CURATOR" in calls[0]
assert captured # on_summary was called
assert any("stubbed-summary" in s for s in captured)
def test_run_review_skips_llm_when_no_candidates(curator_env, monkeypatch):
c = curator_env["curator"]
# No skills in the dir → no candidates
calls = []
monkeypatch.setattr(
c, "_run_llm_review",
lambda prompt: (calls.append(prompt), "never-called")[1],
)
captured = []
c.run_curator_review(on_summary=lambda s: captured.append(s), synchronous=True)
assert calls == [] # LLM not invoked
assert any("skipped" in s for s in captured)
def test_maybe_run_curator_respects_disabled(curator_env, monkeypatch):
c = curator_env["curator"]
monkeypatch.setattr(c, "_load_config", lambda: {"enabled": False})
result = c.maybe_run_curator()
assert result is None
def test_maybe_run_curator_enforces_idle_gate(curator_env, monkeypatch):
c = curator_env["curator"]
monkeypatch.setattr(c, "_load_config", lambda: {"min_idle_hours": 2})
# idle less than the threshold
result = c.maybe_run_curator(idle_for_seconds=60.0)
assert result is None
def test_maybe_run_curator_runs_when_eligible(curator_env, monkeypatch):
c = curator_env["curator"]
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "a")
# Force idle over threshold
result = c.maybe_run_curator(idle_for_seconds=99999.0)
assert result is not None
assert "started_at" in result
def test_maybe_run_curator_swallows_exceptions(curator_env, monkeypatch):
c = curator_env["curator"]
def explode():
raise RuntimeError("boom")
monkeypatch.setattr(c, "should_run_now", explode)
# Must not raise
assert c.maybe_run_curator() is None
# ---------------------------------------------------------------------------
# Persistence
# ---------------------------------------------------------------------------
def test_state_file_survives_corrupt_read(curator_env):
c = curator_env["curator"]
c._state_file().write_text("not json", encoding="utf-8")
# Must fall back to default, not raise
assert c.load_state() == c._default_state()
def test_state_atomic_write_no_tmp_leftovers(curator_env):
c = curator_env["curator"]
c.save_state({"paused": True})
parent = c._state_file().parent
for p in parent.iterdir():
assert not p.name.startswith(".curator_state_"), f"tmp leftover: {p.name}"
def test_curator_review_prompt_has_invariants():
"""Core invariants must be in the review prompt text."""
from agent.curator import CURATOR_REVIEW_PROMPT
assert "MUST NOT" in CURATOR_REVIEW_PROMPT or "DO NOT" in CURATOR_REVIEW_PROMPT
assert "bundled" in CURATOR_REVIEW_PROMPT.lower()
assert "delete" in CURATOR_REVIEW_PROMPT.lower()
assert "pinned" in CURATOR_REVIEW_PROMPT.lower()
# Must describe the actions the reviewer can take. The exact vocabulary
# has tightened over time (the umbrella-first prompt drops 'keep' as a
# first-class decision verb, since passive keep-everything is the
# failure mode the prompt is trying to avoid), but the core merge /
# archive / patch trio must remain callable.
for verb in ("patch", "archive"):
assert verb in CURATOR_REVIEW_PROMPT.lower()
# Must mention consolidation (possibly via "merge" or "consolidat")
assert "consolidat" in CURATOR_REVIEW_PROMPT.lower() or "merge" in CURATOR_REVIEW_PROMPT.lower()
def test_curator_review_prompt_points_at_existing_tools_only():
"""The review prompt must rely on existing tools (skill_manage + terminal)
and must NOT reference bespoke curator tools that are not registered
model tools."""
from agent.curator import CURATOR_REVIEW_PROMPT
assert "skill_manage" in CURATOR_REVIEW_PROMPT
assert "skills_list" in CURATOR_REVIEW_PROMPT
assert "skill_view" in CURATOR_REVIEW_PROMPT
assert "terminal" in CURATOR_REVIEW_PROMPT.lower()
# These would be nice but aren't actually registered as tools — the
# curator uses skill_manage + terminal mv instead.
assert "archive_skill" not in CURATOR_REVIEW_PROMPT
assert "pin_skill" not in CURATOR_REVIEW_PROMPT
def test_curator_does_not_instruct_model_to_pin():
"""Pinning is a user opt-out, not a model decision. The prompt should
not tell the reviewer to pin skills autonomously."""
from agent.curator import CURATOR_REVIEW_PROMPT
# "pinned" appears in the invariant ("skip pinned skills"), but "pin"
# as a decision verb should not.
lines = CURATOR_REVIEW_PROMPT.split("\n")
decision_block = "\n".join(
l for l in lines
if l.strip().startswith(("keep", "patch", "archive", "consolidate", "pin "))
)
# No standalone "pin" action line
assert not any(l.strip().startswith("pin ") for l in lines), (
f"Found a pin action line in:\n{decision_block}"
)
def test_curator_review_prompt_is_umbrella_first():
"""The curator prompt must push umbrella-building / class-level thinking,
not pair-level 'are these two the same?' analysis."""
from agent.curator import CURATOR_REVIEW_PROMPT
lower = CURATOR_REVIEW_PROMPT.lower()
# Must frame the task as active umbrella-building, not a passive audit.
assert "umbrella" in lower, (
"must use UMBRELLA framing — the class-first abstraction the curator "
"is designed to produce"
)
# Must tell the reviewer not to stop at pair-level distinctness.
assert "class" in lower, "must reference class-level thinking"
# Must cover the three consolidation methods explicitly
assert "references/" in CURATOR_REVIEW_PROMPT, (
"must name references/ as a demotion target for session-specific content"
)
# templates/ and scripts/ make the umbrella a real class-level skill
assert "templates/" in CURATOR_REVIEW_PROMPT
assert "scripts/" in CURATOR_REVIEW_PROMPT
# Must say the counter argument: usage=0 is not a reason to skip
assert "use_count" in CURATOR_REVIEW_PROMPT or "counter" in lower, (
"must pre-empt the 'usage counters are zero, I can't judge' bailout"
)
def test_curator_review_prompt_offers_support_file_actions():
"""Support-file demotion (references/templates/scripts) must be one of
the three consolidation methods, alongside merge-into-existing and
create-new-umbrella."""
from agent.curator import CURATOR_REVIEW_PROMPT
# skill_manage action=write_file is how references/ are added to an
# existing skill — this is the create-adjacent action the curator needs
# to demote narrow siblings without touching their SKILL.md.
assert "write_file" in CURATOR_REVIEW_PROMPT
# Must offer creating a brand-new umbrella when no existing one fits
assert "action=create" in CURATOR_REVIEW_PROMPT or "create a new umbrella" in CURATOR_REVIEW_PROMPT.lower()
def test_cli_unpin_refuses_bundled_skill(curator_env, capsys):
"""hermes curator unpin must refuse bundled/hub skills too (matches pin)."""
from hermes_cli import curator as cli
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "ship-skill")
(skills_dir / ".bundled_manifest").write_text(
"ship-skill:abc\n", encoding="utf-8",
)
class _A:
skill = "ship-skill"
rc = cli._cmd_unpin(_A())
captured = capsys.readouterr()
assert rc == 1
assert "bundled" in captured.out.lower() or "hub" in captured.out.lower()
def test_cli_pin_refuses_bundled_skill(curator_env, capsys):
from hermes_cli import curator as cli
skills_dir = curator_env["home"] / "skills"
_write_skill(skills_dir, "ship-skill")
(skills_dir / ".bundled_manifest").write_text(
"ship-skill:abc\n", encoding="utf-8",
)
class _A:
skill = "ship-skill"
rc = cli._cmd_pin(_A())
captured = capsys.readouterr()
assert rc == 1
assert "bundled" in captured.out.lower() or "hub" in captured.out.lower()
+49 -5
View File
@@ -1,9 +1,11 @@
"""Tests for CLI browser CDP auto-launch helpers."""
import os
import subprocess
from unittest.mock import patch
from cli import HermesCLI
from hermes_cli.browser_connect import manual_chrome_debug_command
def _assert_chrome_debug_cmd(cmd, expected_chrome, expected_port):
@@ -26,13 +28,19 @@ class TestChromeDebugLaunch:
captured["kwargs"] = kwargs
return object()
with patch("cli.shutil.which", side_effect=lambda name: r"C:\Chrome\chrome.exe" if name == "chrome.exe" else None), \
patch("cli.os.path.isfile", side_effect=lambda path: path == r"C:\Chrome\chrome.exe"), \
with patch("hermes_cli.browser_connect.shutil.which", side_effect=lambda name: r"C:\Chrome\chrome.exe" if name == "chrome.exe" else None), \
patch("hermes_cli.browser_connect.os.path.isfile", side_effect=lambda path: path == r"C:\Chrome\chrome.exe"), \
patch("subprocess.Popen", side_effect=fake_popen):
assert HermesCLI._try_launch_chrome_debug(9333, "Windows") is True
_assert_chrome_debug_cmd(captured["cmd"], r"C:\Chrome\chrome.exe", 9333)
assert captured["kwargs"]["start_new_session"] is True
# Windows uses creationflags (POSIX-only start_new_session would raise).
assert "start_new_session" not in captured["kwargs"]
flags = captured["kwargs"].get("creationflags", 0)
expected = getattr(subprocess, "DETACHED_PROCESS", 0) | getattr(
subprocess, "CREATE_NEW_PROCESS_GROUP", 0
)
assert flags == expected
def test_windows_launch_falls_back_to_common_install_dirs(self, monkeypatch):
captured = {}
@@ -49,9 +57,45 @@ class TestChromeDebugLaunch:
monkeypatch.delenv("ProgramFiles(x86)", raising=False)
monkeypatch.delenv("LOCALAPPDATA", raising=False)
with patch("cli.shutil.which", return_value=None), \
patch("cli.os.path.isfile", side_effect=lambda path: path == installed), \
with patch("hermes_cli.browser_connect.shutil.which", return_value=None), \
patch("hermes_cli.browser_connect.os.path.isfile", side_effect=lambda path: path == installed), \
patch("subprocess.Popen", side_effect=fake_popen):
assert HermesCLI._try_launch_chrome_debug(9222, "Windows") is True
_assert_chrome_debug_cmd(captured["cmd"], installed, 9222)
def test_manual_command_uses_detected_linux_browser(self):
with patch("hermes_cli.browser_connect.shutil.which", side_effect=lambda name: "/usr/bin/chromium" if name == "chromium" else None), \
patch("hermes_cli.browser_connect.os.path.isfile", side_effect=lambda path: path == "/usr/bin/chromium"):
command = manual_chrome_debug_command(9222, "Linux")
assert command is not None
assert command.startswith("/usr/bin/chromium --remote-debugging-port=9222")
def test_manual_command_uses_wsl_windows_chrome_when_available(self):
chrome = "/mnt/c/Program Files/Google/Chrome/Application/chrome.exe"
with patch("hermes_cli.browser_connect.shutil.which", return_value=None), \
patch("hermes_cli.browser_connect.os.path.isfile", side_effect=lambda path: path == chrome):
command = manual_chrome_debug_command(9222, "Linux")
assert command is not None
# Linux/WSL uses POSIX shell quoting (single quotes around paths with spaces).
assert command.startswith(f"'{chrome}' --remote-debugging-port=9222")
def test_manual_command_uses_windows_quoting_on_windows(self):
chrome = r"C:\Program Files\Google\Chrome\Application\chrome.exe"
with patch("hermes_cli.browser_connect.shutil.which", side_effect=lambda name: chrome if name == "chrome.exe" else None), \
patch("hermes_cli.browser_connect.os.path.isfile", side_effect=lambda path: path == chrome):
command = manual_chrome_debug_command(9222, "Windows")
assert command is not None
# Windows uses cmd.exe-compatible quoting via subprocess.list2cmdline.
assert command.startswith(f'"{chrome}" --remote-debugging-port=9222')
assert "'" not in command
def test_manual_command_returns_none_when_linux_browser_missing(self):
with patch("hermes_cli.browser_connect.shutil.which", return_value=None), \
patch("hermes_cli.browser_connect.os.path.isfile", return_value=False):
assert manual_chrome_debug_command(9222, "Linux") is None
+35
View File
@@ -41,6 +41,10 @@ def _simulate_config_bridge(cfg: dict, initial_env: dict | None = None):
# TERMINAL_CWD. Mirrors the fix in gateway/run.py.
if cfg_key == "cwd" and str(val) in (".", "auto", "cwd"):
continue
# Expand shell tilde so subprocess.Popen never receives a literal
# "~/" which the kernel rejects.
if cfg_key == "cwd" and isinstance(val, str):
val = os.path.expanduser(val)
if isinstance(val, list):
env[env_var] = json.dumps(val)
else:
@@ -55,6 +59,8 @@ def _simulate_config_bridge(cfg: dict, initial_env: dict | None = None):
if alias_env not in env:
alias_val = cfg.get(alias_key)
if isinstance(alias_val, str) and alias_val.strip():
if alias_key == "cwd":
alias_val = os.path.expanduser(alias_val)
env[alias_env] = alias_val.strip()
# --- Replicate lines 144-147: MESSAGING_CWD fallback ---
@@ -205,3 +211,32 @@ class TestNestedTerminalCwdPlaceholderSkip:
assert result["TERMINAL_ENV"] == "docker"
assert result["TERMINAL_TIMEOUT"] == "300"
assert result["TERMINAL_CWD"] == "/from/env"
class TestTildeExpansion:
"""terminal.cwd values containing shell tilde must be expanded.
subprocess.Popen does not expand shell syntax, so a literal "~/"
causes FileNotFoundError. Regression test for commit 3c42064e.
"""
def test_terminal_cwd_tilde_expanded(self):
"""terminal.cwd: '~/projects' should expand to /home/<user>/projects."""
cfg = {"terminal": {"cwd": "~/projects"}}
result = _simulate_config_bridge(cfg)
assert result["TERMINAL_CWD"] == os.path.expanduser("~/projects")
def test_top_level_cwd_tilde_expanded(self):
"""top-level cwd: '~/' should expand to user's home directory."""
cfg = {"cwd": "~/"}
result = _simulate_config_bridge(cfg)
assert result["TERMINAL_CWD"] == os.path.expanduser("~/")
def test_tilde_with_nested_precedence(self):
"""Nested terminal.cwd should win over top-level, both expanded."""
cfg = {
"cwd": "~/top",
"terminal": {"cwd": "~/nested"},
}
result = _simulate_config_bridge(cfg)
assert result["TERMINAL_CWD"] == os.path.expanduser("~/nested")
+156
View File
@@ -1337,3 +1337,159 @@ class TestCursorStrippingOnFallback:
assert consumer._already_sent is True
# _last_sent_text must NOT be updated when the edit failed
assert consumer._last_sent_text == "Hello ▉"
# ── on_new_message callback (tool-progress linearization) ─────────────
class TestOnNewMessageCallback:
"""The on_new_message callback fires whenever a fresh content bubble
lands on the platform. Gateway uses this to close off the current
tool-progress bubble so the next tool.started opens a new bubble
below the content — preserving chronological order in the chat.
Before this callback existed (post PR #7885), content messages got
their own bubbles after segment breaks, but the tool-progress task
kept editing the ORIGINAL progress bubble above all new content.
Result: tool lines appeared stacked in the upper bubble while
content messages lined up below, making the timeline look scrambled.
"""
@pytest.mark.asyncio
async def test_callback_fires_on_first_send(self):
"""First-send of a new content bubble fires on_new_message."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
events = []
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=lambda: events.append("reset"),
)
consumer.on_delta("Hello")
consumer.finish()
await consumer.run()
assert events == ["reset"]
@pytest.mark.asyncio
async def test_callback_fires_once_per_segment(self):
"""A new first-send fires the callback again after segment break."""
adapter = MagicMock()
msg_counter = iter(["msg_1", "msg_2", "msg_3"])
adapter.send = AsyncMock(
side_effect=lambda **kw: SimpleNamespace(success=True, message_id=next(msg_counter))
)
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
events = []
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=lambda: events.append("reset"),
)
consumer.on_delta("A")
consumer.on_delta(None)
consumer.on_delta("B")
consumer.on_delta(None)
consumer.on_delta("C")
consumer.finish()
await consumer.run()
# Three content bubbles ⇒ three reset notifications
assert events == ["reset", "reset", "reset"]
@pytest.mark.asyncio
async def test_callback_not_fired_on_edit(self):
"""Subsequent edits of the same bubble do NOT fire the callback."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
events = []
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=lambda: events.append("reset"),
)
consumer.on_delta("Hello")
task = asyncio.create_task(consumer.run())
await asyncio.sleep(0.05)
consumer.on_delta(" world")
await asyncio.sleep(0.05)
consumer.on_delta(" more")
await asyncio.sleep(0.05)
consumer.finish()
await task
# Only one first-send happened; edits do not re-fire.
assert events == ["reset"]
@pytest.mark.asyncio
async def test_callback_fires_on_commentary(self):
"""Commentary messages are fresh bubbles too — fire the callback."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
events = []
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=lambda: events.append("reset"),
)
consumer.on_commentary("I'll search for that first.")
consumer.finish()
await consumer.run()
assert events == ["reset"]
@pytest.mark.asyncio
async def test_callback_error_swallowed(self):
"""Exceptions in the callback do not crash the consumer."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
def raiser():
raise RuntimeError("boom")
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(
adapter, "chat", config,
on_new_message=raiser,
)
consumer.on_delta("Hello")
consumer.finish()
await consumer.run() # must not raise
assert consumer.already_sent is True
@pytest.mark.asyncio
async def test_no_callback_when_none(self):
"""Consumer works correctly when on_new_message is None (default)."""
adapter = MagicMock()
adapter.send = AsyncMock(return_value=SimpleNamespace(success=True, message_id="msg_1"))
adapter.edit_message = AsyncMock(return_value=SimpleNamespace(success=True))
adapter.MAX_MESSAGE_LENGTH = 4096
config = StreamConsumerConfig(edit_interval=0.01, buffer_threshold=1)
consumer = GatewayStreamConsumer(adapter, "chat", config) # no callback
consumer.on_delta("Hello")
consumer.finish()
await consumer.run()
assert consumer.already_sent is True
+17
View File
@@ -319,6 +319,23 @@ class TestSanitizeEnvLines:
assert result[0].startswith("OPENROUTER_API_KEY=")
assert result[1].startswith("OPENAI_BASE_URL=")
def test_glm_suffix_collision_not_split(self):
"""GLM_API_KEY / GLM_BASE_URL must not be mangled by LM_API_KEY / LM_BASE_URL suffixes (#17138)."""
lines = [
"GLM_API_KEY=glm-secret\n",
"GLM_BASE_URL=https://api.z.ai/api/paas/v4\n",
]
result = _sanitize_env_lines(lines)
assert result == lines, f"GLM_* lines were corrupted by suffix collision: {result}"
def test_suffix_collision_does_not_break_real_concatenation(self):
"""A genuine concatenation that happens to start with a suffix-superset key still splits."""
lines = ["GLM_API_KEY=glmLM_API_KEY=lm-key\n"]
result = _sanitize_env_lines(lines)
assert len(result) == 2
assert result[0].startswith("GLM_API_KEY=")
assert result[1].startswith("LM_API_KEY=")
def test_save_env_value_fixes_corruption_on_write(self, tmp_path):
"""save_env_value sanitizes corrupted lines when writing a new key."""
env_file = tmp_path / ".env"
+362 -14
View File
@@ -2754,6 +2754,8 @@ def test_session_most_recent_handles_db_unavailable(monkeypatch):
)
assert resp["result"]["session_id"] is None
# ── browser.manage ───────────────────────────────────────────────────
@@ -2779,6 +2781,30 @@ def _stub_urlopen(monkeypatch, *, ok: bool):
monkeypatch.setattr(urllib.request, "urlopen", _opener)
def _stub_urlopen_capture(monkeypatch, *, ok: bool):
urls: list[str] = []
class _Resp:
status = 200
def __enter__(self):
return self
def __exit__(self, *_):
return False
def _opener(url, timeout=2.0): # noqa: ARG001 — match urllib signature
urls.append(url)
if not ok:
raise OSError("probe failed")
return _Resp()
import urllib.request
monkeypatch.setattr(urllib.request, "urlopen", _opener)
return urls
def test_browser_manage_status_reads_env_var(monkeypatch):
"""Status returns the env var verbatim (no network I/O)."""
monkeypatch.setenv("BROWSER_CDP_URL", "http://127.0.0.1:9222")
@@ -2787,7 +2813,8 @@ def test_browser_manage_status_reads_env_var(monkeypatch):
{"id": "1", "method": "browser.manage", "params": {"action": "status"}}
)
assert resp["result"] == {"connected": True, "url": "http://127.0.0.1:9222"}
assert resp["result"]["connected"] is True
assert resp["result"]["url"] == "http://127.0.0.1:9222"
def test_browser_manage_status_falls_back_to_config_cdp_url(monkeypatch):
@@ -2850,18 +2877,215 @@ def test_browser_manage_connect_sets_env_and_cleans_twice(monkeypatch):
}
)
assert resp["result"] == {"connected": True, "url": "http://127.0.0.1:9222"}
assert resp["result"]["connected"] is True
assert resp["result"]["url"] == "http://127.0.0.1:9222"
assert resp["result"]["messages"] == ["Chrome is already listening on port 9222"]
assert os.environ.get("BROWSER_CDP_URL") == "http://127.0.0.1:9222"
# First cleanup runs against the OLD env (none here), second against the NEW.
assert cleanup_calls == ["", "http://127.0.0.1:9222"]
def test_browser_manage_connect_defaults_to_loopback(monkeypatch):
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
fake = types.SimpleNamespace(
cleanup_all_browsers=lambda: None,
_get_cdp_override=lambda: os.environ.get("BROWSER_CDP_URL", ""),
)
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
urls = _stub_urlopen_capture(monkeypatch, ok=True)
resp = server.handle_request(
{"id": "1", "method": "browser.manage", "params": {"action": "connect"}}
)
assert resp["result"]["connected"] is True
assert resp["result"]["url"] == "http://127.0.0.1:9222"
assert resp["result"]["messages"] == ["Chrome is already listening on port 9222"]
assert urls[0] == "http://127.0.0.1:9222/json/version"
def test_browser_manage_connect_default_local_reports_launch_hint(monkeypatch):
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
emitted: list[tuple[str, dict]] = []
monkeypatch.setattr(
server,
"_emit",
lambda evt, sid, payload=None: emitted.append((evt, payload or {})),
)
fake = types.SimpleNamespace(
cleanup_all_browsers=lambda: None,
_get_cdp_override=lambda: os.environ.get("BROWSER_CDP_URL", ""),
)
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
_stub_urlopen(monkeypatch, ok=False)
with (
patch(
"hermes_cli.browser_connect.try_launch_chrome_debug", return_value=False
),
patch(
"hermes_cli.browser_connect.get_chrome_debug_candidates",
return_value=[],
),
):
resp = server.handle_request(
{
"id": "1",
"method": "browser.manage",
"params": {
"action": "connect",
"session_id": "sess-1",
"url": "http://localhost:9222",
},
}
)
assert resp["result"]["connected"] is False
assert resp["result"]["url"] == "http://127.0.0.1:9222"
assert (
resp["result"]["messages"][0]
== "Chrome isn't running with remote debugging — attempting to launch..."
)
assert any(
"No Chrome/Chromium executable was found" in line
for line in resp["result"]["messages"]
)
assert any(
"--remote-debugging-port=9222" in line for line in resp["result"]["messages"]
)
assert "BROWSER_CDP_URL" not in os.environ
progress = [p["message"] for evt, p in emitted if evt == "browser.progress"]
assert progress == resp["result"]["messages"]
def test_browser_manage_connect_no_session_skips_progress_events(monkeypatch):
"""Without a session_id the TUI prints messages from the response;
emitting ``browser.progress`` events would double-render. Gate the
emit so callers without a session see the bundled list only."""
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
emitted: list[tuple[str, dict]] = []
monkeypatch.setattr(
server,
"_emit",
lambda evt, sid, payload=None: emitted.append((evt, payload or {})),
)
fake = types.SimpleNamespace(
cleanup_all_browsers=lambda: None,
_get_cdp_override=lambda: os.environ.get("BROWSER_CDP_URL", ""),
)
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
_stub_urlopen(monkeypatch, ok=False)
with (
patch(
"hermes_cli.browser_connect.try_launch_chrome_debug", return_value=False
),
patch(
"hermes_cli.browser_connect.get_chrome_debug_candidates",
return_value=[],
),
):
resp = server.handle_request(
{
"id": "1",
"method": "browser.manage",
"params": {"action": "connect", "url": "http://localhost:9222"},
}
)
assert resp["result"]["connected"] is False
assert resp["result"]["messages"] # bundled list still populated
assert [evt for evt, _ in emitted if evt == "browser.progress"] == []
def test_browser_manage_connect_handles_null_url(monkeypatch):
"""Explicit ``{"url": null}`` (or empty string) must fall back to the
default loopback URL instead of raising a TypeError that gets swallowed
by the outer 5031 catch."""
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
fake = types.SimpleNamespace(
cleanup_all_browsers=lambda: None,
_get_cdp_override=lambda: os.environ.get("BROWSER_CDP_URL", ""),
)
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
_stub_urlopen(monkeypatch, ok=True)
resp = server.handle_request(
{
"id": "1",
"method": "browser.manage",
"params": {"action": "connect", "url": None},
}
)
assert resp["result"]["connected"] is True
assert resp["result"]["url"] == "http://127.0.0.1:9222"
def test_browser_manage_connect_rejects_non_string_url(monkeypatch):
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
resp = server.handle_request(
{
"id": "1",
"method": "browser.manage",
"params": {"action": "connect", "url": 9222},
}
)
assert resp["error"]["code"] == 4015
assert "must be a string" in resp["error"]["message"]
assert "BROWSER_CDP_URL" not in os.environ
def test_browser_manage_connect_default_local_retries_after_launch(monkeypatch):
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
monkeypatch.setattr(server.time, "sleep", lambda _seconds: None)
fake = types.SimpleNamespace(
cleanup_all_browsers=lambda: None,
_get_cdp_override=lambda: os.environ.get("BROWSER_CDP_URL", ""),
)
class _Resp:
status = 200
def __enter__(self):
return self
def __exit__(self, *_):
return False
attempts = {"n": 0}
def _opener(_url, timeout=2.0): # noqa: ARG001 — match urllib signature
attempts["n"] += 1
if attempts["n"] < 3:
raise OSError("not ready")
return _Resp()
import urllib.request
monkeypatch.setattr(urllib.request, "urlopen", _opener)
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
with patch(
"hermes_cli.browser_connect.try_launch_chrome_debug", return_value=True
):
resp = server.handle_request(
{"id": "1", "method": "browser.manage", "params": {"action": "connect"}}
)
assert resp["result"]["connected"] is True
assert resp["result"]["url"] == "http://127.0.0.1:9222"
assert resp["result"]["messages"] == [
"Chrome isn't running with remote debugging — attempting to launch...",
"Chrome launched and listening on port 9222",
]
assert os.environ["BROWSER_CDP_URL"] == "http://127.0.0.1:9222"
def test_browser_manage_connect_rejects_unreachable_endpoint(monkeypatch):
"""An unreachable endpoint must NOT mutate the env or reap sessions."""
monkeypatch.setenv("BROWSER_CDP_URL", "http://existing:9222")
cleanup_calls: list[str] = []
fake = types.SimpleNamespace(
cleanup_all_browsers=lambda: cleanup_calls.append(os.environ.get("BROWSER_CDP_URL", "")),
cleanup_all_browsers=lambda: cleanup_calls.append(
os.environ.get("BROWSER_CDP_URL", "")
),
_get_cdp_override=lambda: os.environ.get("BROWSER_CDP_URL", ""),
)
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
@@ -2941,14 +3165,19 @@ def test_browser_manage_connect_preserves_devtools_browser_endpoint(monkeypatch)
concrete = "ws://browserbase.example/devtools/browser/abc123"
class _OkSocket:
def __enter__(self): return self
def __exit__(self, *a): return False
def __enter__(self):
return self
def __exit__(self, *a):
return False
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
# If urlopen is reached for a concrete ws endpoint, the test
# would still pass because _stub_urlopen returned ok=True before;
# patch it to assert-fail so we prove the HTTP probe is skipped.
with patch("urllib.request.urlopen", side_effect=AssertionError("urlopen called")):
with patch(
"urllib.request.urlopen", side_effect=AssertionError("urlopen called")
):
with patch("socket.create_connection", return_value=_OkSocket()):
resp = server.handle_request(
{
@@ -2963,6 +3192,69 @@ def test_browser_manage_connect_preserves_devtools_browser_endpoint(monkeypatch)
assert os.environ["BROWSER_CDP_URL"] == concrete
def test_browser_manage_connect_local_devtools_ws_preserves_path(monkeypatch):
"""Regression: ``ws://127.0.0.1:9222/devtools/browser/<id>`` is a real
connectable endpoint; default-local normalization must not strip the
``/devtools/browser/...`` path or it breaks valid local CDP connects."""
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
fake = types.SimpleNamespace(
cleanup_all_browsers=lambda: None,
_get_cdp_override=lambda: os.environ.get("BROWSER_CDP_URL", ""),
)
concrete = "ws://127.0.0.1:9222/devtools/browser/abc123"
class _OkSocket:
def __enter__(self):
return self
def __exit__(self, *a):
return False
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
with patch("socket.create_connection", return_value=_OkSocket()):
resp = server.handle_request(
{
"id": "1",
"method": "browser.manage",
"params": {"action": "connect", "url": concrete},
}
)
assert resp["result"]["connected"] is True
assert resp["result"]["url"] == concrete
assert os.environ["BROWSER_CDP_URL"] == concrete
def test_browser_manage_connect_rejects_invalid_port(monkeypatch):
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
resp = server.handle_request(
{
"id": "1",
"method": "browser.manage",
"params": {"action": "connect", "url": "http://localhost:abc"},
}
)
assert resp["error"]["code"] == 4015
assert "invalid port" in resp["error"]["message"]
assert "BROWSER_CDP_URL" not in os.environ
def test_browser_manage_connect_rejects_missing_host(monkeypatch):
monkeypatch.delenv("BROWSER_CDP_URL", raising=False)
resp = server.handle_request(
{
"id": "1",
"method": "browser.manage",
"params": {"action": "connect", "url": "http://:9222"},
}
)
assert resp["error"]["code"] == 4015
assert "missing host" in resp["error"]["message"]
assert "BROWSER_CDP_URL" not in os.environ
def test_browser_manage_connect_concrete_ws_skips_http_probe(monkeypatch):
"""Regression for round-2 Copilot review: a hosted CDP endpoint
(no HTTP discovery) must connect via TCP-only reachability check.
@@ -2977,8 +3269,11 @@ def test_browser_manage_connect_concrete_ws_skips_http_probe(monkeypatch):
seen_targets: list[tuple[str, int]] = []
class _OkSocket:
def __enter__(self): return self
def __exit__(self, *a): return False
def __enter__(self):
return self
def __exit__(self, *a):
return False
def _fake_create_connection(addr, timeout=None):
seen_targets.append(addr)
@@ -2987,7 +3282,9 @@ def test_browser_manage_connect_concrete_ws_skips_http_probe(monkeypatch):
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
# urlopen would 404/ECONNREFUSED on a real hosted CDP endpoint;
# asserting it's never called proves the probe was skipped.
with patch("urllib.request.urlopen", side_effect=AssertionError("urlopen called")):
with patch(
"urllib.request.urlopen", side_effect=AssertionError("urlopen called")
):
with patch("socket.create_connection", side_effect=_fake_create_connection):
resp = server.handle_request(
{
@@ -3031,7 +3328,9 @@ def test_browser_manage_disconnect_drops_env_and_cleans(monkeypatch):
monkeypatch.setenv("BROWSER_CDP_URL", "http://127.0.0.1:9222")
cleanup_count = {"n": 0}
fake = types.SimpleNamespace(
cleanup_all_browsers=lambda: cleanup_count.__setitem__("n", cleanup_count["n"] + 1),
cleanup_all_browsers=lambda: cleanup_count.__setitem__(
"n", cleanup_count["n"] + 1
),
_get_cdp_override=lambda: os.environ.get("BROWSER_CDP_URL", ""),
)
with patch.dict(sys.modules, {"tools.browser_tool": fake}):
@@ -3099,11 +3398,16 @@ def test_config_get_indicator_falls_back_when_unset(monkeypatch):
def test_config_set_indicator_accepts_known_value(monkeypatch):
written: dict = {}
monkeypatch.setattr(
server, "_write_config_key",
server,
"_write_config_key",
lambda k, v: written.update({k: v}),
)
resp = server.handle_request(
{"id": "1", "method": "config.set", "params": {"key": "indicator", "value": "EMOJI"}}
{
"id": "1",
"method": "config.set",
"params": {"key": "indicator", "value": "EMOJI"},
}
)
assert resp["result"] == {"key": "indicator", "value": "emoji"}
assert written == {"display.tui_status_indicator": "emoji"}
@@ -3117,7 +3421,11 @@ def test_config_set_indicator_falsy_non_string_surfaces_in_error(monkeypatch):
for bad in (0, False, []):
resp = server.handle_request(
{"id": "1", "method": "config.set", "params": {"key": "indicator", "value": bad}}
{
"id": "1",
"method": "config.set",
"params": {"key": "indicator", "value": bad},
}
)
assert "error" in resp
msg = resp["error"]["message"]
@@ -3132,7 +3440,47 @@ def test_config_set_indicator_none_keeps_blank_repr(monkeypatch):
"""`None` is the genuine 'no value' case — empty raw is acceptable."""
monkeypatch.setattr(server, "_write_config_key", lambda *a, **k: None)
resp = server.handle_request(
{"id": "1", "method": "config.set", "params": {"key": "indicator", "value": None}}
{
"id": "1",
"method": "config.set",
"params": {"key": "indicator", "value": None},
}
)
assert "error" in resp
assert "unknown indicator: ''" in resp["error"]["message"]
# ── reload.env ───────────────────────────────────────────────────────
def test_reload_env_rpc_calls_hermes_cli_reload_env(monkeypatch):
"""reload.env mirrors classic CLI's `/reload` — re-reads ~/.hermes/.env
into the gateway process and reports the count of vars updated."""
calls = {"n": 0}
def _fake_reload():
calls["n"] += 1
return 7
fake = types.SimpleNamespace(reload_env=_fake_reload)
with patch.dict(sys.modules, {"hermes_cli.config": fake}):
resp = server.handle_request(
{"id": "1", "method": "reload.env", "params": {}}
)
assert resp["result"] == {"updated": 7}
assert calls["n"] == 1
def test_reload_env_rpc_surfaces_errors(monkeypatch):
def _broken():
raise RuntimeError("env path locked")
fake = types.SimpleNamespace(reload_env=_broken)
with patch.dict(sys.modules, {"hermes_cli.config": fake}):
resp = server.handle_request(
{"id": "1", "method": "reload.env", "params": {}}
)
assert "error" in resp
assert "env path locked" in resp["error"]["message"]
@@ -0,0 +1,148 @@
"""Tests that init_session() respects the configured cwd.
The bug: when terminal.cwd is set in config.yaml, the configured path was
displayed in the TUI banner but actual terminal commands ran in os.getcwd()
(the directory where ``hermes chat`` was started).
Root cause: init_session() captures the login shell environment by running
``pwd -P`` inside a ``bash -l -c`` bootstrap. Profile scripts (.bashrc,
.bash_profile, etc.) can change the working directory before ``pwd -P``
runs, so _update_cwd() overwrites self.cwd with the wrong directory.
Fix: the bootstrap now includes an explicit ``cd`` back to self.cwd before
running ``pwd -P``, so the configured cwd is always what gets recorded.
"""
from tempfile import TemporaryFile
from unittest.mock import MagicMock
from tools.environments.base import BaseEnvironment
class _TestableEnv(BaseEnvironment):
"""Concrete subclass for testing base class methods."""
def __init__(self, cwd="/tmp", timeout=10):
super().__init__(cwd=cwd, timeout=timeout)
def _run_bash(self, cmd_string, *, login=False, timeout=120, stdin_data=None):
raise NotImplementedError("Use mock")
def cleanup(self):
pass
class TestInitSessionCwdRespect:
"""init_session() must preserve the configured cwd."""
def test_bootstrap_contains_cd_to_configured_cwd(self):
"""The bootstrap script must cd to self.cwd before running pwd."""
env = _TestableEnv(cwd="/my/project")
# Capture the bootstrap script that init_session would pass to _run_bash
captured = {}
def mock_run_bash(cmd_string, *, login=False, timeout=120, stdin_data=None):
captured["cmd"] = cmd_string
mock = MagicMock()
mock.poll.return_value = 0
mock.returncode = 0
stdout = TemporaryFile(mode="w+b")
stdout.seek(0)
mock.stdout = stdout
return mock
env._run_bash = mock_run_bash
env.init_session()
assert "cmd" in captured, "init_session did not call _run_bash"
bootstrap = captured["cmd"]
# The cd must appear before pwd -P so the configured cwd is recorded
cd_pos = bootstrap.find("builtin cd")
pwd_pos = bootstrap.find("pwd -P")
assert cd_pos != -1, "bootstrap must contain 'builtin cd'"
assert pwd_pos != -1, "bootstrap must contain 'pwd -P'"
assert cd_pos < pwd_pos, (
"builtin cd must appear before pwd -P in the bootstrap so "
"the configured cwd is what gets recorded"
)
# The cd target must be the configured path (shlex.quote only adds
# quotes when the path contains shell-special characters)
assert "/my/project" in bootstrap, (
"bootstrap cd must target the configured cwd (/my/project)"
)
def test_configured_cwd_survives_init_session(self):
"""self.cwd must be the configured path after init_session completes."""
configured_cwd = "/my/project"
env = _TestableEnv(cwd=configured_cwd)
marker = env._cwd_marker
def mock_run_bash(cmd_string, *, login=False, timeout=120, stdin_data=None):
mock = MagicMock()
mock.poll.return_value = 0
mock.returncode = 0
# Simulate output where pwd reports the configured cwd
output = f"snapshot output\n{marker}{configured_cwd}{marker}\n"
stdout = TemporaryFile(mode="w+b")
stdout.write(output.encode("utf-8"))
stdout.seek(0)
mock.stdout = stdout
return mock
env._run_bash = mock_run_bash
env.init_session()
assert env.cwd == configured_cwd, (
f"Expected cwd={configured_cwd!r} after init_session, got {env.cwd!r}"
)
def test_default_cwd_still_works(self):
"""When no custom cwd is configured, default /tmp behavior is preserved."""
env = _TestableEnv() # default cwd="/tmp"
marker = env._cwd_marker
def mock_run_bash(cmd_string, *, login=False, timeout=120, stdin_data=None):
mock = MagicMock()
mock.poll.return_value = 0
mock.returncode = 0
output = f"snapshot output\n{marker}/tmp{marker}\n"
stdout = TemporaryFile(mode="w+b")
stdout.write(output.encode("utf-8"))
stdout.seek(0)
mock.stdout = stdout
return mock
env._run_bash = mock_run_bash
env.init_session()
assert env.cwd == "/tmp"
def test_bootstrap_cd_uses_shlex_quote(self):
"""Paths with spaces must be properly quoted in the bootstrap cd."""
env = _TestableEnv(cwd="/my project/with spaces")
captured = {}
def mock_run_bash(cmd_string, *, login=False, timeout=120, stdin_data=None):
captured["cmd"] = cmd_string
mock = MagicMock()
mock.poll.return_value = 0
mock.returncode = 0
stdout = TemporaryFile(mode="w+b")
stdout.seek(0)
mock.stdout = stdout
return mock
env._run_bash = mock_run_bash
env.init_session()
bootstrap = captured["cmd"]
# shlex.quote wraps paths with spaces in single quotes
assert "'/my project/with spaces'" in bootstrap, (
"bootstrap cd must properly quote paths with spaces"
)
+487
View File
@@ -0,0 +1,487 @@
"""Tests for tools/skill_usage.py — sidecar telemetry + provenance filtering."""
import json
import os
from pathlib import Path
import pytest
@pytest.fixture
def skills_home(tmp_path, monkeypatch):
"""Isolated HERMES_HOME with a clean skills/ dir for each test."""
home = tmp_path / ".hermes"
home.mkdir()
(home / "skills").mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(home))
# Force skill_usage module to re-resolve paths per test
import importlib
import tools.skill_usage as mod
importlib.reload(mod)
return home
def _write_skill(skills_dir: Path, name: str, category: str = ""):
"""Create a minimal SKILL.md with a name: frontmatter field."""
if category:
d = skills_dir / category / name
else:
d = skills_dir / name
d.mkdir(parents=True, exist_ok=True)
(d / "SKILL.md").write_text(
f"""---
name: {name}
description: test skill
---
# body
""",
encoding="utf-8",
)
return d
# ---------------------------------------------------------------------------
# Round-trip
# ---------------------------------------------------------------------------
def test_empty_usage_returns_empty_dict(skills_home):
from tools.skill_usage import load_usage
assert load_usage() == {}
def test_save_and_load_roundtrip(skills_home):
from tools.skill_usage import load_usage, save_usage
data = {"skill-a": {"use_count": 3, "state": "active"}}
save_usage(data)
loaded = load_usage()
assert loaded["skill-a"]["use_count"] == 3
assert loaded["skill-a"]["state"] == "active"
def test_save_is_atomic_no_partial_tmp_files(skills_home):
from tools.skill_usage import save_usage, _usage_file
save_usage({"x": {"use_count": 1}})
skills_dir = _usage_file().parent
# No leftover tempfile
for p in skills_dir.iterdir():
assert not p.name.startswith(".usage_"), f"leftover tmp: {p.name}"
def test_get_record_missing_returns_empty_record(skills_home):
from tools.skill_usage import get_record
rec = get_record("nonexistent")
assert rec["use_count"] == 0
assert rec["view_count"] == 0
assert rec["state"] == "active"
assert rec["pinned"] is False
assert rec["archived_at"] is None
def test_get_record_backfills_missing_keys(skills_home):
from tools.skill_usage import get_record, save_usage
save_usage({"legacy": {"use_count": 5}}) # old-format record
rec = get_record("legacy")
assert rec["use_count"] == 5
assert "view_count" in rec # backfilled
assert "state" in rec
def test_load_usage_handles_corrupt_file(skills_home):
from tools.skill_usage import load_usage, _usage_file
_usage_file().write_text("{ not json }", encoding="utf-8")
assert load_usage() == {}
# ---------------------------------------------------------------------------
# Counter bumps
# ---------------------------------------------------------------------------
def test_bump_view_increments_and_timestamps(skills_home):
from tools.skill_usage import bump_view, get_record
bump_view("my-skill")
bump_view("my-skill")
rec = get_record("my-skill")
assert rec["view_count"] == 2
assert rec["last_viewed_at"] is not None
def test_bump_use_increments_and_timestamps(skills_home):
from tools.skill_usage import bump_use, get_record
bump_use("my-skill")
rec = get_record("my-skill")
assert rec["use_count"] == 1
assert rec["last_used_at"] is not None
def test_bump_patch_increments_and_timestamps(skills_home):
from tools.skill_usage import bump_patch, get_record
bump_patch("my-skill")
rec = get_record("my-skill")
assert rec["patch_count"] == 1
assert rec["last_patched_at"] is not None
def test_bump_on_empty_name_is_noop(skills_home):
from tools.skill_usage import bump_view, load_usage
bump_view("")
assert load_usage() == {}
def test_bumps_do_not_corrupt_other_skills(skills_home):
from tools.skill_usage import bump_view, bump_use, get_record
bump_view("skill-a")
bump_use("skill-b")
bump_view("skill-a")
assert get_record("skill-a")["view_count"] == 2
assert get_record("skill-a")["use_count"] == 0
assert get_record("skill-b")["use_count"] == 1
# ---------------------------------------------------------------------------
# State transitions
# ---------------------------------------------------------------------------
def test_set_state_active(skills_home):
from tools.skill_usage import set_state, get_record, STATE_ACTIVE
set_state("x", STATE_ACTIVE)
assert get_record("x")["state"] == "active"
def test_set_state_archived_records_timestamp(skills_home):
from tools.skill_usage import set_state, get_record, STATE_ARCHIVED
set_state("x", STATE_ARCHIVED)
rec = get_record("x")
assert rec["state"] == "archived"
assert rec["archived_at"] is not None
def test_set_state_invalid_is_noop(skills_home):
from tools.skill_usage import set_state, get_record
set_state("x", "bogus")
# No record created for invalid state
rec = get_record("x")
assert rec["state"] == "active" # default
def test_restoring_from_archive_clears_timestamp(skills_home):
from tools.skill_usage import set_state, get_record, STATE_ARCHIVED, STATE_ACTIVE
set_state("x", STATE_ARCHIVED)
assert get_record("x")["archived_at"] is not None
set_state("x", STATE_ACTIVE)
assert get_record("x")["archived_at"] is None
def test_set_pinned(skills_home):
from tools.skill_usage import set_pinned, get_record
set_pinned("x", True)
assert get_record("x")["pinned"] is True
set_pinned("x", False)
assert get_record("x")["pinned"] is False
def test_forget_removes_record(skills_home):
from tools.skill_usage import bump_view, forget, load_usage
bump_view("x")
assert "x" in load_usage()
forget("x")
assert "x" not in load_usage()
# ---------------------------------------------------------------------------
# Provenance filter — the load-bearing safety check
# ---------------------------------------------------------------------------
def test_agent_created_excludes_bundled(skills_home):
from tools.skill_usage import list_agent_created_skill_names
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "bundled-skill", category="github")
_write_skill(skills_dir, "my-skill")
# Seed a bundled manifest marking bundled-skill as upstream
(skills_dir / ".bundled_manifest").write_text(
"bundled-skill:abc123\n", encoding="utf-8",
)
names = list_agent_created_skill_names()
assert "my-skill" in names
assert "bundled-skill" not in names
def test_agent_created_excludes_hub_installed(skills_home):
from tools.skill_usage import list_agent_created_skill_names
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "hub-skill")
_write_skill(skills_dir, "my-skill")
hub_dir = skills_dir / ".hub"
hub_dir.mkdir()
(hub_dir / "lock.json").write_text(
json.dumps({"version": 1, "installed": {"hub-skill": {"source": "taps/main"}}}),
encoding="utf-8",
)
names = list_agent_created_skill_names()
assert "my-skill" in names
assert "hub-skill" not in names
def test_is_agent_created(skills_home):
from tools.skill_usage import is_agent_created
skills_dir = skills_home / "skills"
(skills_dir / ".bundled_manifest").write_text("bundled:abc\n", encoding="utf-8")
hub_dir = skills_dir / ".hub"
hub_dir.mkdir()
(hub_dir / "lock.json").write_text(
json.dumps({"installed": {"hubbed": {}}}), encoding="utf-8",
)
assert is_agent_created("my-skill") is True
assert is_agent_created("bundled") is False
assert is_agent_created("hubbed") is False
def test_agent_created_skips_archive_and_hub_dirs(skills_home):
from tools.skill_usage import list_agent_created_skill_names
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "real-skill")
# Dot-prefixed dirs must be ignored even if they contain SKILL.md
archive = skills_dir / ".archive" / "old-skill"
archive.mkdir(parents=True)
(archive / "SKILL.md").write_text(
"---\nname: old-skill\n---\n", encoding="utf-8",
)
names = list_agent_created_skill_names()
assert "real-skill" in names
assert "old-skill" not in names
# ---------------------------------------------------------------------------
# Archive / restore
# ---------------------------------------------------------------------------
def test_archive_skill_moves_directory(skills_home):
from tools.skill_usage import archive_skill, get_record, STATE_ARCHIVED
skills_dir = skills_home / "skills"
skill_dir = _write_skill(skills_dir, "old-skill")
assert skill_dir.exists()
ok, msg = archive_skill("old-skill")
assert ok, msg
assert not skill_dir.exists()
assert (skills_dir / ".archive" / "old-skill" / "SKILL.md").exists()
assert get_record("old-skill")["state"] == "archived"
assert get_record("old-skill")["archived_at"] is not None
def test_archive_refuses_bundled_skill(skills_home):
from tools.skill_usage import archive_skill
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "bundled")
(skills_dir / ".bundled_manifest").write_text("bundled:abc\n", encoding="utf-8")
ok, msg = archive_skill("bundled")
assert not ok
assert "bundled" in msg.lower() or "hub" in msg.lower()
def test_archive_refuses_hub_skill(skills_home):
from tools.skill_usage import archive_skill
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "hub-skill")
hub_dir = skills_dir / ".hub"
hub_dir.mkdir()
(hub_dir / "lock.json").write_text(
json.dumps({"installed": {"hub-skill": {}}}), encoding="utf-8",
)
ok, msg = archive_skill("hub-skill")
assert not ok
def test_archive_missing_skill_returns_error(skills_home):
from tools.skill_usage import archive_skill
ok, msg = archive_skill("nonexistent")
assert not ok
assert "not found" in msg.lower()
def test_restore_skill_moves_back(skills_home):
from tools.skill_usage import archive_skill, restore_skill, get_record
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "temp-skill")
archive_skill("temp-skill")
assert not (skills_dir / "temp-skill").exists()
ok, msg = restore_skill("temp-skill")
assert ok, msg
assert (skills_dir / "temp-skill" / "SKILL.md").exists()
assert get_record("temp-skill")["state"] == "active"
def test_archive_collision_gets_suffix(skills_home):
from tools.skill_usage import archive_skill
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "dup")
archive_skill("dup")
_write_skill(skills_dir, "dup") # recreate
ok, msg = archive_skill("dup")
assert ok
# Two entries under .archive/ — second should have a timestamp suffix
archived = sorted(p.name for p in (skills_dir / ".archive").iterdir() if p.is_dir())
assert "dup" in archived
assert any(n.startswith("dup-") and n != "dup" for n in archived)
# ---------------------------------------------------------------------------
# Reporting
# ---------------------------------------------------------------------------
def test_agent_created_report_includes_defaults(skills_home):
from tools.skill_usage import agent_created_report, bump_view
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "a")
_write_skill(skills_dir, "b")
bump_view("a")
rows = agent_created_report()
by_name = {r["name"]: r for r in rows}
assert "a" in by_name and "b" in by_name
assert by_name["a"]["view_count"] == 1
# b has no usage record yet — must still appear with defaults
assert by_name["b"]["view_count"] == 0
assert by_name["b"]["state"] == "active"
def test_agent_created_report_excludes_bundled_and_hub(skills_home):
from tools.skill_usage import agent_created_report
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "mine")
_write_skill(skills_dir, "bundled")
_write_skill(skills_dir, "hubbed")
(skills_dir / ".bundled_manifest").write_text("bundled:abc\n", encoding="utf-8")
hub = skills_dir / ".hub"
hub.mkdir()
(hub / "lock.json").write_text(
json.dumps({"installed": {"hubbed": {}}}), encoding="utf-8",
)
names = {r["name"] for r in agent_created_report()}
assert "mine" in names
assert "bundled" not in names
assert "hubbed" not in names
# ---------------------------------------------------------------------------
# Provenance guard — telemetry must not leak records for bundled/hub skills
# ---------------------------------------------------------------------------
def test_bump_view_no_op_for_bundled_skill(skills_home):
"""Telemetry bumps on bundled skills are dropped — the sidecar must stay
focused on agent-created skills only."""
from tools.skill_usage import bump_view, load_usage
skills_dir = skills_home / "skills"
(skills_dir / ".bundled_manifest").write_text(
"ship-bundled:abc\n", encoding="utf-8",
)
bump_view("ship-bundled")
assert "ship-bundled" not in load_usage(), (
"bundled skill leaked into .usage.json"
)
def test_bump_patch_no_op_for_hub_skill(skills_home):
from tools.skill_usage import bump_patch, load_usage
skills_dir = skills_home / "skills"
hub = skills_dir / ".hub"
hub.mkdir()
(hub / "lock.json").write_text(
json.dumps({"installed": {"from-hub": {}}}), encoding="utf-8",
)
bump_patch("from-hub")
assert "from-hub" not in load_usage()
def test_bump_use_no_op_for_hub_skill(skills_home):
from tools.skill_usage import bump_use, load_usage
skills_dir = skills_home / "skills"
hub = skills_dir / ".hub"
hub.mkdir()
(hub / "lock.json").write_text(
json.dumps({"installed": {"from-hub": {}}}), encoding="utf-8",
)
bump_use("from-hub")
assert "from-hub" not in load_usage()
def test_set_state_no_op_for_bundled_skill(skills_home):
"""State transitions on bundled skills must not land in the sidecar."""
from tools.skill_usage import set_state, load_usage, STATE_ARCHIVED
skills_dir = skills_home / "skills"
(skills_dir / ".bundled_manifest").write_text(
"locked:abc\n", encoding="utf-8",
)
set_state("locked", STATE_ARCHIVED)
assert "locked" not in load_usage()
def test_restore_refuses_to_shadow_bundled_skill(skills_home):
"""If a bundled skill now occupies the name, refuse to restore."""
from tools.skill_usage import archive_skill, restore_skill
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "shared-name")
archive_skill("shared-name")
# Now a bundled skill appears with the same name
(skills_dir / ".bundled_manifest").write_text(
"shared-name:abc\n", encoding="utf-8",
)
_write_skill(skills_dir, "shared-name") # bundled install landed
ok, msg = restore_skill("shared-name")
assert not ok
assert "bundled" in msg.lower() or "shadow" in msg.lower()
def test_end_to_end_no_code_path_mutates_bundled_skill(skills_home):
"""The combined guarantee: no curator code path can archive, mark stale,
set-state, or persist telemetry for a bundled or hub-installed skill."""
from tools.skill_usage import (
bump_view, bump_use, bump_patch, set_state, set_pinned,
archive_skill, load_usage, STATE_STALE, STATE_ARCHIVED,
)
skills_dir = skills_home / "skills"
_write_skill(skills_dir, "bundled-one")
_write_skill(skills_dir, "hub-one")
_write_skill(skills_dir, "mine")
(skills_dir / ".bundled_manifest").write_text(
"bundled-one:abc\n", encoding="utf-8",
)
hub = skills_dir / ".hub"
hub.mkdir()
(hub / "lock.json").write_text(
json.dumps({"installed": {"hub-one": {}}}), encoding="utf-8",
)
# Hammer every mutator at the bundled/hub names
for name in ("bundled-one", "hub-one"):
bump_view(name)
bump_use(name)
bump_patch(name)
set_state(name, STATE_STALE)
set_state(name, STATE_ARCHIVED)
set_pinned(name, True)
ok, _msg = archive_skill(name)
assert not ok, f"archive_skill(\"{name}\") should refuse"
# Sidecar must be clean of all three
data = load_usage()
assert "bundled-one" not in data
assert "hub-one" not in data
# Directories must still be in place on disk
assert (skills_dir / "bundled-one" / "SKILL.md").exists()
assert (skills_dir / "hub-one" / "SKILL.md").exists()
# The agent-created skill can still be mutated normally
bump_view("mine")
assert load_usage()["mine"]["view_count"] == 1
+5
View File
@@ -335,6 +335,10 @@ class BaseEnvironment(ABC):
instead of running with ``bash -l``.
"""
# Full capture: env vars, functions (filtered), aliases, shell options.
# Restore configured cwd after login shell profile scripts, which may
# change the working directory (e.g. bashrc `cd ~`). Without this,
# pwd -P captures the profile's directory, not terminal.cwd.
_quoted_cwd = shlex.quote(self.cwd)
bootstrap = (
f"export -p > {self._snapshot_path}\n"
f"declare -f | grep -vE '^_[^_]' >> {self._snapshot_path}\n"
@@ -342,6 +346,7 @@ class BaseEnvironment(ABC):
f"echo 'shopt -s expand_aliases' >> {self._snapshot_path}\n"
f"echo 'set +e' >> {self._snapshot_path}\n"
f"echo 'set +u' >> {self._snapshot_path}\n"
f"builtin cd {_quoted_cwd} 2>/dev/null || true\n"
f"pwd -P > {self._cwd_file} 2>/dev/null || true\n"
f"printf '\\n{self._cwd_marker}%s{self._cwd_marker}\\n' \"$(pwd -P)\"\n"
)
+2
View File
@@ -305,6 +305,8 @@ class LocalEnvironment(BaseEnvironment):
"""
def __init__(self, cwd: str = "", timeout: int = 60, env: dict = None):
if cwd:
cwd = os.path.expanduser(cwd)
super().__init__(cwd=cwd or os.getcwd(), timeout=timeout, env=env)
self.init_session()
+11
View File
@@ -700,6 +700,17 @@ def skill_manage(
clear_skills_system_prompt_cache(clear_snapshot=True)
except Exception:
pass
# Curator telemetry: bump patch_count on edit/patch/write_file (the actions
# that mutate an existing skill's guidance), drop the record on delete.
# Best-effort; telemetry failures never break the tool.
try:
from tools.skill_usage import bump_patch, forget
if action in ("patch", "edit", "write_file", "remove_file"):
bump_patch(name)
elif action == "delete":
forget(name)
except Exception:
pass
return json.dumps(result, ensure_ascii=False)
+456
View File
@@ -0,0 +1,456 @@
"""Skill usage telemetry + provenance tracking for the Curator feature.
Tracks per-skill usage metadata in a sidecar JSON file (~/.hermes/skills/.usage.json)
keyed by skill name. Counters are bumped by the existing skill tools (skill_view,
skill_manage); the curator orchestrator reads them to decide lifecycle transitions.
Design notes:
- Sidecar, not frontmatter. Keeps operational telemetry out of user-authored
SKILL.md content and avoids conflict pressure for bundled/hub skills.
- Atomic writes via tempfile + os.replace (same pattern as .bundled_manifest).
- All counter bumps are best-effort: failures log at DEBUG and return silently.
A broken sidecar never breaks the underlying tool call.
- Provenance filter: "agent-created" == not in .bundled_manifest AND not in
.hub/lock.json. The curator only ever mutates agent-created skills.
Lifecycle states:
active -> default
stale -> unused > stale_after_days (config)
archived -> unused > archive_after_days (config); moved to .archive/
pinned -> opt-out from auto transitions (boolean flag, orthogonal to state)
"""
from __future__ import annotations
import json
import logging
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
STATE_ACTIVE = "active"
STATE_STALE = "stale"
STATE_ARCHIVED = "archived"
_VALID_STATES = {STATE_ACTIVE, STATE_STALE, STATE_ARCHIVED}
def _skills_dir() -> Path:
return get_hermes_home() / "skills"
def _usage_file() -> Path:
return _skills_dir() / ".usage.json"
def _archive_dir() -> Path:
return _skills_dir() / ".archive"
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
# ---------------------------------------------------------------------------
# Provenance — which skills are agent-created (and thus eligible for curation)
# ---------------------------------------------------------------------------
def _read_bundled_manifest_names() -> Set[str]:
"""Return the set of skill names that were seeded from the bundled repo.
Reads ~/.hermes/skills/.bundled_manifest (format: "name:hash" per line).
Returns empty set if the file is missing or unreadable.
"""
manifest = _skills_dir() / ".bundled_manifest"
if not manifest.exists():
return set()
names: Set[str] = set()
try:
for line in manifest.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
name = line.split(":", 1)[0].strip()
if name:
names.add(name)
except OSError as e:
logger.debug("Failed to read bundled manifest: %s", e)
return names
def _read_hub_installed_names() -> Set[str]:
"""Return the set of skill names installed via the Skills Hub.
Reads ~/.hermes/skills/.hub/lock.json (see tools/skills_hub.py :: HubLockFile).
"""
lock_path = _skills_dir() / ".hub" / "lock.json"
if not lock_path.exists():
return set()
try:
data = json.loads(lock_path.read_text(encoding="utf-8"))
if isinstance(data, dict):
installed = data.get("installed") or {}
if isinstance(installed, dict):
return {str(k) for k in installed.keys()}
except (OSError, json.JSONDecodeError) as e:
logger.debug("Failed to read hub lock file: %s", e)
return set()
def list_agent_created_skill_names() -> List[str]:
"""Enumerate skills that were authored by the agent (or user), NOT by a
bundled or hub-installed source.
The curator operates exclusively on this set. Bundled / hub skills are
maintained by their upstream sources and must never be pruned here.
"""
base = _skills_dir()
if not base.exists():
return []
bundled = _read_bundled_manifest_names()
hub = _read_hub_installed_names()
off_limits = bundled | hub
names: List[str] = []
# Top-level SKILL.md files (flat layout) AND nested category/skill/SKILL.md
for skill_md in base.rglob("SKILL.md"):
# Skip anything under .archive or .hub
try:
rel = skill_md.relative_to(base)
except ValueError:
continue
parts = rel.parts
if parts and (parts[0].startswith(".") or parts[0] == "node_modules"):
continue
name = _read_skill_name(skill_md, fallback=skill_md.parent.name)
if name in off_limits:
continue
names.append(name)
return sorted(set(names))
def _read_skill_name(skill_md: Path, fallback: str) -> str:
"""Parse the `name:` field from a SKILL.md YAML frontmatter."""
try:
text = skill_md.read_text(encoding="utf-8", errors="replace")[:4000]
except OSError:
return fallback
in_frontmatter = False
for line in text.split("\n"):
stripped = line.strip()
if stripped == "---":
if in_frontmatter:
break
in_frontmatter = True
continue
if in_frontmatter and stripped.startswith("name:"):
value = stripped.split(":", 1)[1].strip().strip("\"'")
if value:
return value
return fallback
def is_agent_created(skill_name: str) -> bool:
"""Whether *skill_name* is neither bundled nor hub-installed."""
off_limits = _read_bundled_manifest_names() | _read_hub_installed_names()
return skill_name not in off_limits
# ---------------------------------------------------------------------------
# Sidecar I/O
# ---------------------------------------------------------------------------
def _empty_record() -> Dict[str, Any]:
return {
"use_count": 0,
"view_count": 0,
"last_used_at": None,
"last_viewed_at": None,
"patch_count": 0,
"last_patched_at": None,
"created_at": _now_iso(),
"state": STATE_ACTIVE,
"pinned": False,
"archived_at": None,
}
def load_usage() -> Dict[str, Dict[str, Any]]:
"""Read the entire .usage.json map. Returns empty dict on missing/corrupt."""
path = _usage_file()
if not path.exists():
return {}
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as e:
logger.debug("Failed to read %s: %s", path, e)
return {}
if not isinstance(data, dict):
return {}
# Defensive: coerce any non-dict values to a fresh empty record
clean: Dict[str, Dict[str, Any]] = {}
for k, v in data.items():
if isinstance(v, dict):
clean[str(k)] = v
return clean
def save_usage(data: Dict[str, Dict[str, Any]]) -> None:
"""Write the usage map atomically. Best-effort — errors are logged, not raised."""
path = _usage_file()
try:
path.parent.mkdir(parents=True, exist_ok=True)
fd, tmp_path = tempfile.mkstemp(
dir=str(path.parent), prefix=".usage_", suffix=".tmp"
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, sort_keys=True, ensure_ascii=False)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, path)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
except Exception as e:
logger.debug("Failed to write %s: %s", path, e, exc_info=True)
def get_record(skill_name: str) -> Dict[str, Any]:
"""Return the record for *skill_name*, creating a fresh one if missing."""
data = load_usage()
rec = data.get(skill_name)
if not isinstance(rec, dict):
return _empty_record()
# Backfill any missing keys so callers don't need to handle old files
base = _empty_record()
for k, v in base.items():
rec.setdefault(k, v)
return rec
def _mutate(skill_name: str, mutator) -> None:
"""Load, apply *mutator(record)* in place, save. Best-effort.
Bundled and hub-installed skills are NEVER recorded in the sidecar.
This keeps .usage.json focused on agent-created skills (the only ones
the curator considers) and prevents stale counters from hanging around
for upstream-managed skills.
"""
if not skill_name:
return
try:
if not is_agent_created(skill_name):
return
data = load_usage()
rec = data.get(skill_name)
if not isinstance(rec, dict):
rec = _empty_record()
mutator(rec)
data[skill_name] = rec
save_usage(data)
except Exception as e:
logger.debug("skill_usage._mutate(%s) failed: %s", skill_name, e, exc_info=True)
# ---------------------------------------------------------------------------
# Public counter-bump helpers
# ---------------------------------------------------------------------------
def bump_view(skill_name: str) -> None:
"""Bump view_count and last_viewed_at. Called from skill_view()."""
def _apply(rec: Dict[str, Any]) -> None:
rec["view_count"] = int(rec.get("view_count") or 0) + 1
rec["last_viewed_at"] = _now_iso()
_mutate(skill_name, _apply)
def bump_use(skill_name: str) -> None:
"""Bump use_count and last_used_at. Called when a skill is actively used
(e.g. loaded into the prompt path or referenced from an assistant turn)."""
def _apply(rec: Dict[str, Any]) -> None:
rec["use_count"] = int(rec.get("use_count") or 0) + 1
rec["last_used_at"] = _now_iso()
_mutate(skill_name, _apply)
def bump_patch(skill_name: str) -> None:
"""Bump patch_count and last_patched_at. Called from skill_manage (patch/edit)."""
def _apply(rec: Dict[str, Any]) -> None:
rec["patch_count"] = int(rec.get("patch_count") or 0) + 1
rec["last_patched_at"] = _now_iso()
_mutate(skill_name, _apply)
def set_state(skill_name: str, state: str) -> None:
"""Set lifecycle state. No-op if *state* is invalid."""
if state not in _VALID_STATES:
logger.debug("set_state: invalid state %r for %s", state, skill_name)
return
def _apply(rec: Dict[str, Any]) -> None:
rec["state"] = state
if state == STATE_ARCHIVED:
rec["archived_at"] = _now_iso()
elif state == STATE_ACTIVE:
rec["archived_at"] = None
_mutate(skill_name, _apply)
def set_pinned(skill_name: str, pinned: bool) -> None:
def _apply(rec: Dict[str, Any]) -> None:
rec["pinned"] = bool(pinned)
_mutate(skill_name, _apply)
def forget(skill_name: str) -> None:
"""Drop a skill's usage entry entirely. Called when the skill is deleted."""
if not skill_name:
return
try:
data = load_usage()
if skill_name in data:
del data[skill_name]
save_usage(data)
except Exception as e:
logger.debug("skill_usage.forget(%s) failed: %s", skill_name, e, exc_info=True)
# ---------------------------------------------------------------------------
# Archive / restore
# ---------------------------------------------------------------------------
def archive_skill(skill_name: str) -> Tuple[bool, str]:
"""Move an agent-created skill directory to ~/.hermes/skills/.archive/.
Returns (ok, message). Never archives bundled or hub skills callers are
responsible for checking provenance, but we double-check here as a safety net.
"""
if not is_agent_created(skill_name):
return False, f"skill '{skill_name}' is bundled or hub-installed; never archive"
skill_dir = _find_skill_dir(skill_name)
if skill_dir is None:
return False, f"skill '{skill_name}' not found"
archive_root = _archive_dir()
try:
archive_root.mkdir(parents=True, exist_ok=True)
except OSError as e:
return False, f"failed to create archive dir: {e}"
# Flatten any category nesting into a single ".archive/<skill>/" so restores
# are simple. If a collision exists, append a timestamp.
dest = archive_root / skill_dir.name
if dest.exists():
dest = archive_root / f"{skill_dir.name}-{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}"
try:
skill_dir.rename(dest)
except OSError as e:
# Cross-device — fall back to shutil.move
import shutil
try:
shutil.move(str(skill_dir), str(dest))
except Exception as e2:
return False, f"failed to archive: {e2}"
set_state(skill_name, STATE_ARCHIVED)
return True, f"archived to {dest}"
def restore_skill(skill_name: str) -> Tuple[bool, str]:
"""Move an archived skill back to ~/.hermes/skills/. Restores to the flat
top-level layout; original category nesting is NOT reconstructed.
Refuses to restore under a name that now collides with a bundled or
hub-installed skill that would shadow the upstream version.
"""
# If a bundled or hub skill has since been installed under the same
# name, refuse to restore rather than shadow it.
if not is_agent_created(skill_name):
return False, (
f"skill '{skill_name}' is now bundled or hub-installed; "
"restore would shadow the upstream version"
)
archive_root = _archive_dir()
if not archive_root.exists():
return False, "no archive directory"
# Try exact name match first, then any prefix match (for timestamped dupes)
candidates = [p for p in archive_root.iterdir() if p.is_dir() and p.name == skill_name]
if not candidates:
candidates = sorted(
[p for p in archive_root.iterdir()
if p.is_dir() and p.name.startswith(f"{skill_name}-")],
reverse=True,
)
if not candidates:
return False, f"skill '{skill_name}' not found in archive"
src = candidates[0]
dest = _skills_dir() / skill_name
if dest.exists():
return False, f"destination already exists: {dest}"
try:
src.rename(dest)
except OSError:
import shutil
try:
shutil.move(str(src), str(dest))
except Exception as e:
return False, f"failed to restore: {e}"
set_state(skill_name, STATE_ACTIVE)
return True, f"restored to {dest}"
def _find_skill_dir(skill_name: str) -> Optional[Path]:
"""Locate the directory for a skill by its frontmatter `name:` field.
Handles both flat (~/.hermes/skills/<skill>/SKILL.md) and category-nested
(~/.hermes/skills/<category>/<skill>/SKILL.md) layouts.
"""
base = _skills_dir()
if not base.exists():
return None
for skill_md in base.rglob("SKILL.md"):
try:
rel = skill_md.relative_to(base)
except ValueError:
continue
if rel.parts and rel.parts[0].startswith("."):
continue
if _read_skill_name(skill_md, fallback=skill_md.parent.name) == skill_name:
return skill_md.parent
return None
# ---------------------------------------------------------------------------
# Reporting — for the curator CLI / slash command
# ---------------------------------------------------------------------------
def agent_created_report() -> List[Dict[str, Any]]:
"""Return a list of {name, state, pinned, last_used_at, use_count, ...}
records for every agent-created skill. Missing usage records are backfilled
with defaults so callers can always index fields."""
data = load_usage()
rows: List[Dict[str, Any]] = []
for name in list_agent_created_skill_names():
rec = data.get(name)
if not isinstance(rec, dict):
rec = _empty_record()
base = _empty_record()
for k, v in base.items():
rec.setdefault(k, v)
rows.append({"name": name, **rec})
return rows
+22 -3
View File
@@ -1480,13 +1480,32 @@ registry.register(
check_fn=check_skills_requirements,
emoji="📚",
)
def _skill_view_with_bump(args, **kw):
"""Invoke skill_view, then bump view_count on success. Best-effort: a
telemetry failure never breaks the tool call."""
name = args.get("name", "")
result = skill_view(
name, file_path=args.get("file_path"), task_id=kw.get("task_id")
)
try:
parsed = json.loads(result)
if isinstance(parsed, dict) and parsed.get("success"):
# Use the resolved skill name from the payload when present —
# qualified forms ("plugin:skill") return with the canonical name.
resolved = parsed.get("name") or name
if resolved:
from tools.skill_usage import bump_view
bump_view(str(resolved))
except Exception:
pass
return result
registry.register(
name="skill_view",
toolset="skills",
schema=SKILL_VIEW_SCHEMA,
handler=lambda args, **kw: skill_view(
args.get("name", ""), file_path=args.get("file_path"), task_id=kw.get("task_id")
),
handler=_skill_view_with_bump,
check_fn=check_skills_requirements,
emoji="📚",
)
+2
View File
@@ -925,6 +925,8 @@ def _get_env_config() -> Dict[str, Any]:
# /workspace and track the original host path separately. Otherwise keep the
# normal sandbox behavior and discard host paths.
cwd = os.getenv("TERMINAL_CWD", default_cwd)
if cwd:
cwd = os.path.expanduser(cwd)
host_cwd = None
host_prefixes = ("/Users/", "/home/", "C:\\", "C:/")
if env_type == "docker" and mount_docker_cwd:
+213 -103
View File
@@ -140,6 +140,7 @@ _SLASH_WORKER_TIMEOUT_S = max(
# response writes are safe.
_LONG_HANDLERS = frozenset(
{
"browser.manage",
"cli.exec",
"session.branch",
"session.resume",
@@ -3210,7 +3211,8 @@ def _(rid, params: dict) -> dict:
raw = ("" if value is None else str(value)).strip().lower()
if raw not in _INDICATOR_STYLES:
return _err(
rid, 4002,
rid,
4002,
f"unknown indicator: {raw!r}; pick one of {'|'.join(_INDICATOR_STYLES)}",
)
_write_config_key("display.tui_status_indicator", raw)
@@ -3427,6 +3429,27 @@ def _(rid, params: dict) -> dict:
return _err(rid, 5015, str(e))
@method("reload.env")
def _(rid, params: dict) -> dict:
"""Re-read ``~/.hermes/.env`` into the gateway process via
``hermes_cli.config.reload_env``, matching classic CLI's ``/reload``
handler. Newly added API keys take effect on the next agent call
without restarting the TUI.
The credential pool / provider routing for any *already-constructed*
agent does not auto-rebuild that's the same behaviour as classic
CLI's ``/reload``. Users who want a brand-new credential resolution
should follow with ``/new``.
"""
try:
from hermes_cli.config import reload_env
count = reload_env()
return _ok(rid, {"updated": int(count)})
except Exception as e:
return _err(rid, 5015, str(e))
_TUI_HIDDEN: frozenset[str] = frozenset(
{
"sethome",
@@ -4751,121 +4774,208 @@ def _resolve_browser_cdp_url() -> str:
return ""
def _is_default_local_cdp(parsed) -> bool:
"""Match the discovery-style local default; never the concrete WS form.
A user-supplied ``ws://127.0.0.1:9222/devtools/browser/<id>`` is a
real, connectable endpoint collapsing it to bare ``http://...:9222``
would strip the path and break the connect.
"""
try:
port = parsed.port or 80
except ValueError:
return False
discovery_path = parsed.path in {"", "/", "/json", "/json/version"}
return (
parsed.scheme in {"http", "ws"}
and parsed.hostname in {"127.0.0.1", "localhost"}
and port == 9222
and discovery_path
)
def _http_ok(url: str, timeout: float) -> bool:
import urllib.request
try:
with urllib.request.urlopen(url, timeout=timeout) as resp:
return 200 <= getattr(resp, "status", 200) < 300
except Exception:
return False
def _probe_urls(parsed) -> list[str]:
scheme = {"ws": "http", "wss": "https"}.get(parsed.scheme, parsed.scheme)
root = f"{scheme}://{parsed.netloc}".rstrip("/")
return [f"{root}/json/version", f"{root}/json"]
def _normalize_cdp_url(parsed) -> str:
# Concrete ``/devtools/browser/<id>`` endpoints (Browserbase et al.)
# are connectable as-is. Discovery-style inputs collapse to bare
# ``scheme://host:port`` so ``_resolve_cdp_override`` can append
# ``/json/version`` later without doubling the path.
if parsed.path.startswith("/devtools/browser/"):
return parsed.geturl()
return parsed._replace(path="", params="", query="", fragment="").geturl()
def _failure_messages(url: str, port: int, system: str) -> list[str]:
from hermes_cli.browser_connect import manual_chrome_debug_command
command = manual_chrome_debug_command(port, system)
hint = (
["Start Chrome with remote debugging, then retry /browser connect:", command]
if command
else [
"No Chrome/Chromium executable was found in this environment.",
f"Install one or start Chrome with --remote-debugging-port={port}, then retry /browser connect.",
]
)
return [
f"Chrome is not reachable at {url}.",
*hint,
"Browser not connected — start Chrome with remote debugging and retry /browser connect",
]
@method("browser.manage")
def _(rid, params: dict) -> dict:
action = params.get("action", "status")
if action == "status":
resolved_url = _resolve_browser_cdp_url()
return _ok(
rid,
{
"connected": bool(resolved_url),
"url": resolved_url,
},
)
if action == "connect":
url = params.get("url", "http://localhost:9222")
try:
import urllib.request
from urllib.parse import urlparse
from tools.browser_tool import cleanup_all_browsers
url = _resolve_browser_cdp_url()
return _ok(rid, {"connected": bool(url), "url": url})
parsed = urlparse(url if "://" in url else f"http://{url}")
if parsed.scheme not in {"http", "https", "ws", "wss"}:
return _err(rid, 4015, f"unsupported browser url: {url}")
# A concrete ``ws[s]://.../devtools/browser/<id>`` endpoint is
# already directly connectable — those are the URLs Browserbase
# / browserless / hosted CDP providers return, and they
# generally DON'T serve the discovery-style ``/json/version``
# path. Probing it would just reject valid endpoints. Skip
# the HTTP probe and do a TCP-level reachability check instead;
# the actual CDP handshake happens on the next ``browser_navigate``.
is_concrete_ws = (
parsed.scheme in {"ws", "wss"}
and parsed.path.startswith("/devtools/browser/")
)
if is_concrete_ws:
import socket
host = parsed.hostname
port = parsed.port or (443 if parsed.scheme == "wss" else 80)
if not host:
return _err(rid, 4015, f"missing host in browser url: {url}")
try:
with socket.create_connection((host, port), timeout=2.0):
pass
except OSError as e:
return _err(rid, 5031, f"could not reach browser CDP at {url}: {e}")
else:
probe_root = f"{'https' if parsed.scheme == 'wss' else 'http' if parsed.scheme == 'ws' else parsed.scheme}://{parsed.netloc}"
probe_urls = [
f"{probe_root.rstrip('/')}/json/version",
f"{probe_root.rstrip('/')}/json",
]
ok = False
for probe in probe_urls:
try:
with urllib.request.urlopen(probe, timeout=2.0) as resp:
if 200 <= getattr(resp, "status", 200) < 300:
ok = True
break
except Exception:
continue
if not ok:
return _err(rid, 5031, f"could not reach browser CDP at {url}")
# Persist a normalized URL for downstream CDP resolution.
# Discovery-style inputs (`http://host:port` or
# `http://host:port/json[/version]`) collapse to bare
# ``scheme://host:port`` so ``_resolve_cdp_override`` can
# safely append ``/json/version`` without producing a
# double-discovery path like ``.../json/json/version``.
# Concrete websocket endpoints (``/devtools/browser/<id>``
# — what Browserbase and other cloud providers return)
# are preserved verbatim.
if parsed.path.startswith("/devtools/browser/"):
normalized = parsed.geturl()
else:
normalized = parsed._replace(
path="",
params="",
query="",
fragment="",
).geturl()
# Order matters: clear any cached browser sessions BEFORE
# publishing the new env var so an in-flight tool call
# observing the old supervisor is reaped first, and the
# next call freshly resolves the new URL. The previous
# ordering left a brief window where ``_ensure_cdp_supervisor``
# could re-attach to the *old* supervisor.
cleanup_all_browsers()
os.environ["BROWSER_CDP_URL"] = normalized
# Drain any further cached state that could outlive the
# cleanup pass (CDP supervisor for the default task,
# cached agent-browser timeouts, etc.) so the next
# ``browser_navigate`` definitively reaches ``normalized``.
cleanup_all_browsers()
except Exception as e:
return _err(rid, 5031, str(e))
return _ok(rid, {"connected": True, "url": normalized})
if action == "disconnect":
return _browser_disconnect(rid)
if action != "connect":
return _err(rid, 4015, f"unknown action: {action}")
return _browser_connect(rid, params)
def _browser_connect(rid, params: dict) -> dict:
import platform
from hermes_cli.browser_connect import DEFAULT_BROWSER_CDP_URL
from tools.browser_tool import cleanup_all_browsers
from urllib.parse import urlparse
raw_url = params.get("url")
if raw_url is not None and not isinstance(raw_url, str):
return _err(rid, 4015, f"browser url must be a string, got {type(raw_url).__name__}")
url = (raw_url or "").strip() or DEFAULT_BROWSER_CDP_URL
sid = params.get("session_id") or ""
system = platform.system()
messages: list[str] = []
def announce(message: str, *, level: str = "info") -> None:
messages.append(message)
# Without a session id the TUI prints `messages` from the
# response; emitting an event would double-render. Only stream
# progress when there's a real session to scope it to.
if sid:
_emit("browser.progress", sid, {"message": message, "level": level})
parsed = urlparse(url if "://" in url else f"http://{url}")
if parsed.scheme not in {"http", "https", "ws", "wss"}:
return _err(rid, 4015, f"unsupported browser url: {url}")
if not parsed.hostname:
return _err(rid, 4015, f"missing host in browser url: {url}")
try:
port = parsed.port or (443 if parsed.scheme in {"https", "wss"} else 80)
except ValueError:
return _err(rid, 4015, f"invalid port in browser url: {url}")
# Always normalize default-local to 127.0.0.1:9222 so downstream
# comparisons + messaging match what we'll actually persist.
if _is_default_local_cdp(parsed):
url = DEFAULT_BROWSER_CDP_URL
parsed = urlparse(url)
port = parsed.port or 9222
try:
# ws[s]://.../devtools/browser/<id> endpoints (hosted CDP
# providers) don't serve the HTTP discovery path; just check
# TCP-level reachability and let browser_navigate handshake.
if parsed.scheme in {"ws", "wss"} and parsed.path.startswith(
"/devtools/browser/"
):
import socket
try:
with socket.create_connection((parsed.hostname, port), timeout=2.0):
pass
except OSError as e:
return _err(rid, 5031, f"could not reach browser CDP at {url}: {e}")
else:
probes = _probe_urls(parsed)
ok = any(_http_ok(p, timeout=2.0) for p in probes)
if not ok and _is_default_local_cdp(parsed):
from hermes_cli.browser_connect import try_launch_chrome_debug
announce(
"Chrome isn't running with remote debugging — attempting to launch..."
)
if try_launch_chrome_debug(port, system):
for _ in range(20):
time.sleep(0.5)
if any(_http_ok(p, timeout=1.0) for p in probes):
ok = True
break
if ok:
announce(f"Chrome launched and listening on port {port}")
else:
for line in _failure_messages(url, port, system)[1:]:
announce(line, level="error")
return _ok(
rid, {"connected": False, "url": url, "messages": messages}
)
elif not ok:
return _err(rid, 5031, f"could not reach browser CDP at {url}")
elif _is_default_local_cdp(parsed):
announce(f"Chrome is already listening on port {port}")
normalized = _normalize_cdp_url(parsed)
# Order matters: reap sessions BEFORE publishing the new env
# so an in-flight tool call sees the old supervisor closed,
# then again AFTER so the default task's cached supervisor
# is drained against the new URL.
cleanup_all_browsers()
os.environ["BROWSER_CDP_URL"] = normalized
cleanup_all_browsers()
except Exception as e:
return _err(rid, 5031, str(e))
payload: dict[str, object] = {"connected": True, "url": normalized}
if messages:
payload["messages"] = messages
return _ok(rid, payload)
def _browser_disconnect(rid) -> dict:
# Reap, drop the env override, reap again — closes the same swap
# window covered by ``_browser_connect``.
def reap() -> None:
try:
from tools.browser_tool import cleanup_all_browsers
cleanup_all_browsers()
except Exception:
pass
os.environ.pop("BROWSER_CDP_URL", None)
try:
from tools.browser_tool import cleanup_all_browsers as _again
_again()
except Exception:
pass
return _ok(rid, {"connected": False})
return _err(rid, 4015, f"unknown action: {action}")
reap()
os.environ.pop("BROWSER_CDP_URL", None)
reap()
return _ok(rid, {"connected": False})
@method("plugins.list")
@@ -293,6 +293,19 @@ describe('createGatewayEventHandler', () => {
expect(appended[1]).toMatchObject({ role: 'assistant', text: 'final answer' })
})
it('renders browser.progress events as system transcript lines as they stream in', () => {
const appended: Msg[] = []
const ctx = buildCtx(appended)
const handler = createGatewayEventHandler(ctx)
handler({
payload: { message: 'Chrome launched and listening on port 9222' },
type: 'browser.progress'
} as any)
expect(ctx.system.sys).toHaveBeenCalledWith('Chrome launched and listening on port 9222')
})
it('annotates gateway.start_timeout with stderr tail lines so users can diagnose without /logs', () => {
const appended: Msg[] = []
const onEvent = createGatewayEventHandler(buildCtx(appended))
@@ -191,8 +191,10 @@ describe('createSlashHandler', () => {
})
it.each([
['/browser status', 'browser.manage', { action: 'status' }],
['/browser status', 'browser.manage', { action: 'status', session_id: null }],
['/browser connect', 'browser.manage', { action: 'connect', session_id: null, url: 'http://127.0.0.1:9222' }],
['/reload-mcp', 'reload.mcp', { session_id: null }],
['/reload', 'reload.env', {}],
['/stop', 'process.stop', {}],
['/fast status', 'config.get', { key: 'fast', session_id: null }],
['/busy status', 'config.get', { key: 'busy' }],
@@ -206,6 +208,34 @@ describe('createSlashHandler', () => {
expect(ctx.gateway.gw.request).not.toHaveBeenCalled()
})
it('renders browser connect progress messages from the gateway', async () => {
const rpc = vi.fn(() =>
Promise.resolve({
connected: false,
messages: [
"Chrome isn't running with remote debugging — attempting to launch...",
'Browser not connected — start Chrome with remote debugging and retry /browser connect'
],
url: 'http://127.0.0.1:9222'
})
)
const ctx = buildCtx({ gateway: { ...buildGateway(), rpc } })
expect(createSlashHandler(ctx)('/browser connect')).toBe(true)
expect(ctx.transcript.sys).toHaveBeenCalledWith('checking Chrome remote debugging at http://127.0.0.1:9222...')
await vi.waitFor(() => {
expect(ctx.transcript.sys).toHaveBeenCalledWith(
"Chrome isn't running with remote debugging — attempting to launch..."
)
expect(ctx.transcript.sys).toHaveBeenCalledWith(
'Browser not connected — start Chrome with remote debugging and retry /browser connect'
)
expect(ctx.transcript.sys).not.toHaveBeenCalledWith('browser connect failed')
})
})
it('routes /rollback through native RPC when a session is active', () => {
patchUiState({ sid: 'sid-abc' })
const rpc = vi.fn(() => Promise.resolve({}))
@@ -307,6 +307,16 @@ export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev:
return
}
case 'browser.progress': {
const message = String(ev.payload?.message ?? '').trim()
if (message) {
sys(message)
}
return
}
case 'voice.status': {
// Continuous VAD loop reports its internal state so the status bar
// can show listening / transcribing / idle without polling.
+39 -18
View File
@@ -2,6 +2,7 @@ import type {
BrowserManageResponse,
DelegationPauseResponse,
ProcessStopResponse,
ReloadEnvResponse,
ReloadMcpResponse,
RollbackDiffResponse,
RollbackListResponse,
@@ -89,13 +90,30 @@ export const opsCommands: SlashCommand[] = [
}
},
{
help: 're-read ~/.hermes/.env into the running gateway (CLI parity)',
name: 'reload',
run: (_arg, ctx) => {
ctx.gateway
.rpc<ReloadEnvResponse>('reload.env', {})
.then(
ctx.guarded<ReloadEnvResponse>(r => {
const n = Number(r.updated ?? 0)
const noun = n === 1 ? 'var' : 'vars'
ctx.transcript.sys(`reloaded .env (${n} ${noun} updated)`)
})
)
.catch(ctx.guardedErr)
}
},
{
help: 'manage browser CDP connection [connect|disconnect|status]',
name: 'browser',
run: (arg, ctx) => {
const trimmed = arg.trim()
const [rawAction, ...rest] = trimmed ? trimmed.split(/\s+/) : ['status']
const action = (rawAction || 'status').toLowerCase()
const [rawAction = 'status', ...rest] = arg.trim().split(/\s+/).filter(Boolean)
const action = rawAction.toLowerCase()
if (!['connect', 'disconnect', 'status'].includes(action)) {
return ctx.transcript.sys(
@@ -103,17 +121,23 @@ export const opsCommands: SlashCommand[] = [
)
}
const payload: Record<string, unknown> = { action }
const requested = rest.join(' ').trim()
const sid = ctx.sid ?? null
const url = action === 'connect' ? rest.join(' ').trim() || 'http://127.0.0.1:9222' : undefined
if (action === 'connect') {
payload.url = requested || 'http://localhost:9222'
if (url) {
ctx.transcript.sys(`checking Chrome remote debugging at ${url}...`)
}
ctx.gateway
.rpc<BrowserManageResponse>('browser.manage', payload)
.rpc<BrowserManageResponse>('browser.manage', { action, session_id: sid, ...(url && { url }) })
.then(
ctx.guarded<BrowserManageResponse>(r => {
// Without a session we can't subscribe to streamed
// browser.progress events, so flush the bundled list.
if (!sid) {
r.messages?.forEach(message => ctx.transcript.sys(message))
}
if (action === 'status') {
return ctx.transcript.sys(
r.connected
@@ -122,18 +146,15 @@ export const opsCommands: SlashCommand[] = [
)
}
if (action === 'connect') {
if (r.connected) {
ctx.transcript.sys(`browser connected: ${r.url || '(url unavailable)'}`)
ctx.transcript.sys('next browser tool call will use this CDP endpoint')
return
}
return ctx.transcript.sys('browser connect failed')
if (action === 'disconnect') {
return ctx.transcript.sys('browser disconnected')
}
ctx.transcript.sys('browser disconnected')
if (r.connected) {
ctx.transcript.sys('Browser connected to live Chrome via CDP')
ctx.transcript.sys(`Endpoint: ${r.url || '(url unavailable)'}`)
ctx.transcript.sys('next browser tool call will use this CDP endpoint')
}
})
)
.catch(ctx.guardedErr)
+10
View File
@@ -308,12 +308,17 @@ export interface ReloadMcpResponse {
status?: string
}
export interface ReloadEnvResponse {
updated?: number
}
export interface ProcessStopResponse {
killed?: number
}
export interface BrowserManageResponse {
connected?: boolean
messages?: string[]
url?: string
}
@@ -432,6 +437,11 @@ export type GatewayEvent =
| { payload?: { state?: 'idle' | 'listening' | 'transcribing' }; session_id?: string; type: 'voice.status' }
| { payload?: { no_speech_limit?: boolean; text?: string }; session_id?: string; type: 'voice.transcript' }
| { payload: { line: string }; session_id?: string; type: 'gateway.stderr' }
| {
payload?: { level?: 'info' | 'warn' | 'error'; message?: string }
session_id?: string
type: 'browser.progress'
}
| {
payload?: { cwd?: string; python?: string; stderr_tail?: string }
session_id?: string