Compare commits

..

23 Commits

Author SHA1 Message Date
Sam Herring 545809d09b Tau2 bench changes 2026-04-06 15:30:54 -07:00
Sam Herring c32efc2885 Initial taubench implementation 2026-04-02 09:23:42 -07:00
Dean Kerr e905768ffd fix(gateway): remap HERMES_HOME to target user in system service unit
When `sudo hermes gateway install --system --run-as-user <user>` generates
the systemd unit, get_hermes_home() resolves to /root/.hermes because
Path.home() returns root's home under sudo. The unit correctly sets
HOME= and User= via _system_service_identity(), but HERMES_HOME was
computed independently and pointed to root's config directory.

Add _hermes_home_for_target_user() which remaps the current HERMES_HOME
to the equivalent path under the target user's home. This handles:
- Default ~/.hermes → target user's ~/.hermes
- Profiles (e.g. ~/.hermes/profiles/coder) → preserves relative structure
- Custom paths (e.g. /opt/hermes) → kept as-is

Supersedes #3861 which only handled the default case and left profiles
broken (also flagged by Copilot review).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-01 06:09:33 -07:00
Teknium e0abf2416d fix: restore _config_version to 11 (reverted by stale-branch merge in #4419) (#4440)
PR #4419 was based on pre-credential-pools main where _config_version was 10.
The squash merge downgraded it from 11 (set by #2647) back to 10.
Also fixes the test assertion.
2026-04-01 04:34:04 -07:00
Teknium f6ada27d1c feat(skills): size limits for agent writes + fuzzy matching for patch (#4414)
* feat(skills): add content size limits for agent-created skills

Agent writes via skill_manage (create/edit/patch/write_file) are now
constrained to prevent unbounded growth:

- SKILL.md and supporting files: 100,000 character limit
- Supporting files: additional 1 MiB byte limit
- Patches on oversized hand-placed skills that reduce the size are
  allowed (shrink path), but patches that grow beyond the limit are
  rejected

Hand-placed skills and hub-installed skills have NO hard limit —
they load and function normally regardless of size. Hub installs
get a warning in the log if SKILL.md exceeds 100k chars.

This mirrors the memory system's char_limit pattern. Without this,
the agent auto-grows skills indefinitely through iterative patches
(hermes-agent-dev reached 197k chars / 72k tokens — 40x larger than
the largest skill in the entire skills.sh ecosystem).

Constants: MAX_SKILL_CONTENT_CHARS (100k), MAX_SKILL_FILE_BYTES (1MiB)
Tests: 14 new tests covering all write paths and edge cases

* feat(skills): add fuzzy matching to skill patch

_patch_skill now uses the same 8-strategy fuzzy matching engine
(tools/fuzzy_match.py) as the file patch tool. Handles whitespace
normalization, indentation differences, escape sequences, and
block-anchor matching. Eliminates exact-match failures when agents
patch skills with minor formatting mismatches.
2026-04-01 04:19:19 -07:00
Teknium 70744add15 feat(browser): add persistent Camofox sessions and VNC URL discovery (salvage #4400) (#4419)
Adds two Camofox features:

1. Persistent browser sessions: new `browser.camofox.managed_persistence`
   config option. When enabled, Hermes sends a deterministic profile-scoped
   userId to Camofox so the server maps it to a persistent browser profile
   directory. Cookies, logins, and browser state survive across restarts.
   Default remains ephemeral (random userId per session).

2. VNC URL discovery: Camofox /health endpoint returns vncPort when running
   in headed mode. Hermes constructs the VNC URL and includes it in navigate
   responses so the agent can share it with users.

Also fixes camofox_vision bug where call_llm response object was passed
directly to json.dumps instead of extracting .choices[0].message.content.

Changes from original PR:
- Removed browser_evaluate tool (separate feature, needs own PR)
- Removed snapshot truncation limit change (unrelated)
- Config.yaml only for managed_persistence (no env var, no version bump)
- Rewrote tests to use config mock instead of env var
- Reverted package-lock.json churn

Co-authored-by: analista <psikonetik@gmail.com.com>
2026-04-01 04:18:50 -07:00
Teknium 85e96a4638 fix(skills): move unified hermes-agent skill into autonomous-ai-agents category (#4435)
The unified skill from PR #4332 was placed at a top-level
skills/hermes-agent/ directory, creating a redundant standalone
category. Move it to skills/autonomous-ai-agents/hermes-agent/
alongside claude-code, codex, and opencode where it belongs.
2026-04-01 03:39:25 -07:00
Teknium c9dc6c4749 fix(insights): show cache tokens in overview so total adds up (#4428)
The total_tokens field includes cache_read + cache_write tokens, but
the display only showed input + output — making the math look wrong
(e.g. 765K + 134K displayed but total said 9.2M). Now shows a cache
line when cache tokens are present so all visible numbers sum to the
displayed total.

Affects both terminal (hermes insights) and gateway (/insights)
formats.
2026-04-01 03:06:47 -07:00
kshitijk4poor 935137f0d9 feat: add inline diff previews for write actions
Show inline diffs in the CLI transcript when write_file, patch, or
skill_manage modifies files. Captures a filesystem snapshot before the
tool runs, computes a unified diff after, and renders it with ANSI
coloring in the activity feed.

Adds tool_start_callback and tool_complete_callback hooks to AIAgent
for pre/post tool execution notifications.

Also fixes _extract_parallel_scope_path to normalize relative paths
to absolute, preventing the parallel overlap detection from missing
conflicts when the same file is referenced with different path styles.

Gated by display.inline_diffs config option (default: true).

Based on PR #3774 by @kshitijk4poor.
2026-04-01 02:13:57 -07:00
Teknium 68fc4aec21 fix: comprehensive default profile export exclusions and import guard
- Add _DEFAULT_EXPORT_EXCLUDE_ROOT constant with 25+ entries to exclude
  from default profile exports: repo checkout (hermes-agent), worktrees,
  databases (state.db), caches, runtime state, logs, binaries
- Add _default_export_ignore() with root-level and universal exclusions
  (__pycache__, *.sock, *.tmp at any depth)
- Remove redundant shutil/tempfile imports from contributor's if-block
- Block import_profile() from accepting 'default' as target name with
  clear guidance to use --name
- Add 7 tests covering: archive creation, inclusion of profile data,
  exclusion of infrastructure, nested __pycache__ exclusion, import
  rejection without --name, import rejection with --name default,
  full export-import roundtrip with a different name

Addresses review feedback on PR #4370.
2026-04-01 01:43:51 -07:00
Devorun f04977f45a fix(cli): support exporting the default root profile (#4366) 2026-04-01 01:43:51 -07:00
Teknium 996250d178 fix(cli): pin entire TUI to bottom of terminal on startup (#4412)
Replace the per-response padding from PR #4359 (which created a void
between short responses and the prompt) with a one-time initial scroll
at session start.  Prints terminal_height newlines before the banner so
the cursor starts at the bottom row — banner, responses, and prompt all
appear pinned to the bottom with empty space above, not below.

patch_stdout naturally keeps the prompt at the bottom from there, so
no per-response padding is needed.
2026-04-01 01:41:09 -07:00
Bartok9 afa75a6185 fix(client): handle is_closed as method in OpenAI SDK
The openai SDK's SyncAPIClient.is_closed is a method, not a property.
getattr(client, 'is_closed', False) returned the bound method object,
which is always truthy — causing _is_openai_client_closed() to report
all clients as closed and triggering unnecessary client recreation
(~100-200ms TCP+TLS overhead per API call).

Fix: check if is_closed is callable and call it, otherwise treat as bool.

Fixes #4377
Co-authored-by: Bartok9 <Bartok9@users.noreply.github.com>
2026-04-01 01:40:43 -07:00
Nick 9a581bba50 fix(gateway): resume agent after /approve executes blocked command
When a dangerous command was blocked and the user approved it via /approve,
the command was executed but the agent loop had already exited — the agent
never received the command output and the task died silently.

Now _handle_approve_command sends immediate feedback to the user, then
creates a synthetic continuation message with the command output and feeds
it through _handle_message so the agent picks up where it left off.

- Send command result to chat immediately via adapter.send()
- Create synthetic MessageEvent with command + output as context
- Spawn asyncio task to re-invoke agent via _handle_message
- Return None (feedback already sent directly)
- Add test for agent re-invocation after approval
- Update existing approval tests for new return behavior
2026-04-01 01:38:55 -07:00
Smyile 8327f7cc61 fix(docs): use compound selector instead of media query
Target the exact state that breaks: when .navbar-sidebar--show is active
on the same <nav> element. This preserves the blur on mobile when the
sidebar is closed, and only removes it when the sidebar is open.
2026-04-01 01:14:39 -07:00
Smyile 7baee0b023 fix(docs): restrict backdrop-filter to desktop to fix mobile sidebar
backdrop-filter on .navbar creates a new CSS stacking context that
hides .navbar-sidebar menu content on mobile (only the close button
is visible). Scope the blur effect to min-width: 997px so it only
applies on desktop where the sidebar is not rendered inside the navbar.

Ref: facebook/docusaurus#6996, facebook/docusaurus#6853
2026-04-01 01:14:39 -07:00
Teknium efa327a998 fix: add missing provider attrs to cli_obj test fixture
_show_status() now references self.provider and self._provider_source,
added after the original PR was submitted.
2026-04-01 01:12:23 -07:00
Johannnnn506 9b99ea176e fix(cli): initialize ctx_len before compact banner path 2026-04-01 01:12:23 -07:00
Teknium a7f7e87070 fix: preserve credential_pool through smart routing and defer eager fallback on 429 (#4361)
Three bugs prevented credential pool rotation from working when multiple
Codex OAuth tokens were configured:

1. credential_pool was dropped during smart model turn routing.
   resolve_turn_route() constructed runtime dicts without it, so the
   AIAgent was created without pool access. Fixed in smart_model_routing.py
   (no-route and fallback paths), cli.py, and gateway/run.py.

2. Eager fallback fired before pool rotation on 429. The rate-limit
   handler at line ~7180 switched to a fallback provider immediately,
   before _recover_with_credential_pool got a chance to rotate to the
   next credential. Now deferred when the pool still has credentials.

3. (Non-issue) Retry budget was reported as too small, but successful
   pool rotations already skip retry_count increment — no change needed.

Reported by community member Schinsly who identified all three root
causes and verified the fix locally with multiple Codex accounts.
2026-04-01 01:02:34 -07:00
Teknium ef2ae3e48f fix(file_tools): refresh staleness timestamp after writes (#4390)
After a successful write_file or patch, update the stored read
timestamp to match the file's new modification time.  Without this,
consecutive edits by the same task (read → write → write) would
false-warn on the second write because the stored timestamp still
reflected the original read, not the first write.

Also renames the internal tracker key from 'file_mtimes' to
'read_timestamps' for clarity.
2026-04-01 00:50:08 -07:00
SHL0MS 83dec2b3ec fix: skip empty/whitespace text in Telegram send to prevent 400 errors
Telegram API returns HTTP 400 when sent whitespace-only or empty
text. Add a guard at the top of send() to silently succeed on
blank content instead of crashing.

Equivalent to OpenClaw #56620.
2026-03-31 19:10:26 -07:00
Laura Batalha f4d44c777b feat(discord): only create threads and reactions for authorized users 2026-03-31 19:06:46 -07:00
Teknium 0a6d366327 fix(security): redact secrets from execute_code sandbox output
* fix: root-level provider in config.yaml no longer overrides model.provider

load_cli_config() had a priority inversion: a stale root-level
'provider' key in config.yaml would OVERRIDE the canonical
'model.provider' set by 'hermes model'. The gateway reads
model.provider directly from YAML and worked correctly, but
'hermes chat -q' and the interactive CLI went through the merge
logic and picked up the stale root-level key.

Fix: root-level provider/base_url are now only used as a fallback
when model.provider/model.base_url is not set (never as an override).

Also added _normalize_root_model_keys() to config.py load_config()
and save_config() — migrates root-level provider/base_url into the
model section and removes the root-level keys permanently.

Reported by (≧▽≦) in Discord: opencode-go provider persisted as a
root-level key and overrode the correct model.provider=openrouter,
causing 401 errors.

* fix(security): redact secrets from execute_code sandbox output

The execute_code sandbox stripped env vars with secret-like names from
the child process (preventing os.environ access), but scripts could
still read secrets from disk (e.g. open('~/.hermes/.env')) and print
them to stdout. The raw values entered the model context unredacted.

terminal_tool and file_tools already applied redact_sensitive_text()
to their output — execute_code was the only tool that skipped this
step. Now the same redaction runs on both stdout and stderr after
ANSI stripping.

Reported via Discord (not filed on GitHub to avoid public disclosure
of the reproduction steps).
2026-03-31 18:52:11 -07:00
39 changed files with 3128 additions and 73 deletions
+4
View File
@@ -267,6 +267,10 @@ class CredentialPool:
def has_credentials(self) -> bool:
return bool(self._entries)
def has_available(self) -> bool:
"""True if at least one entry is not currently in exhaustion cooldown."""
return bool(self._available_entries())
def entries(self) -> List[PooledCredential]:
return list(self._entries)
+313
View File
@@ -10,6 +10,9 @@ import os
import sys
import threading
import time
from dataclasses import dataclass, field
from difflib import unified_diff
from pathlib import Path
# ANSI escape codes for coloring tool failure indicators
_RED = "\033[31m"
@@ -17,6 +20,22 @@ _RESET = "\033[0m"
logger = logging.getLogger(__name__)
_ANSI_RESET = "\033[0m"
_ANSI_DIM = "\033[38;2;150;150;150m"
_ANSI_FILE = "\033[38;2;180;160;255m"
_ANSI_HUNK = "\033[38;2;120;120;140m"
_ANSI_MINUS = "\033[38;2;255;255;255;48;2;120;20;20m"
_ANSI_PLUS = "\033[38;2;255;255;255;48;2;20;90;20m"
_MAX_INLINE_DIFF_FILES = 6
_MAX_INLINE_DIFF_LINES = 80
@dataclass
class LocalEditSnapshot:
"""Pre-tool filesystem snapshot used to render diffs locally after writes."""
paths: list[Path] = field(default_factory=list)
before: dict[str, str | None] = field(default_factory=dict)
# =========================================================================
# Configurable tool preview length (0 = no limit)
# Set once at startup by CLI or gateway from display.tool_preview_length config.
@@ -218,6 +237,300 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int | None = None) -
return preview
# =========================================================================
# Inline diff previews for write actions
# =========================================================================
def _resolved_path(path: str) -> Path:
"""Resolve a possibly-relative filesystem path against the current cwd."""
candidate = Path(os.path.expanduser(path))
if candidate.is_absolute():
return candidate
return Path.cwd() / candidate
def _snapshot_text(path: Path) -> str | None:
"""Return UTF-8 file content, or None for missing/unreadable files."""
try:
return path.read_text(encoding="utf-8")
except (FileNotFoundError, IsADirectoryError, UnicodeDecodeError, OSError):
return None
def _display_diff_path(path: Path) -> str:
"""Prefer cwd-relative paths in diffs when available."""
try:
return str(path.resolve().relative_to(Path.cwd().resolve()))
except Exception:
return str(path)
def _resolve_skill_manage_paths(args: dict) -> list[Path]:
"""Resolve skill_manage write targets to filesystem paths."""
action = args.get("action")
name = args.get("name")
if not action or not name:
return []
from tools.skill_manager_tool import _find_skill, _resolve_skill_dir
if action == "create":
skill_dir = _resolve_skill_dir(name, args.get("category"))
return [skill_dir / "SKILL.md"]
existing = _find_skill(name)
if not existing:
return []
skill_dir = Path(existing["path"])
if action in {"edit", "patch"}:
file_path = args.get("file_path")
return [skill_dir / file_path] if file_path else [skill_dir / "SKILL.md"]
if action in {"write_file", "remove_file"}:
file_path = args.get("file_path")
return [skill_dir / file_path] if file_path else []
if action == "delete":
files = [path for path in sorted(skill_dir.rglob("*")) if path.is_file()]
return files
return []
def _resolve_local_edit_paths(tool_name: str, function_args: dict | None) -> list[Path]:
"""Resolve local filesystem targets for write-capable tools."""
if not isinstance(function_args, dict):
return []
if tool_name == "write_file":
path = function_args.get("path")
return [_resolved_path(path)] if path else []
if tool_name == "patch":
path = function_args.get("path")
return [_resolved_path(path)] if path else []
if tool_name == "skill_manage":
return _resolve_skill_manage_paths(function_args)
return []
def capture_local_edit_snapshot(tool_name: str, function_args: dict | None) -> LocalEditSnapshot | None:
"""Capture before-state for local write previews."""
paths = _resolve_local_edit_paths(tool_name, function_args)
if not paths:
return None
snapshot = LocalEditSnapshot(paths=paths)
for path in paths:
snapshot.before[str(path)] = _snapshot_text(path)
return snapshot
def _result_succeeded(result: str | None) -> bool:
"""Conservatively detect whether a tool result represents success."""
if not result:
return False
try:
data = json.loads(result)
except (json.JSONDecodeError, TypeError):
return False
if not isinstance(data, dict):
return False
if data.get("error"):
return False
if "success" in data:
return bool(data.get("success"))
return True
def _diff_from_snapshot(snapshot: LocalEditSnapshot | None) -> str | None:
"""Generate unified diff text from a stored before-state and current files."""
if not snapshot:
return None
chunks: list[str] = []
for path in snapshot.paths:
before = snapshot.before.get(str(path))
after = _snapshot_text(path)
if before == after:
continue
display_path = _display_diff_path(path)
diff = "".join(
unified_diff(
[] if before is None else before.splitlines(keepends=True),
[] if after is None else after.splitlines(keepends=True),
fromfile=f"a/{display_path}",
tofile=f"b/{display_path}",
)
)
if diff:
chunks.append(diff)
if not chunks:
return None
return "".join(chunk if chunk.endswith("\n") else chunk + "\n" for chunk in chunks)
def extract_edit_diff(
tool_name: str,
result: str | None,
*,
function_args: dict | None = None,
snapshot: LocalEditSnapshot | None = None,
) -> str | None:
"""Extract a unified diff from a file-edit tool result."""
if tool_name == "patch" and result:
try:
data = json.loads(result)
except (json.JSONDecodeError, TypeError):
data = None
if isinstance(data, dict):
diff = data.get("diff")
if isinstance(diff, str) and diff.strip():
return diff
if tool_name not in {"write_file", "patch", "skill_manage"}:
return None
if not _result_succeeded(result):
return None
return _diff_from_snapshot(snapshot)
def _emit_inline_diff(diff_text: str, print_fn) -> bool:
"""Emit rendered diff text through the CLI's prompt_toolkit-safe printer."""
if print_fn is None or not diff_text:
return False
try:
print_fn(" ┊ review diff")
for line in diff_text.rstrip("\n").splitlines():
print_fn(line)
return True
except Exception:
return False
def _render_inline_unified_diff(diff: str) -> list[str]:
"""Render unified diff lines in Hermes' inline transcript style."""
rendered: list[str] = []
from_file = None
to_file = None
for raw_line in diff.splitlines():
if raw_line.startswith("--- "):
from_file = raw_line[4:].strip()
continue
if raw_line.startswith("+++ "):
to_file = raw_line[4:].strip()
if from_file or to_file:
rendered.append(f"{_ANSI_FILE}{from_file or 'a/?'}{to_file or 'b/?'}{_ANSI_RESET}")
continue
if raw_line.startswith("@@"):
rendered.append(f"{_ANSI_HUNK}{raw_line}{_ANSI_RESET}")
continue
if raw_line.startswith("-"):
rendered.append(f"{_ANSI_MINUS}{raw_line}{_ANSI_RESET}")
continue
if raw_line.startswith("+"):
rendered.append(f"{_ANSI_PLUS}{raw_line}{_ANSI_RESET}")
continue
if raw_line.startswith(" "):
rendered.append(f"{_ANSI_DIM}{raw_line}{_ANSI_RESET}")
continue
if raw_line:
rendered.append(raw_line)
return rendered
def _split_unified_diff_sections(diff: str) -> list[str]:
"""Split a unified diff into per-file sections."""
sections: list[list[str]] = []
current: list[str] = []
for line in diff.splitlines():
if line.startswith("--- ") and current:
sections.append(current)
current = [line]
continue
current.append(line)
if current:
sections.append(current)
return ["\n".join(section) for section in sections if section]
def _summarize_rendered_diff_sections(
diff: str,
*,
max_files: int = _MAX_INLINE_DIFF_FILES,
max_lines: int = _MAX_INLINE_DIFF_LINES,
) -> list[str]:
"""Render diff sections while capping file count and total line count."""
sections = _split_unified_diff_sections(diff)
rendered: list[str] = []
omitted_files = 0
omitted_lines = 0
for idx, section in enumerate(sections):
if idx >= max_files:
omitted_files += 1
omitted_lines += len(_render_inline_unified_diff(section))
continue
section_lines = _render_inline_unified_diff(section)
remaining_budget = max_lines - len(rendered)
if remaining_budget <= 0:
omitted_lines += len(section_lines)
omitted_files += 1
continue
if len(section_lines) <= remaining_budget:
rendered.extend(section_lines)
continue
rendered.extend(section_lines[:remaining_budget])
omitted_lines += len(section_lines) - remaining_budget
omitted_files += 1 + max(0, len(sections) - idx - 1)
for leftover in sections[idx + 1:]:
omitted_lines += len(_render_inline_unified_diff(leftover))
break
if omitted_files or omitted_lines:
summary = f"… omitted {omitted_lines} diff line(s)"
if omitted_files:
summary += f" across {omitted_files} additional file(s)/section(s)"
rendered.append(f"{_ANSI_HUNK}{summary}{_ANSI_RESET}")
return rendered
def render_edit_diff_with_delta(
tool_name: str,
result: str | None,
*,
function_args: dict | None = None,
snapshot: LocalEditSnapshot | None = None,
print_fn=None,
) -> bool:
"""Render an edit diff inline without taking over the terminal UI."""
diff = extract_edit_diff(
tool_name,
result,
function_args=function_args,
snapshot=snapshot,
)
if not diff:
return False
try:
rendered_lines = _summarize_rendered_diff_sections(diff)
except Exception as exc:
logger.debug("Could not render inline diff: %s", exc)
return False
return _emit_inline_diff("\n".join(rendered_lines), print_fn)
# =========================================================================
# KawaiiSpinner
# =========================================================================
+8 -1
View File
@@ -644,6 +644,9 @@ class InsightsEngine:
lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
cache_total = o.get("total_cache_read_tokens", 0) + o.get("total_cache_write_tokens", 0)
if cache_total > 0:
lines.append(f" Cache read: {o['total_cache_read_tokens']:<12,} Cache write: {o['total_cache_write_tokens']:,}")
cost_str = f"${o['estimated_cost']:.2f}"
if o.get("models_without_pricing"):
cost_str += " *"
@@ -746,7 +749,11 @@ class InsightsEngine:
# Overview
lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
cache_total = o.get("total_cache_read_tokens", 0) + o.get("total_cache_write_tokens", 0)
if cache_total > 0:
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,} / cache: {cache_total:,})")
else:
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
cost_note = ""
if o.get("models_without_pricing"):
cost_note = " _(excludes custom/self-hosted models)_"
+2
View File
@@ -127,6 +127,7 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
"api_mode": primary.get("api_mode"),
"command": primary.get("command"),
"args": list(primary.get("args") or []),
"credential_pool": primary.get("credential_pool"),
},
"label": None,
"signature": (
@@ -162,6 +163,7 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
"api_mode": primary.get("api_mode"),
"command": primary.get("command"),
"args": list(primary.get("args") or []),
"credential_pool": primary.get("credential_pool"),
},
"label": None,
"signature": (
+51 -17
View File
@@ -1077,12 +1077,16 @@ class HermesCLI:
# streaming: stream tokens to the terminal as they arrive (display.streaming in config.yaml)
self.streaming_enabled = CLI_CONFIG["display"].get("streaming", False)
# Inline diff previews for write actions (display.inline_diffs in config.yaml)
self._inline_diffs_enabled = CLI_CONFIG["display"].get("inline_diffs", True)
# Streaming display state
self._stream_buf = "" # Partial line buffer for line-buffered rendering
self._stream_started = False # True once first delta arrives
self._stream_box_opened = False # True once the response box header is printed
self._reasoning_stream_started = False # True once live reasoning starts streaming
self._reasoning_preview_buf = "" # Coalesce tiny reasoning chunks for [thinking] output
self._pending_edit_snapshots = {}
# Configuration - priority: CLI args > env vars > config file
# Model comes from: CLI arg or config.yaml (single source of truth).
@@ -2024,6 +2028,7 @@ class HermesCLI:
"api_mode": self.api_mode,
"command": self.acp_command,
"args": list(self.acp_args or []),
"credential_pool": getattr(self, "_credential_pool", None),
},
)
@@ -2131,6 +2136,8 @@ class HermesCLI:
checkpoint_max_snapshots=self.checkpoint_max_snapshots,
pass_session_id=self.pass_session_id,
tool_progress_callback=self._on_tool_progress,
tool_start_callback=self._on_tool_start if self._inline_diffs_enabled else None,
tool_complete_callback=self._on_tool_complete if self._inline_diffs_enabled else None,
stream_delta_callback=self._stream_delta if self.streaming_enabled else None,
tool_gen_callback=self._on_tool_gen_start if self.streaming_enabled else None,
)
@@ -2162,6 +2169,12 @@ class HermesCLI:
def show_banner(self):
"""Display the welcome banner in Claude Code style."""
self.console.clear()
# Get context length for display before branching so it remains
# available to the low-context warning logic in compact mode too.
ctx_len = None
if hasattr(self, 'agent') and self.agent and hasattr(self.agent, 'context_compressor'):
ctx_len = self.agent.context_compressor.context_length
# Auto-compact for narrow terminals — the full banner with caduceus
# + tool list needs ~80 columns minimum to render without wrapping.
@@ -2178,11 +2191,6 @@ class HermesCLI:
# Get terminal working directory (where commands will execute)
cwd = os.getenv("TERMINAL_CWD", os.getcwd())
# Get context length for display
ctx_len = None
if hasattr(self, 'agent') and self.agent and hasattr(self.agent, 'context_compressor'):
ctx_len = self.agent.context_compressor.context_length
# Build and display the banner
build_welcome_banner(
console=self.console,
@@ -5032,6 +5040,33 @@ class HermesCLI:
except Exception:
pass
def _on_tool_start(self, tool_call_id: str, function_name: str, function_args: dict):
"""Capture local before-state for write-capable tools."""
try:
from agent.display import capture_local_edit_snapshot
snapshot = capture_local_edit_snapshot(function_name, function_args)
if snapshot is not None:
self._pending_edit_snapshots[tool_call_id] = snapshot
except Exception:
logger.debug("Edit snapshot capture failed for %s", function_name, exc_info=True)
def _on_tool_complete(self, tool_call_id: str, function_name: str, function_args: dict, function_result: str):
"""Render file edits with inline diff after write-capable tools complete."""
snapshot = self._pending_edit_snapshots.pop(tool_call_id, None)
try:
from agent.display import render_edit_diff_with_delta
render_edit_diff_with_delta(
function_name,
function_result,
function_args=function_args,
snapshot=snapshot,
print_fn=_cprint,
)
except Exception:
logger.debug("Edit diff preview failed for %s", function_name, exc_info=True)
# ====================================================================
# Voice mode methods
# ====================================================================
@@ -6343,6 +6378,17 @@ class HermesCLI:
def run(self):
"""Run the interactive CLI loop with persistent input at bottom."""
# Push the entire TUI to the bottom of the terminal so the banner,
# responses, and prompt all appear pinned to the bottom — empty
# space stays above, not below. This prints enough blank lines to
# scroll the cursor to the last row before any content is rendered.
try:
_term_lines = shutil.get_terminal_size().lines
if _term_lines > 2:
print("\n" * (_term_lines - 1), end="", flush=True)
except Exception:
pass
self.show_banner()
# One-line Honcho session indicator (TTY-only, not captured by agent).
@@ -7569,18 +7615,6 @@ class HermesCLI:
self._agent_running = False
self._spinner_text = ""
# Push the input prompt toward the bottom of the
# terminal so it doesn't sit mid-screen after short
# responses. patch_stdout renders these newlines
# above the input area, creating visual separation
# and anchoring the prompt near the bottom.
try:
_pad = shutil.get_terminal_size().lines // 2
if _pad > 2:
_cprint("\n" * _pad)
except Exception:
pass
app.invalidate() # Refresh status line
# Continuous voice: auto-restart recording after agent responds.
@@ -0,0 +1,324 @@
"""
HermesAgent for tau2-bench evaluation.
Implements the tau2 HalfDuplexAgent interface using litellm with OpenRouter,
matching the inference path used across the rest of the Hermes Agent codebase.
Usage:
python environments/benchmarks/taubench/run_eval.py \\
--model anthropic/claude-sonnet-4-5 \\
--base-url openrouter \\
--env retail
"""
import json
import os
import sys
from pathlib import Path
from typing import Optional
import litellm
from pydantic import BaseModel
_repo_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_repo_root) not in sys.path:
sys.path.insert(0, str(_repo_root))
from environments.tool_call_parsers import get_parser
from tau2.agent.base_agent import HalfDuplexAgent, ValidAgentInputMessage
from tau2.data_model.message import (
AssistantMessage,
Message,
MultiToolMessage,
SystemMessage,
ToolCall,
ToolMessage,
UserMessage,
)
from tau2.environment.tool import Tool
class HermesAgentState(BaseModel):
system_messages: list[SystemMessage]
messages: list
class HermesAgent(HalfDuplexAgent[HermesAgentState]):
"""
tau2 HalfDuplexAgent backed by litellm, using OpenRouter (or any
OpenAI-compatible endpoint).
Registered as "hermes_agent" in the tau2 registry by run_eval.py.
"""
SYSTEM_PROMPT = (
"You are a customer service agent that helps the user according to the "
"<policy> provided below.\n"
"In each turn you can either:\n"
"- Send a message to the user.\n"
"- Make a tool call.\n"
"You cannot do both at the same time.\n\n"
"Try to be helpful and always follow the policy. "
"Always make sure you generate valid JSON only.\n\n"
"<policy>\n{domain_policy}\n</policy>"
)
# System prompt variant for qwen3_coder tool format — tools are embedded
# directly in the system prompt as <tools> XML instead of passed via the
# OpenAI tools= parameter.
SYSTEM_PROMPT_QWEN3_CODER = (
"You are a customer service agent that helps the user according to the "
"<policy> provided below.\n"
"In each turn you can either:\n"
"- Send a message to the user.\n"
"- Make a tool call.\n"
"You cannot do both at the same time.\n\n"
"Try to be helpful and always follow the policy. "
"Always make sure you generate valid JSON only.\n\n"
"You may call one or more functions to assist with the user query.\n\n"
"You are provided with function signatures within <tools></tools> XML tags:\n"
"<tools>\n{tools_json}\n</tools>\n\n"
"<policy>\n{domain_policy}\n</policy>"
)
def __init__(
self,
tools: list[Tool],
domain_policy: str,
model: str,
base_url: Optional[str] = None,
api_key: Optional[str] = None,
temperature: float = 0.0,
max_tokens: Optional[int] = None,
top_p: Optional[float] = None,
thinking: bool = False,
tool_parser: Optional[str] = None,
):
super().__init__(tools=tools, domain_policy=domain_policy)
self.model = model
self.base_url = base_url
self.api_key = api_key
self.temperature = temperature
self.max_tokens = max_tokens
self.top_p = top_p
self.thinking = thinking
self.tool_parser = tool_parser
self._parser = get_parser(tool_parser) if tool_parser else None
# OpenRouter requires specific headers; pass them via litellm extra_headers
self._extra_headers: dict = {}
if base_url and "openrouter" in base_url.lower():
self._extra_headers = {
"HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-Title": "Hermes Agent",
}
@property
def system_prompt(self) -> str:
if self.tool_parser == "qwen3_coder" and self.tools:
tools_json = json.dumps(
[t.openai_schema for t in self.tools], indent=2, ensure_ascii=False
)
return self.SYSTEM_PROMPT_QWEN3_CODER.format(
tools_json=tools_json,
domain_policy=self.domain_policy,
)
return self.SYSTEM_PROMPT.format(domain_policy=self.domain_policy)
def get_init_state(
self, message_history: Optional[list[Message]] = None
) -> HermesAgentState:
return HermesAgentState(
system_messages=[SystemMessage(role="system", content=self.system_prompt)],
messages=list(message_history or []),
)
def generate_next_message(
self, message: ValidAgentInputMessage, state: HermesAgentState
) -> tuple[AssistantMessage, HermesAgentState]:
# Append incoming message(s) to history
if isinstance(message, MultiToolMessage):
state.messages.extend(message.tool_messages)
else:
state.messages.append(message)
# Build litellm-compatible message list
all_messages = state.system_messages + state.messages
lm_messages = [_to_litellm_message(m) for m in all_messages]
kwargs = dict(
model=self.model,
messages=lm_messages,
temperature=self.temperature,
)
if self.tools:
kwargs["tools"] = [t.openai_schema for t in self.tools]
if self.max_tokens is not None:
kwargs["max_tokens"] = self.max_tokens
if self.top_p is not None:
kwargs["top_p"] = self.top_p
# Enable thinking/reasoning mode. OpenRouter exposes this as
# `include_reasoning` for nemotron (per supported_parameters in the
# model metadata). Pass via extra_body to bypass litellm filtering.
if self.thinking:
kwargs["extra_body"] = {"include_reasoning": True}
# Only pass base_url when model doesn't already have a provider prefix
# (litellm uses either the prefix OR base_url, not both)
if self.base_url and not self.model.startswith("openrouter/"):
kwargs["base_url"] = self.base_url
if self.api_key:
kwargs["api_key"] = self.api_key
if self._extra_headers:
kwargs["extra_headers"] = self._extra_headers
response = litellm.completion(**kwargs)
assistant_msg = _litellm_response_to_assistant_message(response, parser=self._parser)
state.messages.append(assistant_msg)
return assistant_msg, state
# ---------------------------------------------------------------------------
# Conversion helpers
# ---------------------------------------------------------------------------
def _to_litellm_message(msg) -> dict:
"""Convert a tau2 message object to a litellm-compatible dict."""
if isinstance(msg, SystemMessage):
return {"role": "system", "content": msg.content or ""}
if isinstance(msg, UserMessage):
if msg.tool_calls:
# User tool calls (tau2 v2 feature — user has tools too)
return {
"role": "user",
"content": msg.content or "",
"tool_calls": [_tool_call_to_dict(tc) for tc in msg.tool_calls],
}
return {"role": "user", "content": msg.content or ""}
if isinstance(msg, AssistantMessage):
d: dict = {"role": "assistant", "content": msg.content or ""}
if msg.tool_calls:
d["tool_calls"] = [_tool_call_to_dict(tc) for tc in msg.tool_calls]
return d
if isinstance(msg, ToolMessage):
return {
"role": "tool",
"tool_call_id": msg.id,
"content": msg.content or "",
}
# Fallback
return {"role": getattr(msg, "role", "user"), "content": str(getattr(msg, "content", ""))}
def _tool_call_to_dict(tc: ToolCall) -> dict:
import json
return {
"id": tc.id or "call_0",
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments),
},
}
def _litellm_response_to_assistant_message(response, parser=None) -> AssistantMessage:
"""Convert a litellm ModelResponse to a tau2 AssistantMessage."""
import json
choice = response.choices[0]
msg = choice.message
content = msg.content or ""
tool_calls_raw = getattr(msg, "tool_calls", None)
tau2_tool_calls: Optional[list[ToolCall]] = None
if parser and content:
# Use the custom tool parser (e.g. qwen3_coder) to extract tool calls
# from the raw text response.
parsed_content, parsed_tool_calls = parser.parse(content)
if parsed_tool_calls:
content = parsed_content or ""
tau2_tool_calls = []
for tc in parsed_tool_calls:
try:
arguments = json.loads(tc.function.arguments or "{}")
except json.JSONDecodeError:
arguments = {}
tau2_tool_calls.append(
ToolCall(
id=tc.id or "call_0",
name=tc.function.name,
arguments=arguments,
requestor="assistant",
)
)
elif tool_calls_raw:
tau2_tool_calls = []
for tc in tool_calls_raw:
if hasattr(tc, "function"):
name = tc.function.name
try:
arguments = json.loads(tc.function.arguments or "{}")
except json.JSONDecodeError:
arguments = {}
tau2_tool_calls.append(
ToolCall(
id=tc.id or "call_0",
name=name,
arguments=arguments,
requestor="assistant",
)
)
cost = None
try:
cost = litellm.completion_cost(response)
except Exception:
pass
usage = None
if hasattr(response, "usage") and response.usage:
usage = dict(response.usage)
return AssistantMessage(
role="assistant",
content=content if not tau2_tool_calls else None,
tool_calls=tau2_tool_calls,
cost=cost,
usage=usage,
)
def create_hermes_agent(tools: list[Tool], domain_policy: str, **kwargs) -> HermesAgent:
"""
Factory function registered with the tau2 registry.
Expected kwargs:
model (str): litellm model string
base_url (str): API base URL (optional)
api_key (str): API key (optional)
temperature (float): sampling temperature (default 0.0)
top_p (float): nucleus sampling (optional)
max_tokens (int): max tokens (optional)
thinking (bool): enable reasoning/thinking mode (default False)
"""
return HermesAgent(
tools=tools,
domain_policy=domain_policy,
model=kwargs["model"],
base_url=kwargs.get("base_url"),
api_key=kwargs.get("api_key"),
temperature=kwargs.get("temperature", 0.0),
top_p=kwargs.get("top_p"),
max_tokens=kwargs.get("max_tokens"),
thinking=kwargs.get("thinking", False),
tool_parser=kwargs.get("tool_parser"),
)
@@ -0,0 +1,288 @@
"""
tau2-bench evaluation runner for Hermes Agent.
Runs the tau2-bench retail, airline, telecom, or banking_knowledge evaluation
using HermesAgent backed by litellm — the same inference path used across the
rest of the Hermes Agent codebase.
Usage:
# Against OpenRouter (auto-detects OPENROUTER_API_KEY)
python environments/benchmarks/taubench/run_eval.py \\
--model openrouter/anthropic/claude-sonnet-4-5 \\
--base-url openrouter \\
--env retail
# Against OpenAI directly
python environments/benchmarks/taubench/run_eval.py \\
--model gpt-4o \\
--env retail
# Local vLLM
python environments/benchmarks/taubench/run_eval.py \\
--model openai/NousResearch/Hermes-3-Llama-3.1-70B \\
--base-url http://localhost:8000/v1 \\
--env retail \\
--num-trials 3
# Specific tasks only
python environments/benchmarks/taubench/run_eval.py \\
--model openrouter/anthropic/claude-sonnet-4-5 \\
--base-url openrouter \\
--env retail \\
--task-ids task_1 task_2 task_5
Results are saved to results/tau2bench/ as JSON.
Dependencies (requires Python 3.12+):
pip install "tau2 @ git+https://github.com/sierra-research/tau2-bench.git"
# or: pip install -e ".[tau2bench]"
"""
import argparse
import logging
import os
import sys
from pathlib import Path
from typing import Optional
_repo_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_repo_root) not in sys.path:
sys.path.insert(0, str(_repo_root))
from tau2.data_model.simulation import Results, TextRunConfig
from tau2.evaluator.evaluator import EvaluationType
from tau2.registry import registry
from tau2.runner.batch import run_tasks
from tau2.runner.helpers import get_tasks
from environments.benchmarks.taubench.hermes_agent import create_hermes_agent
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
AGENT_NAME = "hermes_agent"
def _register_agent(
model: str,
base_url: Optional[str],
api_key: Optional[str],
temperature: float,
top_p: Optional[float],
max_tokens: Optional[int],
thinking: bool,
tool_parser: Optional[str],
) -> None:
"""Register the HermesAgent factory with the tau2 registry (idempotent)."""
if registry.get_agent_factory(AGENT_NAME) is not None:
return
def factory(tools, domain_policy, **kwargs):
return create_hermes_agent(
tools=tools,
domain_policy=domain_policy,
model=model,
base_url=base_url,
api_key=api_key,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
thinking=thinking,
tool_parser=tool_parser,
)
registry.register_agent_factory(factory=factory, name=AGENT_NAME)
logger.info("Registered agent factory: %s (model=%s, thinking=%s, tool_parser=%s)", AGENT_NAME, model, thinking, tool_parser)
def run_eval(
model: str,
base_url: Optional[str],
api_key: Optional[str],
user_model: str,
env_name: str,
task_split: Optional[str],
num_trials: int,
max_concurrency: int,
max_steps: int,
temperature: float,
top_p: Optional[float],
max_tokens: Optional[int],
thinking: bool,
tool_parser: Optional[str],
task_ids: Optional[list],
start_index: int,
end_index: int,
log_dir: str,
seed: int,
) -> Results:
# Resolve OpenRouter shorthand
if base_url and base_url.strip().lower() == "openrouter":
base_url = OPENROUTER_BASE_URL
is_openrouter = base_url and "openrouter" in base_url.lower()
# litellm requires the "openrouter/" prefix to route correctly
if is_openrouter and not model.startswith("openrouter/"):
model = f"openrouter/{model}"
if is_openrouter and not user_model.startswith("openrouter/"):
user_model = f"openrouter/{user_model}"
# Resolve API key
if is_openrouter:
api_key = api_key or os.environ.get("OPENROUTER_API_KEY") or os.environ.get("OPENAI_API_KEY")
# litellm reads OPENAI_API_KEY for base_url overrides; set it so the
# user simulator's generate() call also authenticates correctly.
if api_key and not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = api_key
else:
api_key = api_key or os.environ.get("OPENAI_API_KEY")
_register_agent(
model=model,
base_url=base_url,
api_key=api_key,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
thinking=thinking,
tool_parser=tool_parser,
)
# Load tasks — task_ids in tau2 are strings like "task_1"
tasks = get_tasks(
task_set_name=env_name,
task_split_name=task_split,
task_ids=[str(i) for i in task_ids] if task_ids else None,
)
if not task_ids and (end_index != -1 or start_index != 0):
end = end_index if end_index != -1 else len(tasks)
tasks = tasks[start_index:end]
logger.info(
"Running tau2-%s eval: %d tasks, %d trial(s), concurrency=%d",
env_name, len(tasks), num_trials, max_concurrency,
)
save_path = Path(log_dir) / f"tau2-{env_name}-{model.split('/')[-1]}.json"
save_path.parent.mkdir(parents=True, exist_ok=True)
# Pass api_key/base_url to user sim via llm_args so tau2's generate() authenticates.
# When using OpenRouter for the user sim, mirror the agent's key + endpoint.
user_llm_args: dict = {}
if is_openrouter and api_key:
user_llm_args["api_key"] = api_key
user_llm_args["base_url"] = base_url
config = TextRunConfig(
domain=env_name,
agent=AGENT_NAME,
user="user_simulator",
llm_agent=model,
llm_args_agent={},
llm_user=user_model,
llm_args_user=user_llm_args,
num_trials=num_trials,
max_steps=max_steps,
max_concurrency=max_concurrency,
seed=seed,
)
results = run_tasks(
config,
tasks,
save_path=save_path,
console_display=True,
# ALL: respects each task's reward_basis. NL assertions are skipped
# gracefully (scored as pass) rather than raising an error, so tasks
# are evaluated only on their actual basis components (DB, ACTION, etc.)
evaluation_type=EvaluationType.ALL,
)
logger.info("Results saved to %s", save_path)
return results
def main():
parser = argparse.ArgumentParser(
description="Run tau2-bench evaluation with Hermes Agent (requires Python 3.12+)",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--model", required=True,
help="litellm model string, e.g. 'openrouter/anthropic/claude-sonnet-4-5' or 'gpt-4o'",
)
parser.add_argument(
"--base-url", default=None,
help="API base URL. Use 'openrouter' as shorthand for https://openrouter.ai/api/v1.",
)
parser.add_argument("--api-key", default=None, help="API key (falls back to OPENROUTER_API_KEY / OPENAI_API_KEY)")
parser.add_argument("--temperature", type=float, default=1.0,
help="Sampling temperature. NVIDIA used 1.0 for nemotron-super.")
parser.add_argument("--top-p", type=float, default=0.95,
help="Nucleus sampling. NVIDIA used 0.95 for nemotron-super.")
parser.add_argument("--max-tokens", type=int, default=None)
parser.add_argument("--thinking", action="store_true", default=False,
help="Enable reasoning/thinking mode (use_reasoning=true). "
"Required to match NVIDIA's reported nemotron-super scores.")
parser.add_argument("--tool-parser", default=None,
help="Tool call parser to use (e.g. 'qwen3_coder'). When set, tools are "
"embedded in the system prompt as <tools> XML and responses are parsed "
"from raw text instead of using OpenAI function calling format.")
parser.add_argument(
"--user-model", default="qwen/qwen3-235b-a22b-2507:nitro",
help="litellm model string for the tau2 user simulator. "
"Defaults to qwen/qwen3-235b-a22b-2507:nitro (instruct, non-thinking) to match NVIDIA's eval setup. "
"When using --base-url openrouter the openrouter/ prefix is added automatically.",
)
parser.add_argument(
"--env", default="retail",
choices=["retail", "airline", "telecom", "banking_knowledge", "mock"],
)
parser.add_argument(
"--task-split", default=None,
help="Task split name (e.g. 'base'). Defaults to the domain default.",
)
parser.add_argument("--num-trials", type=int, default=1)
parser.add_argument("--max-concurrency", type=int, default=8)
parser.add_argument("--max-steps", type=int, default=50)
parser.add_argument(
"--task-ids", nargs="*", default=None,
help="Specific task IDs to run (tau2 task IDs are strings like 'task_1')",
)
parser.add_argument("--start-index", type=int, default=0)
parser.add_argument("--end-index", type=int, default=-1)
parser.add_argument("--seed", type=int, default=10)
parser.add_argument("--log-dir", default="results/tau2bench")
args = parser.parse_args()
run_eval(
model=args.model,
base_url=args.base_url,
api_key=args.api_key,
user_model=args.user_model,
env_name=args.env,
task_split=args.task_split,
num_trials=args.num_trials,
max_concurrency=args.max_concurrency,
max_steps=args.max_steps,
temperature=args.temperature,
top_p=args.top_p,
max_tokens=args.max_tokens,
thinking=args.thinking,
tool_parser=args.tool_parser,
task_ids=args.task_ids,
start_index=args.start_index,
end_index=args.end_index,
log_dir=args.log_dir,
seed=args.seed,
)
if __name__ == "__main__":
main()
+4
View File
@@ -742,6 +742,10 @@ class TelegramAdapter(BasePlatformAdapter):
if not self._bot:
return SendResult(success=False, error="Not connected")
# Skip whitespace-only text to prevent Telegram 400 empty-text errors.
if not content or not content.strip():
return SendResult(success=True, message_id=None)
try:
# Format and split message if needed
formatted = self.format_message(content)
+58 -3
View File
@@ -24,6 +24,7 @@ import signal
import tempfile
import threading
import time
import uuid
from logging.handlers import RotatingFileHandler
from pathlib import Path
from datetime import datetime
@@ -788,6 +789,7 @@ class GatewayRunner:
"api_mode": runtime_kwargs.get("api_mode"),
"command": runtime_kwargs.get("command"),
"args": list(runtime_kwargs.get("args") or []),
"credential_pool": runtime_kwargs.get("credential_pool"),
}
return resolve_turn_route(user_message, getattr(self, "_smart_model_routing", {}), primary)
@@ -4719,9 +4721,13 @@ class GatewayRunner:
_APPROVAL_TIMEOUT_SECONDS = 300 # 5 minutes
async def _handle_approve_command(self, event: MessageEvent) -> str:
async def _handle_approve_command(self, event: MessageEvent) -> Optional[str]:
"""Handle /approve command — execute a pending dangerous command.
After execution, re-invokes the agent with the command result so it
can continue its multi-step task (fixes the "dead agent" bug where
the agent loop exited on approval_required and never resumed).
Usage:
/approve approve and execute the pending command
/approve session approve and remember for this session
@@ -4770,8 +4776,57 @@ class GatewayRunner:
logger.info("User approved dangerous command via /approve: %s...%s", cmd[:60], scope_msg)
from tools.terminal_tool import terminal_tool
result = terminal_tool(command=cmd, force=True)
return f"✅ Command approved and executed{scope_msg}.\n\n```\n{result[:3500]}\n```"
result = await asyncio.to_thread(terminal_tool, command=cmd, force=True)
# Send immediate feedback so the user sees the command output right away
immediate_msg = f"✅ Command approved and executed{scope_msg}.\n\n```\n{result[:3500]}\n```"
adapter = self.adapters.get(source.platform)
if adapter:
try:
await adapter.send(source.chat_id, immediate_msg)
except Exception as e:
logger.warning("Failed to send approval feedback: %s", e)
# Re-invoke the agent with the command result so it can continue its task.
# The agent's conversation history (persisted in SQLite) already contains
# the tool call that returned approval_required — the continuation message
# provides the actual execution output so the agent can pick up where it
# left off.
continuation_text = (
f"[System: The user approved the previously blocked command and it has been executed.\n"
f"Command: {cmd}\n"
f"<command_output>\n{result[:3500]}\n</command_output>\n\n"
f"Continue with the task you were working on.]"
)
synthetic_event = MessageEvent(
text=continuation_text,
source=source,
message_id=f"approve-continuation-{uuid.uuid4().hex}",
)
async def _continue_agent():
try:
response = await self._handle_message(synthetic_event)
if response and adapter:
await adapter.send(source.chat_id, response)
except Exception as e:
logger.error("Failed to continue agent after /approve: %s", e)
if adapter:
try:
await adapter.send(
source.chat_id,
f"⚠️ Failed to resume agent after approval: {e}"
)
except Exception:
pass
_task = asyncio.create_task(_continue_agent())
self._background_tasks.add(_task)
_task.add_done_callback(self._background_tasks.discard)
# Return None — we already sent the immediate feedback and the agent
# continuation is running in the background.
return None
async def _handle_deny_command(self, event: MessageEvent) -> str:
"""Handle /deny command — reject a pending dangerous command."""
+8
View File
@@ -247,6 +247,13 @@ DEFAULT_CONFIG = {
"command_timeout": 30, # Timeout for browser commands in seconds (screenshot, navigate, etc.)
"record_sessions": False, # Auto-record browser sessions as WebM videos
"allow_private_urls": False, # Allow navigating to private/internal IPs (localhost, 192.168.x.x, etc.)
"camofox": {
# When true, Hermes sends a stable profile-scoped userId to Camofox
# so the server can map it to a persistent browser profile directory.
# Requires Camofox server to be configured with CAMOFOX_PROFILE_DIR.
# When false (default), each session gets a random userId (ephemeral).
"managed_persistence": False,
},
},
# Filesystem checkpoints — automatic snapshots before destructive file ops.
@@ -352,6 +359,7 @@ DEFAULT_CONFIG = {
"bell_on_complete": False,
"show_reasoning": False,
"streaming": False,
"inline_diffs": True, # Show inline diff previews for write actions (write_file, patch, skill_manage)
"show_cost": False, # Show $ cost in the status bar (off by default)
"skin": "default",
"tool_progress_command": False, # Enable /verbose command in messaging gateway
+28 -2
View File
@@ -463,6 +463,32 @@ def _build_user_local_paths(home: Path, path_entries: list[str]) -> list[str]:
return [p for p in candidates if p not in path_entries and Path(p).exists()]
def _hermes_home_for_target_user(target_home_dir: str) -> str:
"""Remap the current HERMES_HOME to the equivalent under a target user's home.
When installing a system service via sudo, get_hermes_home() resolves to
root's home. This translates it to the target user's equivalent path:
/root/.hermes → /home/alice/.hermes
/root/.hermes/profiles/coder → /home/alice/.hermes/profiles/coder
/opt/custom-hermes → /opt/custom-hermes (kept as-is)
"""
current_hermes = get_hermes_home().resolve()
current_default = (Path.home() / ".hermes").resolve()
target_default = Path(target_home_dir) / ".hermes"
# Default ~/.hermes → remap to target user's default
if current_hermes == current_default:
return str(target_default)
# Profile or subdir of ~/.hermes → preserve the relative structure
try:
relative = current_hermes.relative_to(current_default)
return str(target_default / relative)
except ValueError:
# Completely custom path (not under ~/.hermes) — keep as-is
return str(current_hermes)
def generate_systemd_unit(system: bool = False, run_as_user: str | None = None) -> str:
python_path = get_python_path()
working_dir = str(PROJECT_ROOT)
@@ -478,12 +504,11 @@ def generate_systemd_unit(system: bool = False, run_as_user: str | None = None)
if resolved_node_dir not in path_entries:
path_entries.append(resolved_node_dir)
hermes_home = str(get_hermes_home().resolve())
common_bin_paths = ["/usr/local/sbin", "/usr/local/bin", "/usr/sbin", "/usr/bin", "/sbin", "/bin"]
if system:
username, group_name, home_dir = _system_service_identity(run_as_user)
hermes_home = _hermes_home_for_target_user(home_dir)
path_entries.extend(_build_user_local_paths(Path(home_dir), path_entries))
path_entries.extend(common_bin_paths)
sane_path = ":".join(path_entries)
@@ -518,6 +543,7 @@ StandardError=journal
WantedBy=multi-user.target
"""
hermes_home = str(get_hermes_home().resolve())
path_entries.extend(_build_user_local_paths(Path.home(), path_entries))
path_entries.extend(common_bin_paths)
sane_path = ":".join(path_entries)
+76
View File
@@ -58,6 +58,32 @@ _CLONE_ALL_STRIP = [
"processes.json",
]
# Directories/files to exclude when exporting the default (~/.hermes) profile.
# The default profile contains infrastructure (repo checkout, worktrees, DBs,
# caches, binaries) that named profiles don't have. We exclude those so the
# export is a portable, reasonable-size archive of actual profile data.
_DEFAULT_EXPORT_EXCLUDE_ROOT = frozenset({
# Infrastructure
"hermes-agent", # repo checkout (multi-GB)
".worktrees", # git worktrees
"profiles", # other profiles — never recursive-export
"bin", # installed binaries (tirith, etc.)
"node_modules", # npm packages
# Databases & runtime state
"state.db", "state.db-shm", "state.db-wal",
"hermes_state.db",
"response_store.db", "response_store.db-shm", "response_store.db-wal",
"gateway.pid", "gateway_state.json", "processes.json",
"auth.lock", "active_profile", ".update_check",
"errors.log",
".hermes_history",
# Caches (regenerated on use)
"image_cache", "audio_cache", "document_cache",
"browser_screenshots", "checkpoints",
"sandboxes",
"logs", # gateway logs
})
# Names that cannot be used as profile aliases
_RESERVED_NAMES = frozenset({
"hermes", "default", "test", "tmp", "root", "sudo",
@@ -685,11 +711,37 @@ def get_active_profile_name() -> str:
# Export / Import
# ---------------------------------------------------------------------------
def _default_export_ignore(root_dir: Path):
"""Return an *ignore* callable for :func:`shutil.copytree`.
At the root level it excludes everything in ``_DEFAULT_EXPORT_EXCLUDE_ROOT``.
At all levels it excludes ``__pycache__``, sockets, and temp files.
"""
def _ignore(directory: str, contents: list) -> set:
ignored: set = set()
for entry in contents:
# Universal exclusions (any depth)
if entry == "__pycache__" or entry.endswith((".sock", ".tmp")):
ignored.add(entry)
# npm lockfiles can appear at root
elif entry in ("package.json", "package-lock.json"):
ignored.add(entry)
# Root-level exclusions
if Path(directory) == root_dir:
ignored.update(c for c in contents if c in _DEFAULT_EXPORT_EXCLUDE_ROOT)
return ignored
return _ignore
def export_profile(name: str, output_path: str) -> Path:
"""Export a profile to a tar.gz archive.
Returns the output file path.
"""
import tempfile
validate_profile_name(name)
profile_dir = get_profile_dir(name)
if not profile_dir.is_dir():
@@ -698,6 +750,21 @@ def export_profile(name: str, output_path: str) -> Path:
output = Path(output_path)
# shutil.make_archive wants the base name without extension
base = str(output).removesuffix(".tar.gz").removesuffix(".tgz")
if name == "default":
# The default profile IS ~/.hermes itself — its parent is ~/ and its
# directory name is ".hermes", not "default". We stage a clean copy
# under a temp dir so the archive contains ``default/...``.
with tempfile.TemporaryDirectory() as tmpdir:
staged = Path(tmpdir) / "default"
shutil.copytree(
profile_dir,
staged,
ignore=_default_export_ignore(profile_dir),
)
result = shutil.make_archive(base, "gztar", tmpdir, "default")
return Path(result)
result = shutil.make_archive(base, "gztar", str(profile_dir.parent), name)
return Path(result)
@@ -788,6 +855,15 @@ def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
"Specify it explicitly: hermes profile import <archive> --name <name>"
)
# Archives exported from the default profile have "default/" as top-level
# dir. Importing as "default" would target ~/.hermes itself — disallow
# that and guide the user toward a named profile.
if inferred_name == "default":
raise ValueError(
"Cannot import as 'default' — that is the built-in root profile (~/.hermes). "
"Specify a different name: hermes profile import <archive> --name <name>"
)
validate_profile_name(inferred_name)
profile_dir = get_profile_dir(inferred_name)
if profile_dir.exists():
+2
View File
@@ -72,6 +72,8 @@ rl = [
"wandb>=0.15.0,<1",
]
yc-bench = ["yc-bench @ git+https://github.com/collinear-ai/yc-bench.git ; python_version >= '3.12'"]
taubench = ["tau-bench @ git+https://github.com/sierra-research/tau-bench.git"]
tau2bench = ["tau2 @ git+https://github.com/sierra-research/tau2-bench.git"]
all = [
"hermes-agent[modal]",
"hermes-agent[daytona]",
+68 -9
View File
@@ -320,8 +320,12 @@ def _extract_parallel_scope_path(tool_name: str, function_args: dict) -> Path |
if not isinstance(raw_path, str) or not raw_path.strip():
return None
expanded = Path(raw_path).expanduser()
if expanded.is_absolute():
return Path(os.path.abspath(str(expanded)))
# Avoid resolve(); the file may not exist yet.
return Path(raw_path).expanduser()
return Path(os.path.abspath(str(Path.cwd() / expanded)))
def _paths_overlap(left: Path, right: Path) -> bool:
@@ -486,6 +490,8 @@ class AIAgent:
provider_data_collection: str = None,
session_id: str = None,
tool_progress_callback: callable = None,
tool_start_callback: callable = None,
tool_complete_callback: callable = None,
thinking_callback: callable = None,
reasoning_callback: callable = None,
clarify_callback: callable = None,
@@ -620,6 +626,8 @@ class AIAgent:
).start()
self.tool_progress_callback = tool_progress_callback
self.tool_start_callback = tool_start_callback
self.tool_complete_callback = tool_complete_callback
self.thinking_callback = thinking_callback
self.reasoning_callback = reasoning_callback
self._reasoning_deltas_fired = False # Set by _fire_reasoning_delta, reset per API call
@@ -3486,14 +3494,33 @@ class AIAgent:
@staticmethod
def _is_openai_client_closed(client: Any) -> bool:
"""Check if an OpenAI client is closed.
Handles both property and method forms of is_closed:
- httpx.Client.is_closed is a bool property
- openai.OpenAI.is_closed is a method returning bool
Prior bug: getattr(client, "is_closed", False) returned the bound method,
which is always truthy, causing unnecessary client recreation on every call.
"""
from unittest.mock import Mock
if isinstance(client, Mock):
return False
if bool(getattr(client, "is_closed", False)):
return True
is_closed_attr = getattr(client, "is_closed", None)
if is_closed_attr is not None:
# Handle method (openai SDK) vs property (httpx)
if callable(is_closed_attr):
if is_closed_attr():
return True
elif bool(is_closed_attr):
return True
http_client = getattr(client, "_client", None)
return bool(getattr(http_client, "is_closed", False))
if http_client is not None:
return bool(getattr(http_client, "is_closed", False))
return False
def _create_openai_client(self, client_kwargs: dict, *, reason: str, shared: bool) -> Any:
if self.provider == "copilot-acp" or str(client_kwargs.get("base_url", "")).startswith("acp://copilot"):
@@ -5534,7 +5561,7 @@ class AIAgent:
args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
print(f" 📞 Tool {i}: {name}({list(args.keys())}) - {args_preview}")
for _, name, args in parsed_calls:
for tc, name, args in parsed_calls:
if self.tool_progress_callback:
try:
preview = _build_tool_preview(name, args)
@@ -5542,6 +5569,13 @@ class AIAgent:
except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}")
for tc, name, args in parsed_calls:
if self.tool_start_callback:
try:
self.tool_start_callback(tc.id, name, args)
except Exception as cb_err:
logging.debug(f"Tool start callback error: {cb_err}")
# ── Concurrent execution ─────────────────────────────────────────
# Each slot holds (function_name, function_args, function_result, duration, error_flag)
results = [None] * num_tools
@@ -5612,6 +5646,12 @@ class AIAgent:
response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
print(f" ✅ Tool {i+1} completed in {tool_duration:.2f}s - {response_preview}")
if self.tool_complete_callback:
try:
self.tool_complete_callback(tc.id, name, args, function_result)
except Exception as cb_err:
logging.debug(f"Tool complete callback error: {cb_err}")
# Truncate oversized results
MAX_TOOL_RESULT_CHARS = 100_000
if len(function_result) > MAX_TOOL_RESULT_CHARS:
@@ -5700,6 +5740,12 @@ class AIAgent:
except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}")
if self.tool_start_callback:
try:
self.tool_start_callback(tool_call.id, function_name, function_args)
except Exception as cb_err:
logging.debug(f"Tool start callback error: {cb_err}")
# Checkpoint: snapshot working dir before file-mutating tools
if function_name in ("write_file", "patch") and self._checkpoint_mgr.enabled:
try:
@@ -5864,6 +5910,12 @@ class AIAgent:
logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
logging.debug(f"Tool result ({len(function_result)} chars): {function_result}")
if self.tool_complete_callback:
try:
self.tool_complete_callback(tool_call.id, function_name, function_args, function_result)
except Exception as cb_err:
logging.debug(f"Tool complete callback error: {cb_err}")
# Guard against tools returning absurdly large content that would
# blow up the context window. 100K chars ≈ 25K tokens — generous
# enough for any reasonable tool output but prevents catastrophic
@@ -7178,10 +7230,17 @@ class AIAgent:
or "quota" in error_msg
)
if is_rate_limited and self._fallback_index < len(self._fallback_chain):
self._emit_status("⚠️ Rate limited — switching to fallback provider...")
if self._try_activate_fallback():
retry_count = 0
continue
# Don't eagerly fallback if credential pool rotation may
# still recover. The pool's retry-then-rotate cycle needs
# at least one more attempt to fire — jumping to a fallback
# provider here short-circuits it.
pool = self._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
self._emit_status("⚠️ Rate limited — switching to fallback provider...")
if self._try_activate_fallback():
retry_count = 0
continue
is_payload_too_large = (
status_code == 413
+71 -5
View File
@@ -4,6 +4,7 @@ Verifies that dangerous command approvals require explicit /approve or /deny
slash commands, not bare "yes"/"no" text matching.
"""
import asyncio
import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
@@ -49,6 +50,7 @@ def _make_runner():
runner._running_agents = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._background_tasks = set()
runner._session_db = None
runner._reasoning_config = None
runner._provider_routing = {}
@@ -78,20 +80,32 @@ class TestApproveCommand:
@pytest.mark.asyncio
async def test_approve_executes_pending_command(self):
"""Basic /approve executes the pending command."""
"""Basic /approve executes the pending command and sends feedback."""
runner = _make_runner()
source = _make_source()
session_key = runner._session_key_for_source(source)
runner._pending_approvals[session_key] = _make_pending_approval()
event = _make_event("/approve")
with patch("tools.terminal_tool.terminal_tool", return_value="done") as mock_term:
with (
patch("tools.terminal_tool.terminal_tool", return_value="done") as mock_term,
patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value="agent continued"),
):
result = await runner._handle_approve_command(event)
# Yield to let the background continuation task run.
# This works because mocks return immediately (no real await points).
await asyncio.sleep(0)
assert "✅ Command approved and executed" in result
# Returns None because feedback is sent directly via adapter
assert result is None
mock_term.assert_called_once_with(command="sudo rm -rf /tmp/test", force=True)
assert session_key not in runner._pending_approvals
# Immediate feedback sent via adapter
adapter = runner.adapters[Platform.TELEGRAM]
sent_text = adapter.send.call_args_list[0][0][1]
assert "Command approved and executed" in sent_text
@pytest.mark.asyncio
async def test_approve_session_remembers_pattern(self):
"""/approve session approves the pattern for the session."""
@@ -104,12 +118,21 @@ class TestApproveCommand:
with (
patch("tools.terminal_tool.terminal_tool", return_value="done"),
patch("tools.approval.approve_session") as mock_session,
patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value=None),
):
result = await runner._handle_approve_command(event)
# Yield to let the background continuation task run.
# This works because mocks return immediately (no real await points).
await asyncio.sleep(0)
assert "pattern approved for this session" in result
assert result is None
mock_session.assert_called_once_with(session_key, "sudo")
# Verify scope message in adapter feedback
adapter = runner.adapters[Platform.TELEGRAM]
sent_text = adapter.send.call_args_list[0][0][1]
assert "pattern approved for this session" in sent_text
@pytest.mark.asyncio
async def test_approve_always_approves_permanently(self):
"""/approve always approves the pattern permanently."""
@@ -122,12 +145,21 @@ class TestApproveCommand:
with (
patch("tools.terminal_tool.terminal_tool", return_value="done"),
patch("tools.approval.approve_permanent") as mock_perm,
patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value=None),
):
result = await runner._handle_approve_command(event)
# Yield to let the background continuation task run.
# This works because mocks return immediately (no real await points).
await asyncio.sleep(0)
assert "pattern approved permanently" in result
assert result is None
mock_perm.assert_called_once_with("sudo")
# Verify scope message in adapter feedback
adapter = runner.adapters[Platform.TELEGRAM]
sent_text = adapter.send.call_args_list[0][0][1]
assert "pattern approved permanently" in sent_text
@pytest.mark.asyncio
async def test_approve_no_pending(self):
"""/approve with no pending approval returns helpful message."""
@@ -152,6 +184,40 @@ class TestApproveCommand:
assert "expired" in result
assert session_key not in runner._pending_approvals
@pytest.mark.asyncio
async def test_approve_reinvokes_agent_with_result(self):
"""After executing, /approve re-invokes the agent with command output."""
runner = _make_runner()
source = _make_source()
session_key = runner._session_key_for_source(source)
runner._pending_approvals[session_key] = _make_pending_approval()
event = _make_event("/approve")
mock_handle = AsyncMock(return_value="I continued the task.")
with (
patch("tools.terminal_tool.terminal_tool", return_value="file deleted"),
patch.object(runner, "_handle_message", mock_handle),
):
await runner._handle_approve_command(event)
# Yield to let the background continuation task run.
# This works because mocks return immediately (no real await points).
await asyncio.sleep(0)
# Agent was re-invoked via _handle_message with a synthetic event
mock_handle.assert_called_once()
synthetic_event = mock_handle.call_args[0][0]
assert "approved" in synthetic_event.text.lower()
assert "file deleted" in synthetic_event.text
assert "sudo rm -rf /tmp/test" in synthetic_event.text
# The continuation response was sent to the user
adapter = runner.adapters[Platform.TELEGRAM]
# First call: immediate feedback, second call: agent continuation
assert adapter.send.call_count == 2
continuation_response = adapter.send.call_args_list[1][0][1]
assert continuation_response == "I continued the task."
# ------------------------------------------------------------------
# /deny command
+96
View File
@@ -339,6 +339,102 @@ class TestDetectVenvDir:
assert result is None
class TestSystemUnitHermesHome:
"""HERMES_HOME in system units must reference the target user, not root."""
def test_system_unit_uses_target_user_home_not_calling_user(self, monkeypatch):
# Simulate sudo: Path.home() returns /root, target user is alice
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.delenv("HERMES_HOME", raising=False)
monkeypatch.setattr(
gateway_cli, "_system_service_identity",
lambda run_as_user=None: ("alice", "alice", "/home/alice"),
)
monkeypatch.setattr(
gateway_cli, "_build_user_local_paths",
lambda home, existing: [],
)
unit = gateway_cli.generate_systemd_unit(system=True, run_as_user="alice")
assert 'HERMES_HOME=/home/alice/.hermes' in unit
assert '/root/.hermes' not in unit
def test_system_unit_remaps_profile_to_target_user(self, monkeypatch):
# Simulate sudo with a profile: HERMES_HOME was resolved under root
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.setenv("HERMES_HOME", "/root/.hermes/profiles/coder")
monkeypatch.setattr(
gateway_cli, "_system_service_identity",
lambda run_as_user=None: ("alice", "alice", "/home/alice"),
)
monkeypatch.setattr(
gateway_cli, "_build_user_local_paths",
lambda home, existing: [],
)
unit = gateway_cli.generate_systemd_unit(system=True, run_as_user="alice")
assert 'HERMES_HOME=/home/alice/.hermes/profiles/coder' in unit
assert '/root/' not in unit
def test_system_unit_preserves_custom_hermes_home(self, monkeypatch):
# Custom HERMES_HOME not under any user's home — keep as-is
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.setenv("HERMES_HOME", "/opt/hermes-shared")
monkeypatch.setattr(
gateway_cli, "_system_service_identity",
lambda run_as_user=None: ("alice", "alice", "/home/alice"),
)
monkeypatch.setattr(
gateway_cli, "_build_user_local_paths",
lambda home, existing: [],
)
unit = gateway_cli.generate_systemd_unit(system=True, run_as_user="alice")
assert 'HERMES_HOME=/opt/hermes-shared' in unit
def test_user_unit_unaffected_by_change(self):
# User-scope units should still use the calling user's HERMES_HOME
unit = gateway_cli.generate_systemd_unit(system=False)
hermes_home = str(gateway_cli.get_hermes_home().resolve())
assert f'HERMES_HOME={hermes_home}' in unit
class TestHermesHomeForTargetUser:
"""Unit tests for _hermes_home_for_target_user()."""
def test_remaps_default_home(self, monkeypatch):
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.delenv("HERMES_HOME", raising=False)
result = gateway_cli._hermes_home_for_target_user("/home/alice")
assert result == "/home/alice/.hermes"
def test_remaps_profile_path(self, monkeypatch):
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.setenv("HERMES_HOME", "/root/.hermes/profiles/coder")
result = gateway_cli._hermes_home_for_target_user("/home/alice")
assert result == "/home/alice/.hermes/profiles/coder"
def test_keeps_custom_path(self, monkeypatch):
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.setenv("HERMES_HOME", "/opt/hermes")
result = gateway_cli._hermes_home_for_target_user("/home/alice")
assert result == "/opt/hermes"
def test_noop_when_same_user(self, monkeypatch):
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/home/alice")))
monkeypatch.delenv("HERMES_HOME", raising=False)
result = gateway_cli._hermes_home_for_target_user("/home/alice")
assert result == "/home/alice/.hermes"
class TestGeneratedUnitUsesDetectedVenv:
def test_systemd_unit_uses_dot_venv_when_detected(self, tmp_path, monkeypatch):
dot_venv = tmp_path / ".venv"
+143
View File
@@ -488,6 +488,149 @@ class TestExportImport:
with pytest.raises(FileNotFoundError):
export_profile("nonexistent", str(tmp_path / "out.tar.gz"))
# ---------------------------------------------------------------
# Default profile export / import
# ---------------------------------------------------------------
def test_export_default_creates_valid_archive(self, profile_env, tmp_path):
"""Exporting the default profile produces a valid tar.gz."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("model: test")
output = tmp_path / "export" / "default.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
result = export_profile("default", str(output))
assert Path(result).exists()
assert tarfile.is_tarfile(str(result))
def test_export_default_includes_profile_data(self, profile_env, tmp_path):
"""Profile data files end up in the archive."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("model: test")
(default_dir / ".env").write_text("KEY=val")
(default_dir / "SOUL.md").write_text("Be nice.")
mem_dir = default_dir / "memories"
mem_dir.mkdir(exist_ok=True)
(mem_dir / "MEMORY.md").write_text("remember this")
output = tmp_path / "export" / "default.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(output))
with tarfile.open(str(output), "r:gz") as tf:
names = tf.getnames()
assert "default/config.yaml" in names
assert "default/.env" in names
assert "default/SOUL.md" in names
assert "default/memories/MEMORY.md" in names
def test_export_default_excludes_infrastructure(self, profile_env, tmp_path):
"""Repo checkout, worktrees, profiles, databases are excluded."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("ok")
# Create dirs/files that should be excluded
for d in ("hermes-agent", ".worktrees", "profiles", "bin",
"image_cache", "logs", "sandboxes", "checkpoints"):
sub = default_dir / d
sub.mkdir(exist_ok=True)
(sub / "marker.txt").write_text("excluded")
for f in ("state.db", "gateway.pid", "gateway_state.json",
"processes.json", "errors.log", ".hermes_history",
"active_profile", ".update_check", "auth.lock"):
(default_dir / f).write_text("excluded")
output = tmp_path / "export" / "default.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(output))
with tarfile.open(str(output), "r:gz") as tf:
names = tf.getnames()
# Config is present
assert "default/config.yaml" in names
# Infrastructure excluded
excluded_prefixes = [
"default/hermes-agent", "default/.worktrees", "default/profiles",
"default/bin", "default/image_cache", "default/logs",
"default/sandboxes", "default/checkpoints",
]
for prefix in excluded_prefixes:
assert not any(n.startswith(prefix) for n in names), \
f"Expected {prefix} to be excluded but found it in archive"
excluded_files = [
"default/state.db", "default/gateway.pid",
"default/gateway_state.json", "default/processes.json",
"default/errors.log", "default/.hermes_history",
"default/active_profile", "default/.update_check",
"default/auth.lock",
]
for f in excluded_files:
assert f not in names, f"Expected {f} to be excluded"
def test_export_default_excludes_pycache_at_any_depth(self, profile_env, tmp_path):
"""__pycache__ dirs are excluded even inside nested directories."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("ok")
nested = default_dir / "skills" / "my-skill" / "__pycache__"
nested.mkdir(parents=True)
(nested / "cached.pyc").write_text("bytecode")
output = tmp_path / "export" / "default.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(output))
with tarfile.open(str(output), "r:gz") as tf:
names = tf.getnames()
assert not any("__pycache__" in n for n in names)
def test_import_default_without_name_raises(self, profile_env, tmp_path):
"""Importing a default export without --name gives clear guidance."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("ok")
archive = tmp_path / "export" / "default.tar.gz"
archive.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(archive))
with pytest.raises(ValueError, match="Cannot import as 'default'"):
import_profile(str(archive))
def test_import_default_with_explicit_default_name_raises(self, profile_env, tmp_path):
"""Explicitly importing as 'default' is also rejected."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("ok")
archive = tmp_path / "export" / "default.tar.gz"
archive.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(archive))
with pytest.raises(ValueError, match="Cannot import as 'default'"):
import_profile(str(archive), name="default")
def test_import_default_export_with_new_name_roundtrip(self, profile_env, tmp_path):
"""Export default → import under a different name → data preserved."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("model: opus")
mem_dir = default_dir / "memories"
mem_dir.mkdir(exist_ok=True)
(mem_dir / "MEMORY.md").write_text("important fact")
archive = tmp_path / "export" / "default.tar.gz"
archive.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(archive))
imported = import_profile(str(archive), name="backup")
assert imported.is_dir()
assert (imported / "config.yaml").read_text() == "model: opus"
assert (imported / "memories" / "MEMORY.md").read_text() == "important fact"
# ===================================================================
# TestProfileIsolation
+14
View File
@@ -32,6 +32,8 @@ def cli_obj(_isolate):
obj.session_id = None
obj.api_key = "test"
obj.base_url = ""
obj.provider = "test"
obj._provider_source = None
# Mock agent with context compressor
obj.agent = SimpleNamespace(
context_compressor=SimpleNamespace(context_length=None)
@@ -145,3 +147,15 @@ class TestLowContextWarning:
calls = [str(c) for c in cli_obj.console.print.call_args_list]
warning_calls = [c for c in calls if "too low" in c]
assert len(warning_calls) == 0
def test_compact_banner_does_not_crash_on_narrow_terminal(self, cli_obj):
"""Compact mode should still have ctx_len defined for warning logic."""
cli_obj.agent.context_compressor.context_length = 4096
with patch("shutil.get_terminal_size", return_value=os.terminal_size((70, 40))), \
patch("cli._build_compact_banner", return_value="compact banner"):
cli_obj.show_banner()
calls = [str(c) for c in cli_obj.console.print.call_args_list]
warning_calls = [c for c in calls if "too low" in c]
assert len(warning_calls) == 1
+350
View File
@@ -0,0 +1,350 @@
"""Tests for credential pool preservation through smart routing and 429 recovery.
Covers:
1. credential_pool flows through resolve_turn_route (no-route and fallback paths)
2. CLI _resolve_turn_agent_config passes credential_pool to primary dict
3. Gateway _resolve_turn_agent_config passes credential_pool to primary dict
4. Eager fallback deferred when credential pool has credentials
5. Eager fallback fires when no credential pool exists
6. Full 429 rotation cycle: retry-same rotate exhaust fallback
"""
import os
import time
from types import SimpleNamespace
from unittest.mock import MagicMock, patch, PropertyMock
import pytest
# ---------------------------------------------------------------------------
# 1. smart_model_routing: credential_pool preserved in no-route path
# ---------------------------------------------------------------------------
class TestSmartRoutingPoolPreservation:
def test_no_route_preserves_credential_pool(self):
from agent.smart_model_routing import resolve_turn_route
fake_pool = MagicMock(name="CredentialPool")
primary = {
"model": "gpt-5.4",
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": fake_pool,
}
# routing disabled
result = resolve_turn_route("hello", None, primary)
assert result["runtime"]["credential_pool"] is fake_pool
def test_no_route_none_pool(self):
from agent.smart_model_routing import resolve_turn_route
primary = {
"model": "gpt-5.4",
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
}
result = resolve_turn_route("hello", None, primary)
assert result["runtime"]["credential_pool"] is None
def test_routing_disabled_preserves_pool(self):
from agent.smart_model_routing import resolve_turn_route
fake_pool = MagicMock(name="CredentialPool")
primary = {
"model": "gpt-5.4",
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": fake_pool,
}
# routing explicitly disabled
result = resolve_turn_route("hello", {"enabled": False}, primary)
assert result["runtime"]["credential_pool"] is fake_pool
def test_route_fallback_on_resolve_error_preserves_pool(self, monkeypatch):
"""When smart routing picks a cheap model but resolve_runtime_provider
fails, the fallback to primary must still include credential_pool."""
from agent.smart_model_routing import resolve_turn_route
fake_pool = MagicMock(name="CredentialPool")
primary = {
"model": "gpt-5.4",
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": fake_pool,
}
routing_config = {
"enabled": True,
"cheap_model": "openai/gpt-4.1-mini",
"cheap_provider": "openrouter",
"max_tokens": 200,
"patterns": ["^(hi|hello|hey)"],
}
# Force resolve_runtime_provider to fail so it falls back to primary
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
MagicMock(side_effect=RuntimeError("no credentials")),
)
result = resolve_turn_route("hi", routing_config, primary)
assert result["runtime"]["credential_pool"] is fake_pool
# ---------------------------------------------------------------------------
# 2 & 3. CLI and Gateway _resolve_turn_agent_config include credential_pool
# ---------------------------------------------------------------------------
class TestCliTurnRoutePool:
def test_resolve_turn_includes_pool(self, monkeypatch, tmp_path):
"""CLI's _resolve_turn_agent_config must pass credential_pool to primary."""
from agent.smart_model_routing import resolve_turn_route
captured = {}
def spy_resolve(user_message, routing_config, primary):
captured["primary"] = primary
return resolve_turn_route(user_message, routing_config, primary)
monkeypatch.setattr(
"agent.smart_model_routing.resolve_turn_route", spy_resolve
)
# Build a minimal HermesCLI-like object with the method
shell = SimpleNamespace(
model="gpt-5.4",
api_key="sk-test",
base_url=None,
provider="openai-codex",
api_mode="codex_responses",
acp_command=None,
acp_args=[],
_credential_pool=MagicMock(name="FakePool"),
_smart_model_routing={"enabled": False},
)
# Import and bind the real method
from cli import HermesCLI
bound = HermesCLI._resolve_turn_agent_config.__get__(shell)
bound("test message")
assert "credential_pool" in captured["primary"]
assert captured["primary"]["credential_pool"] is shell._credential_pool
class TestGatewayTurnRoutePool:
def test_resolve_turn_includes_pool(self, monkeypatch):
"""Gateway's _resolve_turn_agent_config must pass credential_pool."""
from agent.smart_model_routing import resolve_turn_route
captured = {}
def spy_resolve(user_message, routing_config, primary):
captured["primary"] = primary
return resolve_turn_route(user_message, routing_config, primary)
monkeypatch.setattr(
"agent.smart_model_routing.resolve_turn_route", spy_resolve
)
from gateway.run import GatewayRunner
runner = SimpleNamespace(
_smart_model_routing={"enabled": False},
)
runtime_kwargs = {
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": MagicMock(name="FakePool"),
}
bound = GatewayRunner._resolve_turn_agent_config.__get__(runner)
bound("test message", "gpt-5.4", runtime_kwargs)
assert "credential_pool" in captured["primary"]
assert captured["primary"]["credential_pool"] is runtime_kwargs["credential_pool"]
# ---------------------------------------------------------------------------
# 4 & 5. Eager fallback deferred/fires based on credential pool
# ---------------------------------------------------------------------------
class TestEagerFallbackWithPool:
"""Test the eager fallback guard in run_agent.py's error handling loop."""
def _make_agent(self, has_pool=True, pool_has_creds=True, has_fallback=True):
"""Create a minimal AIAgent mock with the fields needed."""
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
agent._credential_pool = None
if has_pool:
pool = MagicMock()
pool.has_available.return_value = pool_has_creds
agent._credential_pool = pool
agent._fallback_chain = [{"model": "fallback/model"}] if has_fallback else []
agent._fallback_index = 0
agent._try_activate_fallback = MagicMock(return_value=True)
agent._emit_status = MagicMock()
return agent
def test_eager_fallback_deferred_when_pool_has_credentials(self):
"""429 with active pool should NOT trigger eager fallback."""
agent = self._make_agent(has_pool=True, pool_has_creds=True, has_fallback=True)
# Simulate the check from run_agent.py lines 7180-7191
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_not_called()
def test_eager_fallback_fires_when_no_pool(self):
"""429 without pool should trigger eager fallback."""
agent = self._make_agent(has_pool=False, has_fallback=True)
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_called_once()
def test_eager_fallback_fires_when_pool_exhausted(self):
"""429 with exhausted pool should trigger eager fallback."""
agent = self._make_agent(has_pool=True, pool_has_creds=False, has_fallback=True)
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_called_once()
# ---------------------------------------------------------------------------
# 6. Full 429 rotation cycle via _recover_with_credential_pool
# ---------------------------------------------------------------------------
class TestPoolRotationCycle:
"""Verify the retry-same → rotate → exhaust flow in _recover_with_credential_pool."""
def _make_agent_with_pool(self, pool_entries=3):
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
entries = []
for i in range(pool_entries):
e = MagicMock(name=f"entry_{i}")
e.id = f"cred-{i}"
entries.append(e)
pool = MagicMock()
pool.has_credentials.return_value = True
# mark_exhausted_and_rotate returns next entry until exhausted
self._rotation_index = 0
def rotate(status_code=None):
self._rotation_index += 1
if self._rotation_index < pool_entries:
return entries[self._rotation_index]
pool.has_credentials.return_value = False
return None
pool.mark_exhausted_and_rotate = MagicMock(side_effect=rotate)
agent._credential_pool = pool
agent._swap_credential = MagicMock()
agent.log_prefix = ""
return agent, pool, entries
def test_first_429_sets_retry_flag_no_rotation(self):
"""First 429 should just set has_retried_429=True, no rotation."""
agent, pool, _ = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert recovered is False
assert has_retried is True
pool.mark_exhausted_and_rotate.assert_not_called()
def test_second_429_rotates_to_next(self):
"""Second consecutive 429 should rotate to next credential."""
agent, pool, entries = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=True
)
assert recovered is True
assert has_retried is False # reset after rotation
pool.mark_exhausted_and_rotate.assert_called_once_with(status_code=429)
agent._swap_credential.assert_called_once_with(entries[1])
def test_pool_exhaustion_returns_false(self):
"""When all credentials exhausted, recovery should return False."""
agent, pool, _ = self._make_agent_with_pool(1)
# First 429 sets flag
_, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert has_retried is True
# Second 429 tries to rotate but pool is exhausted (only 1 entry)
recovered, _ = agent._recover_with_credential_pool(
status_code=429, has_retried_429=True
)
assert recovered is False
def test_402_immediate_rotation(self):
"""402 (billing) should immediately rotate, no retry-first."""
agent, pool, entries = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=402, has_retried_429=False
)
assert recovered is True
assert has_retried is False
pool.mark_exhausted_and_rotate.assert_called_once_with(status_code=402)
def test_no_pool_returns_false(self):
"""No pool should return (False, unchanged)."""
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
agent._credential_pool = None
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert recovered is False
assert has_retried is False
+119 -2
View File
@@ -1,7 +1,17 @@
"""Tests for agent/display.py — build_tool_preview()."""
"""Tests for agent/display.py — build_tool_preview() and inline diff previews."""
import os
import pytest
from agent.display import build_tool_preview
from unittest.mock import MagicMock, patch
from agent.display import (
build_tool_preview,
capture_local_edit_snapshot,
extract_edit_diff,
_render_inline_unified_diff,
_summarize_rendered_diff_sections,
render_edit_diff_with_delta,
)
class TestBuildToolPreview:
@@ -83,3 +93,110 @@ class TestBuildToolPreview:
assert build_tool_preview("terminal", 0) is None
assert build_tool_preview("terminal", "") is None
assert build_tool_preview("terminal", []) is None
class TestEditDiffPreview:
def test_extract_edit_diff_for_patch(self):
diff = extract_edit_diff("patch", '{"success": true, "diff": "--- a/x\\n+++ b/x\\n"}')
assert diff is not None
assert "+++ b/x" in diff
def test_render_inline_unified_diff_colors_added_and_removed_lines(self):
rendered = _render_inline_unified_diff(
"--- a/cli.py\n"
"+++ b/cli.py\n"
"@@ -1,2 +1,2 @@\n"
"-old line\n"
"+new line\n"
" context\n"
)
assert "a/cli.py" in rendered[0]
assert "b/cli.py" in rendered[0]
assert any("old line" in line for line in rendered)
assert any("new line" in line for line in rendered)
assert any("48;2;" in line for line in rendered)
def test_extract_edit_diff_ignores_non_edit_tools(self):
assert extract_edit_diff("web_search", '{"diff": "--- a\\n+++ b\\n"}') is None
def test_extract_edit_diff_uses_local_snapshot_for_write_file(self, tmp_path):
target = tmp_path / "note.txt"
target.write_text("old\n", encoding="utf-8")
snapshot = capture_local_edit_snapshot("write_file", {"path": str(target)})
target.write_text("new\n", encoding="utf-8")
diff = extract_edit_diff(
"write_file",
'{"bytes_written": 4}',
function_args={"path": str(target)},
snapshot=snapshot,
)
assert diff is not None
assert "--- a/" in diff
assert "+++ b/" in diff
assert "-old" in diff
assert "+new" in diff
def test_render_edit_diff_with_delta_invokes_printer(self):
printer = MagicMock()
rendered = render_edit_diff_with_delta(
"patch",
'{"diff": "--- a/x\\n+++ b/x\\n@@ -1 +1 @@\\n-old\\n+new\\n"}',
print_fn=printer,
)
assert rendered is True
assert printer.call_count >= 2
calls = [call.args[0] for call in printer.call_args_list]
assert any("a/x" in line and "b/x" in line for line in calls)
assert any("old" in line for line in calls)
assert any("new" in line for line in calls)
def test_render_edit_diff_with_delta_skips_without_diff(self):
rendered = render_edit_diff_with_delta(
"patch",
'{"success": true}',
)
assert rendered is False
def test_render_edit_diff_with_delta_handles_renderer_errors(self, monkeypatch):
printer = MagicMock()
monkeypatch.setattr("agent.display._summarize_rendered_diff_sections", MagicMock(side_effect=RuntimeError("boom")))
rendered = render_edit_diff_with_delta(
"patch",
'{"diff": "--- a/x\\n+++ b/x\\n"}',
print_fn=printer,
)
assert rendered is False
assert printer.call_count == 0
def test_summarize_rendered_diff_sections_truncates_large_diff(self):
diff = "--- a/x.py\n+++ b/x.py\n" + "".join(f"+line{i}\n" for i in range(120))
rendered = _summarize_rendered_diff_sections(diff, max_lines=20)
assert len(rendered) == 21
assert "omitted" in rendered[-1]
def test_summarize_rendered_diff_sections_limits_file_count(self):
diff = "".join(
f"--- a/file{i}.py\n+++ b/file{i}.py\n+line{i}\n"
for i in range(8)
)
rendered = _summarize_rendered_diff_sections(diff, max_files=3, max_lines=50)
assert any("a/file0.py" in line for line in rendered)
assert any("a/file1.py" in line for line in rendered)
assert any("a/file2.py" in line for line in rendered)
assert not any("a/file7.py" in line for line in rendered)
assert "additional file" in rendered[-1]
+108
View File
@@ -1239,6 +1239,42 @@ class TestConcurrentToolExecution:
)
assert result == "result"
def test_sequential_tool_callbacks_fire_in_order(self, agent):
tool_call = _mock_tool_call(name="web_search", arguments='{"query":"hello"}', call_id="c1")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tool_call])
messages = []
starts = []
completes = []
agent.tool_start_callback = lambda tool_call_id, function_name, function_args: starts.append((tool_call_id, function_name, function_args))
agent.tool_complete_callback = lambda tool_call_id, function_name, function_args, function_result: completes.append((tool_call_id, function_name, function_args, function_result))
with patch("run_agent.handle_function_call", return_value='{"success": true}'):
agent._execute_tool_calls_sequential(mock_msg, messages, "task-1")
assert starts == [("c1", "web_search", {"query": "hello"})]
assert completes == [("c1", "web_search", {"query": "hello"}, '{"success": true}')]
def test_concurrent_tool_callbacks_fire_for_each_tool(self, agent):
tc1 = _mock_tool_call(name="web_search", arguments='{"query":"one"}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments='{"query":"two"}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
starts = []
completes = []
agent.tool_start_callback = lambda tool_call_id, function_name, function_args: starts.append((tool_call_id, function_name, function_args))
agent.tool_complete_callback = lambda tool_call_id, function_name, function_args, function_result: completes.append((tool_call_id, function_name, function_args, function_result))
with patch("run_agent.handle_function_call", side_effect=['{"id":1}', '{"id":2}']):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert starts == [
("c1", "web_search", {"query": "one"}),
("c2", "web_search", {"query": "two"}),
]
assert len(completes) == 2
assert {entry[0] for entry in completes} == {"c1", "c2"}
assert {entry[3] for entry in completes} == {'{"id":1}', '{"id":2}'}
def test_invoke_tool_handles_agent_level_tools(self, agent):
"""_invoke_tool should handle todo tool directly."""
with patch("tools.todo_tool.todo_tool", return_value='{"ok":true}') as mock_todo:
@@ -1280,6 +1316,38 @@ class TestPathsOverlap:
assert not _paths_overlap(Path("src/a.py"), Path(""))
class TestParallelScopePathNormalization:
def test_extract_parallel_scope_path_normalizes_relative_to_cwd(self, tmp_path, monkeypatch):
from run_agent import _extract_parallel_scope_path
monkeypatch.chdir(tmp_path)
scoped = _extract_parallel_scope_path("write_file", {"path": "./notes.txt"})
assert scoped == tmp_path / "notes.txt"
def test_extract_parallel_scope_path_treats_relative_and_absolute_same_file_as_same_scope(self, tmp_path, monkeypatch):
from run_agent import _extract_parallel_scope_path, _paths_overlap
monkeypatch.chdir(tmp_path)
abs_path = tmp_path / "notes.txt"
rel_scoped = _extract_parallel_scope_path("write_file", {"path": "notes.txt"})
abs_scoped = _extract_parallel_scope_path("write_file", {"path": str(abs_path)})
assert rel_scoped == abs_scoped
assert _paths_overlap(rel_scoped, abs_scoped)
def test_should_parallelize_tool_batch_rejects_same_file_with_mixed_path_spellings(self, tmp_path, monkeypatch):
from run_agent import _should_parallelize_tool_batch
monkeypatch.chdir(tmp_path)
tc1 = _mock_tool_call(name="write_file", arguments='{"path":"notes.txt","content":"one"}', call_id="c1")
tc2 = _mock_tool_call(name="write_file", arguments=f'{{"path":"{tmp_path / "notes.txt"}","content":"two"}}', call_id="c2")
assert not _should_parallelize_tool_batch([tc1, tc2])
class TestHandleMaxIterations:
def test_returns_summary(self, agent):
resp = _mock_response(content="Here is a summary of what I did.")
@@ -2741,6 +2809,46 @@ def test_is_openai_client_closed_honors_custom_client_flag():
assert AIAgent._is_openai_client_closed(SimpleNamespace(is_closed=False)) is False
def test_is_openai_client_closed_handles_method_form():
"""Fix for issue #4377: is_closed as method (openai SDK) vs property (httpx).
The openai SDK's is_closed is a method, not a property. Prior to this fix,
getattr(client, "is_closed", False) returned the bound method object, which
is always truthy, causing the function to incorrectly report all clients as
closed and triggering unnecessary client recreation on every API call.
"""
class MethodFormClient:
"""Mimics openai.OpenAI where is_closed() is a method."""
def __init__(self, closed: bool):
self._closed = closed
def is_closed(self) -> bool:
return self._closed
# Method returning False - client is open
open_client = MethodFormClient(closed=False)
assert AIAgent._is_openai_client_closed(open_client) is False
# Method returning True - client is closed
closed_client = MethodFormClient(closed=True)
assert AIAgent._is_openai_client_closed(closed_client) is True
def test_is_openai_client_closed_falls_back_to_http_client():
"""Verify fallback to _client.is_closed when top-level is_closed is None."""
class ClientWithHttpClient:
is_closed = None # No top-level is_closed
def __init__(self, http_closed: bool):
self._client = SimpleNamespace(is_closed=http_closed)
assert AIAgent._is_openai_client_closed(ClientWithHttpClient(http_closed=False)) is False
assert AIAgent._is_openai_client_closed(ClientWithHttpClient(http_closed=True)) is True
class TestAnthropicBaseUrlPassthrough:
"""Bug fix: base_url was filtered with 'anthropic in base_url', blocking proxies."""
@@ -0,0 +1,242 @@
"""Persistence tests for the Camofox browser backend.
Tests that managed persistence uses stable identity while default mode
uses random identity. The actual browser profile persistence is handled
by the Camofox server (when CAMOFOX_PROFILE_DIR is set).
"""
import json
from unittest.mock import MagicMock, patch
import pytest
from tools.browser_camofox import (
_drop_session,
_get_session,
_managed_persistence_enabled,
camofox_close,
camofox_navigate,
check_camofox_available,
cleanup_all_camofox_sessions,
get_vnc_url,
)
from tools.browser_camofox_state import get_camofox_identity
def _mock_response(status=200, json_data=None):
resp = MagicMock()
resp.status_code = status
resp.json.return_value = json_data or {}
resp.raise_for_status = MagicMock()
return resp
def _enable_persistence():
"""Return a patch context that enables managed persistence via config."""
config = {"browser": {"camofox": {"managed_persistence": True}}}
return patch("tools.browser_camofox.load_config", return_value=config)
@pytest.fixture(autouse=True)
def _clear_session_state():
import tools.browser_camofox as mod
yield
with mod._sessions_lock:
mod._sessions.clear()
mod._vnc_url = None
mod._vnc_url_checked = False
class TestManagedPersistenceToggle:
def test_disabled_by_default(self):
config = {"browser": {"camofox": {"managed_persistence": False}}}
with patch("tools.browser_camofox.load_config", return_value=config):
assert _managed_persistence_enabled() is False
def test_enabled_via_config_yaml(self):
config = {"browser": {"camofox": {"managed_persistence": True}}}
with patch("tools.browser_camofox.load_config", return_value=config):
assert _managed_persistence_enabled() is True
def test_disabled_when_key_missing(self):
config = {"browser": {}}
with patch("tools.browser_camofox.load_config", return_value=config):
assert _managed_persistence_enabled() is False
def test_disabled_on_config_load_error(self):
with patch("tools.browser_camofox.load_config", side_effect=Exception("fail")):
assert _managed_persistence_enabled() is False
class TestEphemeralMode:
"""Default behavior: random userId, no persistence."""
def test_session_gets_random_user_id(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
session = _get_session("task-1")
assert session["user_id"].startswith("hermes_")
assert session["managed"] is False
def test_different_tasks_get_different_user_ids(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
s1 = _get_session("task-1")
s2 = _get_session("task-2")
assert s1["user_id"] != s2["user_id"]
def test_session_reuse_within_same_task(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
s1 = _get_session("task-1")
s2 = _get_session("task-1")
assert s1 is s2
class TestManagedPersistenceMode:
"""With managed_persistence: stable userId derived from Hermes profile."""
def test_session_gets_stable_user_id(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
with _enable_persistence():
session = _get_session("task-1")
expected = get_camofox_identity("task-1")
assert session["user_id"] == expected["user_id"]
assert session["session_key"] == expected["session_key"]
assert session["managed"] is True
def test_same_user_id_after_session_drop(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
with _enable_persistence():
s1 = _get_session("task-1")
uid1 = s1["user_id"]
_drop_session("task-1")
s2 = _get_session("task-1")
assert s2["user_id"] == uid1
def test_same_user_id_across_tasks(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
with _enable_persistence():
s1 = _get_session("task-a")
s2 = _get_session("task-b")
# Same profile = same userId, different session keys
assert s1["user_id"] == s2["user_id"]
assert s1["session_key"] != s2["session_key"]
def test_different_profiles_get_different_user_ids(self, tmp_path, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
with _enable_persistence():
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "profile-a"))
s1 = _get_session("task-1")
uid_a = s1["user_id"]
_drop_session("task-1")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "profile-b"))
s2 = _get_session("task-1")
assert s2["user_id"] != uid_a
def test_navigate_uses_stable_identity(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
requests_seen = []
def _capture_post(url, json=None, timeout=None):
requests_seen.append(json)
return _mock_response(
json_data={"tabId": "tab-1", "url": "https://example.com"}
)
with _enable_persistence(), \
patch("tools.browser_camofox.requests.post", side_effect=_capture_post):
result = json.loads(camofox_navigate("https://example.com", task_id="task-1"))
assert result["success"] is True
expected = get_camofox_identity("task-1")
assert requests_seen[0]["userId"] == expected["user_id"]
def test_navigate_reuses_identity_after_close(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
requests_seen = []
def _capture_post(url, json=None, timeout=None):
requests_seen.append(json)
return _mock_response(
json_data={"tabId": f"tab-{len(requests_seen)}", "url": "https://example.com"}
)
with (
_enable_persistence(),
patch("tools.browser_camofox.requests.post", side_effect=_capture_post),
patch("tools.browser_camofox.requests.delete", return_value=_mock_response()),
):
first = json.loads(camofox_navigate("https://example.com", task_id="task-1"))
camofox_close("task-1")
second = json.loads(camofox_navigate("https://example.com", task_id="task-1"))
assert first["success"] is True
assert second["success"] is True
tab_requests = [req for req in requests_seen if "userId" in req]
assert len(tab_requests) == 2
assert tab_requests[0]["userId"] == tab_requests[1]["userId"]
class TestVncUrlDiscovery:
"""VNC URL is derived from the Camofox health endpoint."""
def test_vnc_url_from_health_port(self, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://myhost:9377")
health_resp = _mock_response(json_data={"ok": True, "vncPort": 6080})
with patch("tools.browser_camofox.requests.get", return_value=health_resp):
assert check_camofox_available() is True
assert get_vnc_url() == "http://myhost:6080"
def test_vnc_url_none_when_headless(self, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
health_resp = _mock_response(json_data={"ok": True})
with patch("tools.browser_camofox.requests.get", return_value=health_resp):
check_camofox_available()
assert get_vnc_url() is None
def test_vnc_url_rejects_invalid_port(self, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
health_resp = _mock_response(json_data={"ok": True, "vncPort": "bad"})
with patch("tools.browser_camofox.requests.get", return_value=health_resp):
check_camofox_available()
assert get_vnc_url() is None
def test_vnc_url_only_probed_once(self, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
health_resp = _mock_response(json_data={"ok": True, "vncPort": 6080})
with patch("tools.browser_camofox.requests.get", return_value=health_resp) as mock_get:
check_camofox_available()
check_camofox_available()
# Second call still hits /health for availability but doesn't re-parse vncPort
assert get_vnc_url() == "http://localhost:6080"
def test_navigate_includes_vnc_hint(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
import tools.browser_camofox as mod
mod._vnc_url = "http://localhost:6080"
mod._vnc_url_checked = True
with patch("tools.browser_camofox.requests.post", return_value=_mock_response(
json_data={"tabId": "t1", "url": "https://example.com"}
)):
result = json.loads(camofox_navigate("https://example.com", task_id="vnc-test"))
assert result["vnc_url"] == "http://localhost:6080"
assert "vnc_hint" in result
+66
View File
@@ -0,0 +1,66 @@
"""Tests for Hermes-managed Camofox state helpers."""
from unittest.mock import patch
import pytest
def _load_module():
from tools import browser_camofox_state as state
return state
class TestCamofoxStatePaths:
def test_paths_are_profile_scoped(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path):
assert state.get_camofox_state_dir() == tmp_path / "browser_auth" / "camofox"
class TestCamofoxIdentity:
def test_identity_is_deterministic(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path):
first = state.get_camofox_identity("task-1")
second = state.get_camofox_identity("task-1")
assert first == second
def test_identity_differs_by_task(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path):
a = state.get_camofox_identity("task-a")
b = state.get_camofox_identity("task-b")
# Same user (same profile), different session keys
assert a["user_id"] == b["user_id"]
assert a["session_key"] != b["session_key"]
def test_identity_differs_by_profile(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path / "profile-a"):
a = state.get_camofox_identity("task-1")
with patch.object(state, "get_hermes_home", return_value=tmp_path / "profile-b"):
b = state.get_camofox_identity("task-1")
assert a["user_id"] != b["user_id"]
def test_default_task_id(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path):
identity = state.get_camofox_identity()
assert "user_id" in identity
assert "session_key" in identity
assert identity["user_id"].startswith("hermes_")
assert identity["session_key"].startswith("task_")
class TestCamofoxConfigDefaults:
def test_default_config_includes_managed_persistence_toggle(self):
from hermes_cli.config import DEFAULT_CONFIG
browser_cfg = DEFAULT_CONFIG["browser"]
assert browser_cfg["camofox"]["managed_persistence"] is False
def test_config_version_unchanged(self):
from hermes_cli.config import DEFAULT_CONFIG
# managed_persistence is auto-merged by _deep_merge, no version bump needed
assert DEFAULT_CONFIG["_config_version"] == 11
+2 -2
View File
@@ -221,7 +221,7 @@ class TestCheckFileStalenessHelper(unittest.TestCase):
_read_tracker["t1"] = {
"last_key": None, "consecutive": 0,
"read_history": set(), "dedup": {},
"file_mtimes": {"/tmp/other.py": 12345.0},
"read_timestamps": {"/tmp/other.py": 12345.0},
}
self.assertIsNone(_check_file_staleness("/tmp/x.py", "t1"))
@@ -231,7 +231,7 @@ class TestCheckFileStalenessHelper(unittest.TestCase):
_read_tracker["t1"] = {
"last_key": None, "consecutive": 0,
"read_history": set(), "dedup": {},
"file_mtimes": {"/nonexistent/path": 99999.0},
"read_timestamps": {"/nonexistent/path": 99999.0},
}
# File doesn't exist → stat fails → returns None (let write handle it)
self.assertIsNone(_check_file_staleness("/nonexistent/path", "t1"))
+174
View File
@@ -0,0 +1,174 @@
"""Tests for skill fuzzy patching via tools.fuzzy_match."""
import json
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from tools.skill_manager_tool import (
_create_skill,
_patch_skill,
_write_file,
skill_manage,
)
SKILL_CONTENT = """\
---
name: test-skill
description: A test skill for unit testing.
---
# Test Skill
Step 1: Do the thing.
Step 2: Do another thing.
Step 3: Final step.
"""
# ---------------------------------------------------------------------------
# Fuzzy patching
# ---------------------------------------------------------------------------
class TestFuzzyPatchSkill:
@pytest.fixture(autouse=True)
def setup_skills(self, tmp_path, monkeypatch):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
monkeypatch.setattr("tools.skill_manager_tool.SKILLS_DIR", skills_dir)
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
self.skills_dir = skills_dir
def test_exact_match_still_works(self):
_create_skill("test-skill", SKILL_CONTENT)
result = _patch_skill("test-skill", "Step 1: Do the thing.", "Step 1: Done!")
assert result["success"] is True
content = (self.skills_dir / "test-skill" / "SKILL.md").read_text()
assert "Step 1: Done!" in content
def test_whitespace_trimmed_match(self):
"""Patch with extra leading whitespace should still find the target."""
skill = """\
---
name: ws-skill
description: Whitespace test
---
# Commands
def hello():
print("hi")
"""
_create_skill("ws-skill", skill)
# Agent sends patch with no leading whitespace (common LLM behaviour)
result = _patch_skill("ws-skill", "def hello():\n print(\"hi\")", "def hello():\n print(\"hello world\")")
assert result["success"] is True
content = (self.skills_dir / "ws-skill" / "SKILL.md").read_text()
assert 'print("hello world")' in content
def test_indentation_flexible_match(self):
"""Patch where only indentation differs should succeed."""
skill = """\
---
name: indent-skill
description: Indentation test
---
# Steps
1. First step
2. Second step
3. Third step
"""
_create_skill("indent-skill", skill)
# Agent sends with different indentation
result = _patch_skill(
"indent-skill",
"1. First step\n2. Second step",
"1. Updated first\n2. Updated second"
)
assert result["success"] is True
content = (self.skills_dir / "indent-skill" / "SKILL.md").read_text()
assert "Updated first" in content
def test_multiple_matches_blocked_without_replace_all(self):
"""Multiple fuzzy matches should return an error without replace_all."""
skill = """\
---
name: dup-skill
description: Duplicate test
---
# Steps
word word word
"""
_create_skill("dup-skill", skill)
result = _patch_skill("dup-skill", "word", "replaced")
assert result["success"] is False
assert "match" in result["error"].lower()
def test_replace_all_with_fuzzy(self):
skill = """\
---
name: dup-skill
description: Duplicate test
---
# Steps
word word word
"""
_create_skill("dup-skill", skill)
result = _patch_skill("dup-skill", "word", "replaced", replace_all=True)
assert result["success"] is True
content = (self.skills_dir / "dup-skill" / "SKILL.md").read_text()
assert "word" not in content
assert "replaced" in content
def test_no_match_returns_preview(self):
_create_skill("test-skill", SKILL_CONTENT)
result = _patch_skill("test-skill", "this does not exist anywhere", "replacement")
assert result["success"] is False
assert "file_preview" in result
def test_fuzzy_patch_on_supporting_file(self):
"""Fuzzy matching should also work on supporting files."""
_create_skill("test-skill", SKILL_CONTENT)
ref_content = " function hello() {\n console.log('hi');\n }"
_write_file("test-skill", "references/code.js", ref_content)
# Patch with stripped indentation
result = _patch_skill(
"test-skill",
"function hello() {\nconsole.log('hi');\n}",
"function hello() {\nconsole.log('hello world');\n}",
file_path="references/code.js"
)
assert result["success"] is True
content = (self.skills_dir / "test-skill" / "references" / "code.js").read_text()
assert "hello world" in content
def test_patch_preserves_frontmatter_validation(self):
"""Fuzzy matching should still run frontmatter validation on SKILL.md."""
_create_skill("test-skill", SKILL_CONTENT)
# Try to destroy the frontmatter via patch
result = _patch_skill("test-skill", "---\nname: test-skill", "BROKEN")
assert result["success"] is False
assert "structure" in result["error"].lower() or "frontmatter" in result["error"].lower()
def test_skill_manage_patch_uses_fuzzy(self):
"""The dispatcher should route to the fuzzy-matching patch."""
_create_skill("test-skill", SKILL_CONTENT)
raw = skill_manage(
action="patch",
name="test-skill",
old_string=" Step 1: Do the thing.", # extra leading space
new_string="Step 1: Updated.",
)
result = json.loads(raw)
# Should succeed via line-trimmed or indentation-flexible matching
assert result["success"] is True
+2 -2
View File
@@ -271,7 +271,7 @@ class TestPatchSkill:
_create_skill("my-skill", VALID_SKILL_CONTENT)
result = _patch_skill("my-skill", "this text does not exist", "replacement")
assert result["success"] is False
assert "not found" in result["error"]
assert "not found" in result["error"].lower() or "could not find" in result["error"].lower()
def test_patch_ambiguous_match_rejected(self, tmp_path):
content = """\
@@ -288,7 +288,7 @@ word word
_create_skill("my-skill", content)
result = _patch_skill("my-skill", "word", "replaced")
assert result["success"] is False
assert "matched" in result["error"]
assert "match" in result["error"].lower()
def test_patch_replace_all(self, tmp_path):
content = """\
+215
View File
@@ -0,0 +1,215 @@
"""Tests for skill content size limits.
Agent writes (create/edit/patch/write_file) are constrained to
MAX_SKILL_CONTENT_CHARS (100k) and MAX_SKILL_FILE_BYTES (1 MiB).
Hand-placed and hub-installed skills have no hard limit.
"""
import json
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from tools.skill_manager_tool import (
MAX_SKILL_CONTENT_CHARS,
MAX_SKILL_FILE_BYTES,
_validate_content_size,
skill_manage,
)
@pytest.fixture(autouse=True)
def isolate_skills(tmp_path, monkeypatch):
"""Redirect SKILLS_DIR to a temp directory."""
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
monkeypatch.setattr("tools.skill_manager_tool.SKILLS_DIR", skills_dir)
monkeypatch.setattr("tools.skills_tool.SKILLS_DIR", skills_dir)
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
return skills_dir
def _make_skill_content(body_chars: int) -> str:
"""Generate valid SKILL.md content with a body of the given character count."""
frontmatter = (
"---\n"
"name: test-skill\n"
"description: A test skill\n"
"---\n"
)
body = "# Test Skill\n\n" + ("x" * max(0, body_chars - 15))
return frontmatter + body
class TestValidateContentSize:
"""Unit tests for _validate_content_size."""
def test_within_limit(self):
assert _validate_content_size("a" * 1000) is None
def test_at_limit(self):
assert _validate_content_size("a" * MAX_SKILL_CONTENT_CHARS) is None
def test_over_limit(self):
err = _validate_content_size("a" * (MAX_SKILL_CONTENT_CHARS + 1))
assert err is not None
assert "100,001" in err
assert "100,000" in err
def test_custom_label(self):
err = _validate_content_size("a" * (MAX_SKILL_CONTENT_CHARS + 1), label="references/api.md")
assert "references/api.md" in err
class TestCreateSkillSizeLimit:
"""create action rejects oversized content."""
def test_create_within_limit(self, isolate_skills):
content = _make_skill_content(5000)
result = json.loads(skill_manage(action="create", name="small-skill", content=content))
assert result["success"] is True
def test_create_over_limit(self, isolate_skills):
content = _make_skill_content(MAX_SKILL_CONTENT_CHARS + 100)
result = json.loads(skill_manage(action="create", name="huge-skill", content=content))
assert result["success"] is False
assert "100,000" in result["error"]
def test_create_at_limit(self, isolate_skills):
# Content at exactly the limit should succeed
frontmatter = "---\nname: edge-skill\ndescription: Edge case\n---\n# Edge\n\n"
body_budget = MAX_SKILL_CONTENT_CHARS - len(frontmatter)
content = frontmatter + ("x" * body_budget)
assert len(content) == MAX_SKILL_CONTENT_CHARS
result = json.loads(skill_manage(action="create", name="edge-skill", content=content))
assert result["success"] is True
class TestEditSkillSizeLimit:
"""edit action rejects oversized content."""
def test_edit_over_limit(self, isolate_skills):
# Create a small skill first
small = _make_skill_content(1000)
json.loads(skill_manage(action="create", name="grow-me", content=small))
# Try to edit it to be oversized
big = _make_skill_content(MAX_SKILL_CONTENT_CHARS + 100)
# Fix the name in frontmatter
big = big.replace("name: test-skill", "name: grow-me")
result = json.loads(skill_manage(action="edit", name="grow-me", content=big))
assert result["success"] is False
assert "100,000" in result["error"]
class TestPatchSkillSizeLimit:
"""patch action checks resulting size, not just the new_string."""
def test_patch_that_would_exceed_limit(self, isolate_skills):
# Create a skill near the limit
near_limit = _make_skill_content(MAX_SKILL_CONTENT_CHARS - 50)
json.loads(skill_manage(action="create", name="near-limit", content=near_limit))
# Patch that adds enough to go over
result = json.loads(skill_manage(
action="patch",
name="near-limit",
old_string="# Test Skill",
new_string="# Test Skill\n" + ("y" * 200),
))
assert result["success"] is False
assert "100,000" in result["error"]
def test_patch_that_reduces_size_on_oversized_skill(self, isolate_skills, tmp_path):
"""Patches that shrink an already-oversized skill should succeed."""
# Manually create an oversized skill (simulating hand-placed)
skill_dir = tmp_path / "skills" / "bloated"
skill_dir.mkdir(parents=True)
oversized = _make_skill_content(MAX_SKILL_CONTENT_CHARS + 5000)
oversized = oversized.replace("name: test-skill", "name: bloated")
(skill_dir / "SKILL.md").write_text(oversized, encoding="utf-8")
assert len(oversized) > MAX_SKILL_CONTENT_CHARS
# Patch that removes content to bring it under the limit.
# Use replace_all to replace the repeated x's with a shorter string.
result = json.loads(skill_manage(
action="patch",
name="bloated",
old_string="x" * 100,
new_string="y",
replace_all=True,
))
# Should succeed because the result is well within limits
assert result["success"] is True
def test_patch_supporting_file_size_limit(self, isolate_skills):
"""Patch on a supporting file also checks size."""
small = _make_skill_content(1000)
json.loads(skill_manage(action="create", name="with-ref", content=small))
# Create a supporting file
json.loads(skill_manage(
action="write_file",
name="with-ref",
file_path="references/data.md",
file_content="# Data\n\nSmall content.",
))
# Try to patch it to be oversized
result = json.loads(skill_manage(
action="patch",
name="with-ref",
old_string="Small content.",
new_string="x" * (MAX_SKILL_CONTENT_CHARS + 100),
file_path="references/data.md",
))
assert result["success"] is False
assert "references/data.md" in result["error"]
class TestWriteFileSizeLimit:
"""write_file action enforces both char and byte limits."""
def test_write_file_over_char_limit(self, isolate_skills):
small = _make_skill_content(1000)
json.loads(skill_manage(action="create", name="file-test", content=small))
result = json.loads(skill_manage(
action="write_file",
name="file-test",
file_path="references/huge.md",
file_content="x" * (MAX_SKILL_CONTENT_CHARS + 1),
))
assert result["success"] is False
assert "100,000" in result["error"]
def test_write_file_within_limit(self, isolate_skills):
small = _make_skill_content(1000)
json.loads(skill_manage(action="create", name="file-ok", content=small))
result = json.loads(skill_manage(
action="write_file",
name="file-ok",
file_path="references/normal.md",
file_content="# Normal\n\n" + ("x" * 5000),
))
assert result["success"] is True
class TestHandPlacedSkillsNoLimit:
"""Skills dropped directly on disk are not constrained."""
def test_oversized_handplaced_skill_loads(self, isolate_skills, tmp_path):
"""A hand-placed 200k skill can still be read via skill_view."""
from tools.skills_tool import skill_view
skill_dir = tmp_path / "skills" / "manual-giant"
skill_dir.mkdir(parents=True)
huge = _make_skill_content(200_000)
huge = huge.replace("name: test-skill", "name: manual-giant")
(skill_dir / "SKILL.md").write_text(huge, encoding="utf-8")
result = json.loads(skill_view("manual-giant"))
assert "content" in result
# The full content is returned — no truncation at the storage layer
assert len(result["content"]) > MAX_SKILL_CONTENT_CHARS
+74 -9
View File
@@ -34,6 +34,9 @@ from typing import Any, Dict, Optional
import requests
from hermes_cli.config import load_config
from tools.browser_camofox_state import get_camofox_identity
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
@@ -42,6 +45,8 @@ logger = logging.getLogger(__name__)
_DEFAULT_TIMEOUT = 30 # seconds per HTTP request
_SNAPSHOT_MAX_CHARS = 80_000 # camofox paginates at this limit
_vnc_url: Optional[str] = None # cached from /health response
_vnc_url_checked = False # only probe once per process
def get_camofox_url() -> str:
@@ -56,16 +61,52 @@ def is_camofox_mode() -> bool:
def check_camofox_available() -> bool:
"""Verify the Camofox server is reachable."""
global _vnc_url, _vnc_url_checked
url = get_camofox_url()
if not url:
return False
try:
resp = requests.get(f"{url}/health", timeout=5)
if resp.status_code == 200 and not _vnc_url_checked:
try:
data = resp.json()
vnc_port = data.get("vncPort")
if isinstance(vnc_port, int) and 1 <= vnc_port <= 65535:
from urllib.parse import urlparse
parsed = urlparse(url)
host = parsed.hostname or "localhost"
_vnc_url = f"http://{host}:{vnc_port}"
except (ValueError, KeyError):
pass
_vnc_url_checked = True
return resp.status_code == 200
except Exception:
return False
def get_vnc_url() -> Optional[str]:
"""Return the VNC URL if the Camofox server exposes one, or None."""
if not _vnc_url_checked:
check_camofox_available()
return _vnc_url
def _managed_persistence_enabled() -> bool:
"""Return whether Hermes-managed persistence is enabled for Camofox.
When enabled, sessions use a stable profile-scoped userId so the
Camofox server can map it to a persistent browser profile directory.
When disabled (default), each session gets a random userId (ephemeral).
Controlled by ``browser.camofox.managed_persistence`` in config.yaml.
"""
try:
camofox_cfg = load_config().get("browser", {}).get("camofox", {})
except Exception:
return False
return bool(camofox_cfg.get("managed_persistence"))
# ---------------------------------------------------------------------------
# Session management
# ---------------------------------------------------------------------------
@@ -75,16 +116,31 @@ _sessions_lock = threading.Lock()
def _get_session(task_id: Optional[str]) -> Dict[str, Any]:
"""Get or create a camofox session for the given task."""
"""Get or create a camofox session for the given task.
When managed persistence is enabled, uses a deterministic userId
derived from the Hermes profile so the Camofox server can map it
to the same persistent browser profile across restarts.
"""
task_id = task_id or "default"
with _sessions_lock:
if task_id in _sessions:
return _sessions[task_id]
session = {
"user_id": f"hermes_{uuid.uuid4().hex[:10]}",
"tab_id": None,
"session_key": f"task_{task_id[:16]}",
}
if _managed_persistence_enabled():
identity = get_camofox_identity(task_id)
session = {
"user_id": identity["user_id"],
"tab_id": None,
"session_key": identity["session_key"],
"managed": True,
}
else:
session = {
"user_id": f"hermes_{uuid.uuid4().hex[:10]}",
"tab_id": None,
"session_key": f"task_{task_id[:16]}",
"managed": False,
}
_sessions[task_id] = session
return session
@@ -172,11 +228,19 @@ def camofox_navigate(url: str, task_id: Optional[str] = None) -> str:
{"userId": session["user_id"], "url": url},
timeout=60,
)
return json.dumps({
result = {
"success": True,
"url": data.get("url", url),
"title": data.get("title", ""),
})
}
vnc = get_vnc_url()
if vnc:
result["vnc_url"] = vnc
result["vnc_hint"] = (
"Browser is visible via VNC. "
"Share this link with the user so they can watch the browser live."
)
return json.dumps(result)
except requests.HTTPError as e:
return json.dumps({"success": False, "error": f"Navigation failed: {e}"})
except requests.ConnectionError:
@@ -436,7 +500,7 @@ def camofox_vision(question: str, annotate: bool = False,
except Exception:
_vision_timeout = 120
analysis = call_llm(
response = call_llm(
messages=[{
"role": "user",
"content": [
@@ -452,6 +516,7 @@ def camofox_vision(question: str, annotate: bool = False,
task="vision",
timeout=_vision_timeout,
)
analysis = response.choices[0].message.content if response.choices else ""
return json.dumps({
"success": True,
+47
View File
@@ -0,0 +1,47 @@
"""Hermes-managed Camofox state helpers.
Provides profile-scoped identity and state directory paths for Camofox
persistent browser profiles. When managed persistence is enabled, Hermes
sends a deterministic userId derived from the active profile so that
Camofox can map it to the same persistent browser profile directory
across restarts.
"""
from __future__ import annotations
import uuid
from pathlib import Path
from typing import Dict, Optional
from hermes_constants import get_hermes_home
CAMOFOX_STATE_DIR_NAME = "browser_auth"
CAMOFOX_STATE_SUBDIR = "camofox"
def get_camofox_state_dir() -> Path:
"""Return the profile-scoped root directory for Camofox persistence."""
return get_hermes_home() / CAMOFOX_STATE_DIR_NAME / CAMOFOX_STATE_SUBDIR
def get_camofox_identity(task_id: Optional[str] = None) -> Dict[str, str]:
"""Return the stable Hermes-managed Camofox identity for this profile.
The user identity is profile-scoped (same Hermes profile = same userId).
The session key is scoped to the logical browser task so newly created
tabs within the same profile reuse the same identity contract.
"""
scope_root = str(get_camofox_state_dir())
logical_scope = task_id or "default"
user_digest = uuid.uuid5(
uuid.NAMESPACE_URL,
f"camofox-user:{scope_root}",
).hex[:10]
session_digest = uuid.uuid5(
uuid.NAMESPACE_URL,
f"camofox-session:{scope_root}:{logical_scope}",
).hex[:16]
return {
"user_id": f"hermes_{user_digest}",
"session_key": f"task_{session_digest}",
}
+8
View File
@@ -596,6 +596,14 @@ def execute_code(
stdout_text = strip_ansi(stdout_text)
stderr_text = strip_ansi(stderr_text)
# Redact secrets (API keys, tokens, etc.) from sandbox output.
# The sandbox env-var filter (lines 434-454) blocks os.environ access,
# but scripts can still read secrets from disk (e.g. open('~/.hermes/.env')).
# This ensures leaked secrets never enter the model context.
from agent.redact import redact_sensitive_text
stdout_text = redact_sensitive_text(stdout_text)
stderr_text = redact_sensitive_text(stderr_text)
# Build response
result: Dict[str, Any] = {
"status": status,
+34 -5
View File
@@ -136,9 +136,12 @@ _file_ops_cache: dict = {}
# Used to skip re-reads of unchanged files. Reset on
# context compression (the original content is summarised
# away so the model needs the full content again).
# "file_mtimes": dict mapping resolved_path → mtime float at last read.
# Used by write_file and patch to detect when a file was
# modified externally between the agent's read and write.
# "read_timestamps": dict mapping resolved_path → modification-time float
# recorded when the file was last read (or written) by
# this task. Used by write_file and patch to detect
# external changes between the agent's read and write.
# Updated after successful writes so consecutive edits
# by the same task don't trigger false warnings.
_read_tracker_lock = threading.Lock()
_read_tracker: dict = {}
@@ -401,7 +404,7 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
try:
_mtime_now = os.path.getmtime(resolved_str)
task_data["dedup"][dedup_key] = _mtime_now
task_data.setdefault("file_mtimes", {})[resolved_str] = _mtime_now
task_data.setdefault("read_timestamps", {})[resolved_str] = _mtime_now
except OSError:
pass # Can't stat — skip tracking for this entry
@@ -500,6 +503,24 @@ def notify_other_tool_call(task_id: str = "default"):
task_data["consecutive"] = 0
def _update_read_timestamp(filepath: str, task_id: str) -> None:
"""Record the file's current modification time after a successful write.
Called after write_file and patch so that consecutive edits by the
same task don't trigger false staleness warnings — each write
refreshes the stored timestamp to match the file's new state.
"""
try:
resolved = str(Path(filepath).expanduser().resolve())
current_mtime = os.path.getmtime(resolved)
except (OSError, ValueError):
return
with _read_tracker_lock:
task_data = _read_tracker.get(task_id)
if task_data is not None:
task_data.setdefault("read_timestamps", {})[resolved] = current_mtime
def _check_file_staleness(filepath: str, task_id: str) -> str | None:
"""Check whether a file was modified since the agent last read it.
@@ -515,7 +536,7 @@ def _check_file_staleness(filepath: str, task_id: str) -> str | None:
task_data = _read_tracker.get(task_id)
if not task_data:
return None
read_mtime = task_data.get("file_mtimes", {}).get(resolved)
read_mtime = task_data.get("read_timestamps", {}).get(resolved)
if read_mtime is None:
return None # File was never read — nothing to compare against
try:
@@ -543,6 +564,9 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
result_dict = result.to_dict()
if stale_warning:
result_dict["_warning"] = stale_warning
# Refresh the stored timestamp so consecutive writes by this
# task don't trigger false staleness warnings.
_update_read_timestamp(path, task_id)
return json.dumps(result_dict, ensure_ascii=False)
except Exception as e:
if _is_expected_write_exception(e):
@@ -594,6 +618,11 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
result_dict = result.to_dict()
if stale_warnings:
result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings)
# Refresh stored timestamps for all successfully-patched paths so
# consecutive edits by this task don't trigger false warnings.
if not result_dict.get("error"):
for _p in _paths_to_check:
_update_read_timestamp(_p, task_id)
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when old_string not found — saves iterations where the agent
# retries with stale content instead of re-reading the file.
+57 -16
View File
@@ -82,6 +82,8 @@ SKILLS_DIR = HERMES_HOME / "skills"
MAX_NAME_LENGTH = 64
MAX_DESCRIPTION_LENGTH = 1024
MAX_SKILL_CONTENT_CHARS = 100_000 # ~36k tokens at 2.75 chars/token
MAX_SKILL_FILE_BYTES = 1_048_576 # 1 MiB per supporting file
# Characters allowed in skill names (filesystem-safe, URL-friendly)
VALID_NAME_RE = re.compile(r'^[a-z0-9][a-z0-9._-]*$')
@@ -177,6 +179,21 @@ def _validate_frontmatter(content: str) -> Optional[str]:
return None
def _validate_content_size(content: str, label: str = "SKILL.md") -> Optional[str]:
"""Check that content doesn't exceed the character limit for agent writes.
Returns an error message or None if within bounds.
"""
if len(content) > MAX_SKILL_CONTENT_CHARS:
return (
f"{label} content is {len(content):,} characters "
f"(limit: {MAX_SKILL_CONTENT_CHARS:,}). "
f"Consider splitting into a smaller SKILL.md with supporting files "
f"in references/ or templates/."
)
return None
def _resolve_skill_dir(name: str, category: str = None) -> Path:
"""Build the directory path for a new skill, optionally under a category."""
if category:
@@ -275,6 +292,10 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
if err:
return {"success": False, "error": err}
err = _validate_content_size(content)
if err:
return {"success": False, "error": err}
# Check for name collisions across all directories
existing = _find_skill(name)
if existing:
@@ -318,6 +339,10 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
if err:
return {"success": False, "error": err}
err = _validate_content_size(content)
if err:
return {"success": False, "error": err}
existing = _find_skill(name)
if not existing:
return {"success": False, "error": f"Skill '{name}' not found. Use skills_list() to see available skills."}
@@ -379,27 +404,29 @@ def _patch_skill(
content = target.read_text(encoding="utf-8")
count = content.count(old_string)
if count == 0:
# Use the same fuzzy matching engine as the file patch tool.
# This handles whitespace normalization, indentation differences,
# escape sequences, and block-anchor matching — saving the agent
# from exact-match failures on minor formatting mismatches.
from tools.fuzzy_match import fuzzy_find_and_replace
new_content, match_count, match_error = fuzzy_find_and_replace(
content, old_string, new_string, replace_all
)
if match_error:
# Show a short preview of the file so the model can self-correct
preview = content[:500] + ("..." if len(content) > 500 else "")
return {
"success": False,
"error": "old_string not found in the file.",
"error": match_error,
"file_preview": preview,
}
if count > 1 and not replace_all:
return {
"success": False,
"error": (
f"old_string matched {count} times. Provide more surrounding context "
f"to make the match unique, or set replace_all=true to replace all occurrences."
),
"match_count": count,
}
new_content = content.replace(old_string, new_string) if replace_all else content.replace(old_string, new_string, 1)
# Check size limit on the result
target_label = "SKILL.md" if not file_path else file_path
err = _validate_content_size(new_content, label=target_label)
if err:
return {"success": False, "error": err}
# If patching SKILL.md, validate frontmatter is still intact
if not file_path:
@@ -419,10 +446,9 @@ def _patch_skill(
_atomic_write_text(target, original_content)
return {"success": False, "error": scan_error}
replacements = count if replace_all else 1
return {
"success": True,
"message": f"Patched {'SKILL.md' if not file_path else file_path} in skill '{name}' ({replacements} replacement{'s' if replacements > 1 else ''}).",
"message": f"Patched {'SKILL.md' if not file_path else file_path} in skill '{name}' ({match_count} replacement{'s' if match_count > 1 else ''}).",
}
@@ -455,6 +481,21 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
if not file_content and file_content != "":
return {"success": False, "error": "file_content is required."}
# Check size limits
content_bytes = len(file_content.encode("utf-8"))
if content_bytes > MAX_SKILL_FILE_BYTES:
return {
"success": False,
"error": (
f"File content is {content_bytes:,} bytes "
f"(limit: {MAX_SKILL_FILE_BYTES:,} bytes / 1 MiB). "
f"Consider splitting into smaller files."
),
}
err = _validate_content_size(file_content, label=file_path)
if err:
return {"success": False, "error": err}
existing = _find_skill(name)
if not existing:
return {"success": False, "error": f"Skill '{name}' not found. Create it first with action='create'."}
+16
View File
@@ -2525,6 +2525,22 @@ def install_from_quarantine(
if install_dir.exists():
shutil.rmtree(install_dir)
# Warn (but don't block) if SKILL.md is very large
skill_md = quarantine_path / "SKILL.md"
if skill_md.exists():
try:
skill_size = skill_md.stat().st_size
if skill_size > 100_000:
logger.warning(
"Skill '%s' has a large SKILL.md (%s chars). "
"Large skills consume significant context when loaded. "
"Consider asking the author to split it into smaller files.",
safe_skill_name,
f"{skill_size:,}",
)
except OSError:
pass
install_dir.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(quarantine_path), str(install_dir))
@@ -85,6 +85,7 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
| `BROWSERBASE_PROJECT_ID` | Browserbase project ID |
| `BROWSER_USE_API_KEY` | Browser Use cloud browser API key ([browser-use.com](https://browser-use.com/)) |
| `BROWSER_CDP_URL` | Chrome DevTools Protocol URL for local browser (set via `/browser connect`, e.g. `ws://localhost:9222`) |
| `CAMOFOX_URL` | Camofox local anti-detection browser URL (default: `http://localhost:9377`) |
| `BROWSER_INACTIVITY_TIMEOUT` | Browser session inactivity timeout in seconds |
| `FAL_KEY` | Image generation ([fal.ai](https://fal.ai/)) |
| `GROQ_API_KEY` | Groq Whisper STT API key ([groq.com](https://groq.com/)) |
+2
View File
@@ -1016,6 +1016,8 @@ browser:
inactivity_timeout: 120 # Seconds before auto-closing idle sessions
command_timeout: 30 # Timeout in seconds for browser commands (screenshot, navigate, etc.)
record_sessions: false # Auto-record browser sessions as WebM videos to ~/.hermes/browser_recordings/
camofox:
managed_persistence: false # When true, Camofox sessions persist cookies/logins across restarts
```
The browser toolset supports multiple providers. See the [Browser feature page](/docs/user-guide/features/browser) for details on Browserbase, Browser Use, and local Chrome CDP setup.
@@ -11,6 +11,7 @@ Hermes Agent includes a full browser automation toolset with multiple backend op
- **Browserbase cloud mode** via [Browserbase](https://browserbase.com) for managed cloud browsers and anti-bot tooling
- **Browser Use cloud mode** via [Browser Use](https://browser-use.com) as an alternative cloud browser provider
- **Camofox local mode** via [Camofox](https://github.com/jo-inc/camofox-browser) for local anti-detection browsing (Firefox-based fingerprint spoofing)
- **Local Chrome via CDP** — connect browser tools to your own Chrome instance using `/browser connect`
- **Local browser mode** via the `agent-browser` CLI and a local Chromium installation
@@ -54,6 +55,50 @@ BROWSER_USE_API_KEY=***
Get your API key at [browser-use.com](https://browser-use.com). Browser Use provides a cloud browser via its REST API. If both Browserbase and Browser Use credentials are set, Browserbase takes priority.
### Camofox local mode
[Camofox](https://github.com/jo-inc/camofox-browser) is a self-hosted Node.js server wrapping Camoufox (a Firefox fork with C++ fingerprint spoofing). It provides local anti-detection browsing without cloud dependencies.
```bash
# Install and run
git clone https://github.com/jo-inc/camofox-browser && cd camofox-browser
npm install && npm start # downloads Camoufox (~300MB) on first run
# Or via Docker
docker run -d --network host -e CAMOFOX_PORT=9377 jo-inc/camofox-browser
```
Then set in `~/.hermes/.env`:
```bash
CAMOFOX_URL=http://localhost:9377
```
Or configure via `hermes tools` → Browser Automation → Camofox.
When `CAMOFOX_URL` is set, all browser tools automatically route through Camofox instead of Browserbase or agent-browser.
#### Persistent browser sessions
By default, each Camofox session gets a random identity — cookies and logins don't survive across agent restarts. To enable persistent browser sessions:
```yaml
# In ~/.hermes/config.yaml
browser:
camofox:
managed_persistence: true
```
When enabled, Hermes sends a stable profile-scoped identity to Camofox. The Camofox server maps this identity to a persistent browser profile directory, so cookies, logins, and localStorage survive across restarts. Different Hermes profiles get different browser profiles (profile isolation).
:::note
The Camofox server must also be configured with `CAMOFOX_PROFILE_DIR` on the server side for persistence to work.
:::
#### VNC live view
When Camofox runs in headed mode (with a visible browser window), it exposes a VNC port in its health check response. Hermes automatically discovers this and includes the VNC URL in navigation responses, so the agent can share a link for you to watch the browser live.
### Local Chrome via CDP (`/browser connect`)
Instead of a cloud provider, you can attach Hermes browser tools to your own running Chrome instance via the Chrome DevTools Protocol (CDP). This is useful when you want to see what the agent is doing in real-time, interact with pages that require your own cookies/sessions, or avoid cloud browser costs.
+8
View File
@@ -67,6 +67,14 @@
border-bottom: 1px solid rgba(255, 215, 0, 0.08);
}
/* backdrop-filter creates a stacking context that hides
.navbar-sidebar menu content (Docusaurus #6996). Remove it
while the mobile sidebar is open both classes live on the
same <nav> element. */
.navbar.navbar-sidebar--show {
backdrop-filter: none;
}
.navbar__title {
font-weight: 600;
letter-spacing: -0.02em;