Compare commits

..

1 Commits

Author SHA1 Message Date
Laura Batalha 7d60316c99 feat(discord): only create threads and reactions for authorized users 2026-03-31 18:52:59 -07:00
39 changed files with 73 additions and 3128 deletions
-4
View File
@@ -267,10 +267,6 @@ class CredentialPool:
def has_credentials(self) -> bool:
return bool(self._entries)
def has_available(self) -> bool:
"""True if at least one entry is not currently in exhaustion cooldown."""
return bool(self._available_entries())
def entries(self) -> List[PooledCredential]:
return list(self._entries)
-313
View File
@@ -10,9 +10,6 @@ import os
import sys
import threading
import time
from dataclasses import dataclass, field
from difflib import unified_diff
from pathlib import Path
# ANSI escape codes for coloring tool failure indicators
_RED = "\033[31m"
@@ -20,22 +17,6 @@ _RESET = "\033[0m"
logger = logging.getLogger(__name__)
_ANSI_RESET = "\033[0m"
_ANSI_DIM = "\033[38;2;150;150;150m"
_ANSI_FILE = "\033[38;2;180;160;255m"
_ANSI_HUNK = "\033[38;2;120;120;140m"
_ANSI_MINUS = "\033[38;2;255;255;255;48;2;120;20;20m"
_ANSI_PLUS = "\033[38;2;255;255;255;48;2;20;90;20m"
_MAX_INLINE_DIFF_FILES = 6
_MAX_INLINE_DIFF_LINES = 80
@dataclass
class LocalEditSnapshot:
"""Pre-tool filesystem snapshot used to render diffs locally after writes."""
paths: list[Path] = field(default_factory=list)
before: dict[str, str | None] = field(default_factory=dict)
# =========================================================================
# Configurable tool preview length (0 = no limit)
# Set once at startup by CLI or gateway from display.tool_preview_length config.
@@ -237,300 +218,6 @@ def build_tool_preview(tool_name: str, args: dict, max_len: int | None = None) -
return preview
# =========================================================================
# Inline diff previews for write actions
# =========================================================================
def _resolved_path(path: str) -> Path:
"""Resolve a possibly-relative filesystem path against the current cwd."""
candidate = Path(os.path.expanduser(path))
if candidate.is_absolute():
return candidate
return Path.cwd() / candidate
def _snapshot_text(path: Path) -> str | None:
"""Return UTF-8 file content, or None for missing/unreadable files."""
try:
return path.read_text(encoding="utf-8")
except (FileNotFoundError, IsADirectoryError, UnicodeDecodeError, OSError):
return None
def _display_diff_path(path: Path) -> str:
"""Prefer cwd-relative paths in diffs when available."""
try:
return str(path.resolve().relative_to(Path.cwd().resolve()))
except Exception:
return str(path)
def _resolve_skill_manage_paths(args: dict) -> list[Path]:
"""Resolve skill_manage write targets to filesystem paths."""
action = args.get("action")
name = args.get("name")
if not action or not name:
return []
from tools.skill_manager_tool import _find_skill, _resolve_skill_dir
if action == "create":
skill_dir = _resolve_skill_dir(name, args.get("category"))
return [skill_dir / "SKILL.md"]
existing = _find_skill(name)
if not existing:
return []
skill_dir = Path(existing["path"])
if action in {"edit", "patch"}:
file_path = args.get("file_path")
return [skill_dir / file_path] if file_path else [skill_dir / "SKILL.md"]
if action in {"write_file", "remove_file"}:
file_path = args.get("file_path")
return [skill_dir / file_path] if file_path else []
if action == "delete":
files = [path for path in sorted(skill_dir.rglob("*")) if path.is_file()]
return files
return []
def _resolve_local_edit_paths(tool_name: str, function_args: dict | None) -> list[Path]:
"""Resolve local filesystem targets for write-capable tools."""
if not isinstance(function_args, dict):
return []
if tool_name == "write_file":
path = function_args.get("path")
return [_resolved_path(path)] if path else []
if tool_name == "patch":
path = function_args.get("path")
return [_resolved_path(path)] if path else []
if tool_name == "skill_manage":
return _resolve_skill_manage_paths(function_args)
return []
def capture_local_edit_snapshot(tool_name: str, function_args: dict | None) -> LocalEditSnapshot | None:
"""Capture before-state for local write previews."""
paths = _resolve_local_edit_paths(tool_name, function_args)
if not paths:
return None
snapshot = LocalEditSnapshot(paths=paths)
for path in paths:
snapshot.before[str(path)] = _snapshot_text(path)
return snapshot
def _result_succeeded(result: str | None) -> bool:
"""Conservatively detect whether a tool result represents success."""
if not result:
return False
try:
data = json.loads(result)
except (json.JSONDecodeError, TypeError):
return False
if not isinstance(data, dict):
return False
if data.get("error"):
return False
if "success" in data:
return bool(data.get("success"))
return True
def _diff_from_snapshot(snapshot: LocalEditSnapshot | None) -> str | None:
"""Generate unified diff text from a stored before-state and current files."""
if not snapshot:
return None
chunks: list[str] = []
for path in snapshot.paths:
before = snapshot.before.get(str(path))
after = _snapshot_text(path)
if before == after:
continue
display_path = _display_diff_path(path)
diff = "".join(
unified_diff(
[] if before is None else before.splitlines(keepends=True),
[] if after is None else after.splitlines(keepends=True),
fromfile=f"a/{display_path}",
tofile=f"b/{display_path}",
)
)
if diff:
chunks.append(diff)
if not chunks:
return None
return "".join(chunk if chunk.endswith("\n") else chunk + "\n" for chunk in chunks)
def extract_edit_diff(
tool_name: str,
result: str | None,
*,
function_args: dict | None = None,
snapshot: LocalEditSnapshot | None = None,
) -> str | None:
"""Extract a unified diff from a file-edit tool result."""
if tool_name == "patch" and result:
try:
data = json.loads(result)
except (json.JSONDecodeError, TypeError):
data = None
if isinstance(data, dict):
diff = data.get("diff")
if isinstance(diff, str) and diff.strip():
return diff
if tool_name not in {"write_file", "patch", "skill_manage"}:
return None
if not _result_succeeded(result):
return None
return _diff_from_snapshot(snapshot)
def _emit_inline_diff(diff_text: str, print_fn) -> bool:
"""Emit rendered diff text through the CLI's prompt_toolkit-safe printer."""
if print_fn is None or not diff_text:
return False
try:
print_fn(" ┊ review diff")
for line in diff_text.rstrip("\n").splitlines():
print_fn(line)
return True
except Exception:
return False
def _render_inline_unified_diff(diff: str) -> list[str]:
"""Render unified diff lines in Hermes' inline transcript style."""
rendered: list[str] = []
from_file = None
to_file = None
for raw_line in diff.splitlines():
if raw_line.startswith("--- "):
from_file = raw_line[4:].strip()
continue
if raw_line.startswith("+++ "):
to_file = raw_line[4:].strip()
if from_file or to_file:
rendered.append(f"{_ANSI_FILE}{from_file or 'a/?'}{to_file or 'b/?'}{_ANSI_RESET}")
continue
if raw_line.startswith("@@"):
rendered.append(f"{_ANSI_HUNK}{raw_line}{_ANSI_RESET}")
continue
if raw_line.startswith("-"):
rendered.append(f"{_ANSI_MINUS}{raw_line}{_ANSI_RESET}")
continue
if raw_line.startswith("+"):
rendered.append(f"{_ANSI_PLUS}{raw_line}{_ANSI_RESET}")
continue
if raw_line.startswith(" "):
rendered.append(f"{_ANSI_DIM}{raw_line}{_ANSI_RESET}")
continue
if raw_line:
rendered.append(raw_line)
return rendered
def _split_unified_diff_sections(diff: str) -> list[str]:
"""Split a unified diff into per-file sections."""
sections: list[list[str]] = []
current: list[str] = []
for line in diff.splitlines():
if line.startswith("--- ") and current:
sections.append(current)
current = [line]
continue
current.append(line)
if current:
sections.append(current)
return ["\n".join(section) for section in sections if section]
def _summarize_rendered_diff_sections(
diff: str,
*,
max_files: int = _MAX_INLINE_DIFF_FILES,
max_lines: int = _MAX_INLINE_DIFF_LINES,
) -> list[str]:
"""Render diff sections while capping file count and total line count."""
sections = _split_unified_diff_sections(diff)
rendered: list[str] = []
omitted_files = 0
omitted_lines = 0
for idx, section in enumerate(sections):
if idx >= max_files:
omitted_files += 1
omitted_lines += len(_render_inline_unified_diff(section))
continue
section_lines = _render_inline_unified_diff(section)
remaining_budget = max_lines - len(rendered)
if remaining_budget <= 0:
omitted_lines += len(section_lines)
omitted_files += 1
continue
if len(section_lines) <= remaining_budget:
rendered.extend(section_lines)
continue
rendered.extend(section_lines[:remaining_budget])
omitted_lines += len(section_lines) - remaining_budget
omitted_files += 1 + max(0, len(sections) - idx - 1)
for leftover in sections[idx + 1:]:
omitted_lines += len(_render_inline_unified_diff(leftover))
break
if omitted_files or omitted_lines:
summary = f"… omitted {omitted_lines} diff line(s)"
if omitted_files:
summary += f" across {omitted_files} additional file(s)/section(s)"
rendered.append(f"{_ANSI_HUNK}{summary}{_ANSI_RESET}")
return rendered
def render_edit_diff_with_delta(
tool_name: str,
result: str | None,
*,
function_args: dict | None = None,
snapshot: LocalEditSnapshot | None = None,
print_fn=None,
) -> bool:
"""Render an edit diff inline without taking over the terminal UI."""
diff = extract_edit_diff(
tool_name,
result,
function_args=function_args,
snapshot=snapshot,
)
if not diff:
return False
try:
rendered_lines = _summarize_rendered_diff_sections(diff)
except Exception as exc:
logger.debug("Could not render inline diff: %s", exc)
return False
return _emit_inline_diff("\n".join(rendered_lines), print_fn)
# =========================================================================
# KawaiiSpinner
# =========================================================================
+1 -8
View File
@@ -644,9 +644,6 @@ class InsightsEngine:
lines.append(f" Sessions: {o['total_sessions']:<12} Messages: {o['total_messages']:,}")
lines.append(f" Tool calls: {o['total_tool_calls']:<12,} User messages: {o['user_messages']:,}")
lines.append(f" Input tokens: {o['total_input_tokens']:<12,} Output tokens: {o['total_output_tokens']:,}")
cache_total = o.get("total_cache_read_tokens", 0) + o.get("total_cache_write_tokens", 0)
if cache_total > 0:
lines.append(f" Cache read: {o['total_cache_read_tokens']:<12,} Cache write: {o['total_cache_write_tokens']:,}")
cost_str = f"${o['estimated_cost']:.2f}"
if o.get("models_without_pricing"):
cost_str += " *"
@@ -749,11 +746,7 @@ class InsightsEngine:
# Overview
lines.append(f"**Sessions:** {o['total_sessions']} | **Messages:** {o['total_messages']:,} | **Tool calls:** {o['total_tool_calls']:,}")
cache_total = o.get("total_cache_read_tokens", 0) + o.get("total_cache_write_tokens", 0)
if cache_total > 0:
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,} / cache: {cache_total:,})")
else:
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
lines.append(f"**Tokens:** {o['total_tokens']:,} (in: {o['total_input_tokens']:,} / out: {o['total_output_tokens']:,})")
cost_note = ""
if o.get("models_without_pricing"):
cost_note = " _(excludes custom/self-hosted models)_"
-2
View File
@@ -127,7 +127,6 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
"api_mode": primary.get("api_mode"),
"command": primary.get("command"),
"args": list(primary.get("args") or []),
"credential_pool": primary.get("credential_pool"),
},
"label": None,
"signature": (
@@ -163,7 +162,6 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
"api_mode": primary.get("api_mode"),
"command": primary.get("command"),
"args": list(primary.get("args") or []),
"credential_pool": primary.get("credential_pool"),
},
"label": None,
"signature": (
+17 -51
View File
@@ -1077,16 +1077,12 @@ class HermesCLI:
# streaming: stream tokens to the terminal as they arrive (display.streaming in config.yaml)
self.streaming_enabled = CLI_CONFIG["display"].get("streaming", False)
# Inline diff previews for write actions (display.inline_diffs in config.yaml)
self._inline_diffs_enabled = CLI_CONFIG["display"].get("inline_diffs", True)
# Streaming display state
self._stream_buf = "" # Partial line buffer for line-buffered rendering
self._stream_started = False # True once first delta arrives
self._stream_box_opened = False # True once the response box header is printed
self._reasoning_stream_started = False # True once live reasoning starts streaming
self._reasoning_preview_buf = "" # Coalesce tiny reasoning chunks for [thinking] output
self._pending_edit_snapshots = {}
# Configuration - priority: CLI args > env vars > config file
# Model comes from: CLI arg or config.yaml (single source of truth).
@@ -2028,7 +2024,6 @@ class HermesCLI:
"api_mode": self.api_mode,
"command": self.acp_command,
"args": list(self.acp_args or []),
"credential_pool": getattr(self, "_credential_pool", None),
},
)
@@ -2136,8 +2131,6 @@ class HermesCLI:
checkpoint_max_snapshots=self.checkpoint_max_snapshots,
pass_session_id=self.pass_session_id,
tool_progress_callback=self._on_tool_progress,
tool_start_callback=self._on_tool_start if self._inline_diffs_enabled else None,
tool_complete_callback=self._on_tool_complete if self._inline_diffs_enabled else None,
stream_delta_callback=self._stream_delta if self.streaming_enabled else None,
tool_gen_callback=self._on_tool_gen_start if self.streaming_enabled else None,
)
@@ -2169,12 +2162,6 @@ class HermesCLI:
def show_banner(self):
"""Display the welcome banner in Claude Code style."""
self.console.clear()
# Get context length for display before branching so it remains
# available to the low-context warning logic in compact mode too.
ctx_len = None
if hasattr(self, 'agent') and self.agent and hasattr(self.agent, 'context_compressor'):
ctx_len = self.agent.context_compressor.context_length
# Auto-compact for narrow terminals — the full banner with caduceus
# + tool list needs ~80 columns minimum to render without wrapping.
@@ -2191,6 +2178,11 @@ class HermesCLI:
# Get terminal working directory (where commands will execute)
cwd = os.getenv("TERMINAL_CWD", os.getcwd())
# Get context length for display
ctx_len = None
if hasattr(self, 'agent') and self.agent and hasattr(self.agent, 'context_compressor'):
ctx_len = self.agent.context_compressor.context_length
# Build and display the banner
build_welcome_banner(
console=self.console,
@@ -5040,33 +5032,6 @@ class HermesCLI:
except Exception:
pass
def _on_tool_start(self, tool_call_id: str, function_name: str, function_args: dict):
"""Capture local before-state for write-capable tools."""
try:
from agent.display import capture_local_edit_snapshot
snapshot = capture_local_edit_snapshot(function_name, function_args)
if snapshot is not None:
self._pending_edit_snapshots[tool_call_id] = snapshot
except Exception:
logger.debug("Edit snapshot capture failed for %s", function_name, exc_info=True)
def _on_tool_complete(self, tool_call_id: str, function_name: str, function_args: dict, function_result: str):
"""Render file edits with inline diff after write-capable tools complete."""
snapshot = self._pending_edit_snapshots.pop(tool_call_id, None)
try:
from agent.display import render_edit_diff_with_delta
render_edit_diff_with_delta(
function_name,
function_result,
function_args=function_args,
snapshot=snapshot,
print_fn=_cprint,
)
except Exception:
logger.debug("Edit diff preview failed for %s", function_name, exc_info=True)
# ====================================================================
# Voice mode methods
# ====================================================================
@@ -6378,17 +6343,6 @@ class HermesCLI:
def run(self):
"""Run the interactive CLI loop with persistent input at bottom."""
# Push the entire TUI to the bottom of the terminal so the banner,
# responses, and prompt all appear pinned to the bottom — empty
# space stays above, not below. This prints enough blank lines to
# scroll the cursor to the last row before any content is rendered.
try:
_term_lines = shutil.get_terminal_size().lines
if _term_lines > 2:
print("\n" * (_term_lines - 1), end="", flush=True)
except Exception:
pass
self.show_banner()
# One-line Honcho session indicator (TTY-only, not captured by agent).
@@ -7615,6 +7569,18 @@ class HermesCLI:
self._agent_running = False
self._spinner_text = ""
# Push the input prompt toward the bottom of the
# terminal so it doesn't sit mid-screen after short
# responses. patch_stdout renders these newlines
# above the input area, creating visual separation
# and anchoring the prompt near the bottom.
try:
_pad = shutil.get_terminal_size().lines // 2
if _pad > 2:
_cprint("\n" * _pad)
except Exception:
pass
app.invalidate() # Refresh status line
# Continuous voice: auto-restart recording after agent responds.
@@ -1,324 +0,0 @@
"""
HermesAgent for tau2-bench evaluation.
Implements the tau2 HalfDuplexAgent interface using litellm with OpenRouter,
matching the inference path used across the rest of the Hermes Agent codebase.
Usage:
python environments/benchmarks/taubench/run_eval.py \\
--model anthropic/claude-sonnet-4-5 \\
--base-url openrouter \\
--env retail
"""
import json
import os
import sys
from pathlib import Path
from typing import Optional
import litellm
from pydantic import BaseModel
_repo_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_repo_root) not in sys.path:
sys.path.insert(0, str(_repo_root))
from environments.tool_call_parsers import get_parser
from tau2.agent.base_agent import HalfDuplexAgent, ValidAgentInputMessage
from tau2.data_model.message import (
AssistantMessage,
Message,
MultiToolMessage,
SystemMessage,
ToolCall,
ToolMessage,
UserMessage,
)
from tau2.environment.tool import Tool
class HermesAgentState(BaseModel):
system_messages: list[SystemMessage]
messages: list
class HermesAgent(HalfDuplexAgent[HermesAgentState]):
"""
tau2 HalfDuplexAgent backed by litellm, using OpenRouter (or any
OpenAI-compatible endpoint).
Registered as "hermes_agent" in the tau2 registry by run_eval.py.
"""
SYSTEM_PROMPT = (
"You are a customer service agent that helps the user according to the "
"<policy> provided below.\n"
"In each turn you can either:\n"
"- Send a message to the user.\n"
"- Make a tool call.\n"
"You cannot do both at the same time.\n\n"
"Try to be helpful and always follow the policy. "
"Always make sure you generate valid JSON only.\n\n"
"<policy>\n{domain_policy}\n</policy>"
)
# System prompt variant for qwen3_coder tool format — tools are embedded
# directly in the system prompt as <tools> XML instead of passed via the
# OpenAI tools= parameter.
SYSTEM_PROMPT_QWEN3_CODER = (
"You are a customer service agent that helps the user according to the "
"<policy> provided below.\n"
"In each turn you can either:\n"
"- Send a message to the user.\n"
"- Make a tool call.\n"
"You cannot do both at the same time.\n\n"
"Try to be helpful and always follow the policy. "
"Always make sure you generate valid JSON only.\n\n"
"You may call one or more functions to assist with the user query.\n\n"
"You are provided with function signatures within <tools></tools> XML tags:\n"
"<tools>\n{tools_json}\n</tools>\n\n"
"<policy>\n{domain_policy}\n</policy>"
)
def __init__(
self,
tools: list[Tool],
domain_policy: str,
model: str,
base_url: Optional[str] = None,
api_key: Optional[str] = None,
temperature: float = 0.0,
max_tokens: Optional[int] = None,
top_p: Optional[float] = None,
thinking: bool = False,
tool_parser: Optional[str] = None,
):
super().__init__(tools=tools, domain_policy=domain_policy)
self.model = model
self.base_url = base_url
self.api_key = api_key
self.temperature = temperature
self.max_tokens = max_tokens
self.top_p = top_p
self.thinking = thinking
self.tool_parser = tool_parser
self._parser = get_parser(tool_parser) if tool_parser else None
# OpenRouter requires specific headers; pass them via litellm extra_headers
self._extra_headers: dict = {}
if base_url and "openrouter" in base_url.lower():
self._extra_headers = {
"HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-Title": "Hermes Agent",
}
@property
def system_prompt(self) -> str:
if self.tool_parser == "qwen3_coder" and self.tools:
tools_json = json.dumps(
[t.openai_schema for t in self.tools], indent=2, ensure_ascii=False
)
return self.SYSTEM_PROMPT_QWEN3_CODER.format(
tools_json=tools_json,
domain_policy=self.domain_policy,
)
return self.SYSTEM_PROMPT.format(domain_policy=self.domain_policy)
def get_init_state(
self, message_history: Optional[list[Message]] = None
) -> HermesAgentState:
return HermesAgentState(
system_messages=[SystemMessage(role="system", content=self.system_prompt)],
messages=list(message_history or []),
)
def generate_next_message(
self, message: ValidAgentInputMessage, state: HermesAgentState
) -> tuple[AssistantMessage, HermesAgentState]:
# Append incoming message(s) to history
if isinstance(message, MultiToolMessage):
state.messages.extend(message.tool_messages)
else:
state.messages.append(message)
# Build litellm-compatible message list
all_messages = state.system_messages + state.messages
lm_messages = [_to_litellm_message(m) for m in all_messages]
kwargs = dict(
model=self.model,
messages=lm_messages,
temperature=self.temperature,
)
if self.tools:
kwargs["tools"] = [t.openai_schema for t in self.tools]
if self.max_tokens is not None:
kwargs["max_tokens"] = self.max_tokens
if self.top_p is not None:
kwargs["top_p"] = self.top_p
# Enable thinking/reasoning mode. OpenRouter exposes this as
# `include_reasoning` for nemotron (per supported_parameters in the
# model metadata). Pass via extra_body to bypass litellm filtering.
if self.thinking:
kwargs["extra_body"] = {"include_reasoning": True}
# Only pass base_url when model doesn't already have a provider prefix
# (litellm uses either the prefix OR base_url, not both)
if self.base_url and not self.model.startswith("openrouter/"):
kwargs["base_url"] = self.base_url
if self.api_key:
kwargs["api_key"] = self.api_key
if self._extra_headers:
kwargs["extra_headers"] = self._extra_headers
response = litellm.completion(**kwargs)
assistant_msg = _litellm_response_to_assistant_message(response, parser=self._parser)
state.messages.append(assistant_msg)
return assistant_msg, state
# ---------------------------------------------------------------------------
# Conversion helpers
# ---------------------------------------------------------------------------
def _to_litellm_message(msg) -> dict:
"""Convert a tau2 message object to a litellm-compatible dict."""
if isinstance(msg, SystemMessage):
return {"role": "system", "content": msg.content or ""}
if isinstance(msg, UserMessage):
if msg.tool_calls:
# User tool calls (tau2 v2 feature — user has tools too)
return {
"role": "user",
"content": msg.content or "",
"tool_calls": [_tool_call_to_dict(tc) for tc in msg.tool_calls],
}
return {"role": "user", "content": msg.content or ""}
if isinstance(msg, AssistantMessage):
d: dict = {"role": "assistant", "content": msg.content or ""}
if msg.tool_calls:
d["tool_calls"] = [_tool_call_to_dict(tc) for tc in msg.tool_calls]
return d
if isinstance(msg, ToolMessage):
return {
"role": "tool",
"tool_call_id": msg.id,
"content": msg.content or "",
}
# Fallback
return {"role": getattr(msg, "role", "user"), "content": str(getattr(msg, "content", ""))}
def _tool_call_to_dict(tc: ToolCall) -> dict:
import json
return {
"id": tc.id or "call_0",
"type": "function",
"function": {
"name": tc.name,
"arguments": json.dumps(tc.arguments),
},
}
def _litellm_response_to_assistant_message(response, parser=None) -> AssistantMessage:
"""Convert a litellm ModelResponse to a tau2 AssistantMessage."""
import json
choice = response.choices[0]
msg = choice.message
content = msg.content or ""
tool_calls_raw = getattr(msg, "tool_calls", None)
tau2_tool_calls: Optional[list[ToolCall]] = None
if parser and content:
# Use the custom tool parser (e.g. qwen3_coder) to extract tool calls
# from the raw text response.
parsed_content, parsed_tool_calls = parser.parse(content)
if parsed_tool_calls:
content = parsed_content or ""
tau2_tool_calls = []
for tc in parsed_tool_calls:
try:
arguments = json.loads(tc.function.arguments or "{}")
except json.JSONDecodeError:
arguments = {}
tau2_tool_calls.append(
ToolCall(
id=tc.id or "call_0",
name=tc.function.name,
arguments=arguments,
requestor="assistant",
)
)
elif tool_calls_raw:
tau2_tool_calls = []
for tc in tool_calls_raw:
if hasattr(tc, "function"):
name = tc.function.name
try:
arguments = json.loads(tc.function.arguments or "{}")
except json.JSONDecodeError:
arguments = {}
tau2_tool_calls.append(
ToolCall(
id=tc.id or "call_0",
name=name,
arguments=arguments,
requestor="assistant",
)
)
cost = None
try:
cost = litellm.completion_cost(response)
except Exception:
pass
usage = None
if hasattr(response, "usage") and response.usage:
usage = dict(response.usage)
return AssistantMessage(
role="assistant",
content=content if not tau2_tool_calls else None,
tool_calls=tau2_tool_calls,
cost=cost,
usage=usage,
)
def create_hermes_agent(tools: list[Tool], domain_policy: str, **kwargs) -> HermesAgent:
"""
Factory function registered with the tau2 registry.
Expected kwargs:
model (str): litellm model string
base_url (str): API base URL (optional)
api_key (str): API key (optional)
temperature (float): sampling temperature (default 0.0)
top_p (float): nucleus sampling (optional)
max_tokens (int): max tokens (optional)
thinking (bool): enable reasoning/thinking mode (default False)
"""
return HermesAgent(
tools=tools,
domain_policy=domain_policy,
model=kwargs["model"],
base_url=kwargs.get("base_url"),
api_key=kwargs.get("api_key"),
temperature=kwargs.get("temperature", 0.0),
top_p=kwargs.get("top_p"),
max_tokens=kwargs.get("max_tokens"),
thinking=kwargs.get("thinking", False),
tool_parser=kwargs.get("tool_parser"),
)
@@ -1,288 +0,0 @@
"""
tau2-bench evaluation runner for Hermes Agent.
Runs the tau2-bench retail, airline, telecom, or banking_knowledge evaluation
using HermesAgent backed by litellm — the same inference path used across the
rest of the Hermes Agent codebase.
Usage:
# Against OpenRouter (auto-detects OPENROUTER_API_KEY)
python environments/benchmarks/taubench/run_eval.py \\
--model openrouter/anthropic/claude-sonnet-4-5 \\
--base-url openrouter \\
--env retail
# Against OpenAI directly
python environments/benchmarks/taubench/run_eval.py \\
--model gpt-4o \\
--env retail
# Local vLLM
python environments/benchmarks/taubench/run_eval.py \\
--model openai/NousResearch/Hermes-3-Llama-3.1-70B \\
--base-url http://localhost:8000/v1 \\
--env retail \\
--num-trials 3
# Specific tasks only
python environments/benchmarks/taubench/run_eval.py \\
--model openrouter/anthropic/claude-sonnet-4-5 \\
--base-url openrouter \\
--env retail \\
--task-ids task_1 task_2 task_5
Results are saved to results/tau2bench/ as JSON.
Dependencies (requires Python 3.12+):
pip install "tau2 @ git+https://github.com/sierra-research/tau2-bench.git"
# or: pip install -e ".[tau2bench]"
"""
import argparse
import logging
import os
import sys
from pathlib import Path
from typing import Optional
_repo_root = Path(__file__).resolve().parent.parent.parent.parent
if str(_repo_root) not in sys.path:
sys.path.insert(0, str(_repo_root))
from tau2.data_model.simulation import Results, TextRunConfig
from tau2.evaluator.evaluator import EvaluationType
from tau2.registry import registry
from tau2.runner.batch import run_tasks
from tau2.runner.helpers import get_tasks
from environments.benchmarks.taubench.hermes_agent import create_hermes_agent
logging.basicConfig(
level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
AGENT_NAME = "hermes_agent"
def _register_agent(
model: str,
base_url: Optional[str],
api_key: Optional[str],
temperature: float,
top_p: Optional[float],
max_tokens: Optional[int],
thinking: bool,
tool_parser: Optional[str],
) -> None:
"""Register the HermesAgent factory with the tau2 registry (idempotent)."""
if registry.get_agent_factory(AGENT_NAME) is not None:
return
def factory(tools, domain_policy, **kwargs):
return create_hermes_agent(
tools=tools,
domain_policy=domain_policy,
model=model,
base_url=base_url,
api_key=api_key,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
thinking=thinking,
tool_parser=tool_parser,
)
registry.register_agent_factory(factory=factory, name=AGENT_NAME)
logger.info("Registered agent factory: %s (model=%s, thinking=%s, tool_parser=%s)", AGENT_NAME, model, thinking, tool_parser)
def run_eval(
model: str,
base_url: Optional[str],
api_key: Optional[str],
user_model: str,
env_name: str,
task_split: Optional[str],
num_trials: int,
max_concurrency: int,
max_steps: int,
temperature: float,
top_p: Optional[float],
max_tokens: Optional[int],
thinking: bool,
tool_parser: Optional[str],
task_ids: Optional[list],
start_index: int,
end_index: int,
log_dir: str,
seed: int,
) -> Results:
# Resolve OpenRouter shorthand
if base_url and base_url.strip().lower() == "openrouter":
base_url = OPENROUTER_BASE_URL
is_openrouter = base_url and "openrouter" in base_url.lower()
# litellm requires the "openrouter/" prefix to route correctly
if is_openrouter and not model.startswith("openrouter/"):
model = f"openrouter/{model}"
if is_openrouter and not user_model.startswith("openrouter/"):
user_model = f"openrouter/{user_model}"
# Resolve API key
if is_openrouter:
api_key = api_key or os.environ.get("OPENROUTER_API_KEY") or os.environ.get("OPENAI_API_KEY")
# litellm reads OPENAI_API_KEY for base_url overrides; set it so the
# user simulator's generate() call also authenticates correctly.
if api_key and not os.environ.get("OPENAI_API_KEY"):
os.environ["OPENAI_API_KEY"] = api_key
else:
api_key = api_key or os.environ.get("OPENAI_API_KEY")
_register_agent(
model=model,
base_url=base_url,
api_key=api_key,
temperature=temperature,
top_p=top_p,
max_tokens=max_tokens,
thinking=thinking,
tool_parser=tool_parser,
)
# Load tasks — task_ids in tau2 are strings like "task_1"
tasks = get_tasks(
task_set_name=env_name,
task_split_name=task_split,
task_ids=[str(i) for i in task_ids] if task_ids else None,
)
if not task_ids and (end_index != -1 or start_index != 0):
end = end_index if end_index != -1 else len(tasks)
tasks = tasks[start_index:end]
logger.info(
"Running tau2-%s eval: %d tasks, %d trial(s), concurrency=%d",
env_name, len(tasks), num_trials, max_concurrency,
)
save_path = Path(log_dir) / f"tau2-{env_name}-{model.split('/')[-1]}.json"
save_path.parent.mkdir(parents=True, exist_ok=True)
# Pass api_key/base_url to user sim via llm_args so tau2's generate() authenticates.
# When using OpenRouter for the user sim, mirror the agent's key + endpoint.
user_llm_args: dict = {}
if is_openrouter and api_key:
user_llm_args["api_key"] = api_key
user_llm_args["base_url"] = base_url
config = TextRunConfig(
domain=env_name,
agent=AGENT_NAME,
user="user_simulator",
llm_agent=model,
llm_args_agent={},
llm_user=user_model,
llm_args_user=user_llm_args,
num_trials=num_trials,
max_steps=max_steps,
max_concurrency=max_concurrency,
seed=seed,
)
results = run_tasks(
config,
tasks,
save_path=save_path,
console_display=True,
# ALL: respects each task's reward_basis. NL assertions are skipped
# gracefully (scored as pass) rather than raising an error, so tasks
# are evaluated only on their actual basis components (DB, ACTION, etc.)
evaluation_type=EvaluationType.ALL,
)
logger.info("Results saved to %s", save_path)
return results
def main():
parser = argparse.ArgumentParser(
description="Run tau2-bench evaluation with Hermes Agent (requires Python 3.12+)",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--model", required=True,
help="litellm model string, e.g. 'openrouter/anthropic/claude-sonnet-4-5' or 'gpt-4o'",
)
parser.add_argument(
"--base-url", default=None,
help="API base URL. Use 'openrouter' as shorthand for https://openrouter.ai/api/v1.",
)
parser.add_argument("--api-key", default=None, help="API key (falls back to OPENROUTER_API_KEY / OPENAI_API_KEY)")
parser.add_argument("--temperature", type=float, default=1.0,
help="Sampling temperature. NVIDIA used 1.0 for nemotron-super.")
parser.add_argument("--top-p", type=float, default=0.95,
help="Nucleus sampling. NVIDIA used 0.95 for nemotron-super.")
parser.add_argument("--max-tokens", type=int, default=None)
parser.add_argument("--thinking", action="store_true", default=False,
help="Enable reasoning/thinking mode (use_reasoning=true). "
"Required to match NVIDIA's reported nemotron-super scores.")
parser.add_argument("--tool-parser", default=None,
help="Tool call parser to use (e.g. 'qwen3_coder'). When set, tools are "
"embedded in the system prompt as <tools> XML and responses are parsed "
"from raw text instead of using OpenAI function calling format.")
parser.add_argument(
"--user-model", default="qwen/qwen3-235b-a22b-2507:nitro",
help="litellm model string for the tau2 user simulator. "
"Defaults to qwen/qwen3-235b-a22b-2507:nitro (instruct, non-thinking) to match NVIDIA's eval setup. "
"When using --base-url openrouter the openrouter/ prefix is added automatically.",
)
parser.add_argument(
"--env", default="retail",
choices=["retail", "airline", "telecom", "banking_knowledge", "mock"],
)
parser.add_argument(
"--task-split", default=None,
help="Task split name (e.g. 'base'). Defaults to the domain default.",
)
parser.add_argument("--num-trials", type=int, default=1)
parser.add_argument("--max-concurrency", type=int, default=8)
parser.add_argument("--max-steps", type=int, default=50)
parser.add_argument(
"--task-ids", nargs="*", default=None,
help="Specific task IDs to run (tau2 task IDs are strings like 'task_1')",
)
parser.add_argument("--start-index", type=int, default=0)
parser.add_argument("--end-index", type=int, default=-1)
parser.add_argument("--seed", type=int, default=10)
parser.add_argument("--log-dir", default="results/tau2bench")
args = parser.parse_args()
run_eval(
model=args.model,
base_url=args.base_url,
api_key=args.api_key,
user_model=args.user_model,
env_name=args.env,
task_split=args.task_split,
num_trials=args.num_trials,
max_concurrency=args.max_concurrency,
max_steps=args.max_steps,
temperature=args.temperature,
top_p=args.top_p,
max_tokens=args.max_tokens,
thinking=args.thinking,
tool_parser=args.tool_parser,
task_ids=args.task_ids,
start_index=args.start_index,
end_index=args.end_index,
log_dir=args.log_dir,
seed=args.seed,
)
if __name__ == "__main__":
main()
-4
View File
@@ -742,10 +742,6 @@ class TelegramAdapter(BasePlatformAdapter):
if not self._bot:
return SendResult(success=False, error="Not connected")
# Skip whitespace-only text to prevent Telegram 400 empty-text errors.
if not content or not content.strip():
return SendResult(success=True, message_id=None)
try:
# Format and split message if needed
formatted = self.format_message(content)
+3 -58
View File
@@ -24,7 +24,6 @@ import signal
import tempfile
import threading
import time
import uuid
from logging.handlers import RotatingFileHandler
from pathlib import Path
from datetime import datetime
@@ -789,7 +788,6 @@ class GatewayRunner:
"api_mode": runtime_kwargs.get("api_mode"),
"command": runtime_kwargs.get("command"),
"args": list(runtime_kwargs.get("args") or []),
"credential_pool": runtime_kwargs.get("credential_pool"),
}
return resolve_turn_route(user_message, getattr(self, "_smart_model_routing", {}), primary)
@@ -4721,13 +4719,9 @@ class GatewayRunner:
_APPROVAL_TIMEOUT_SECONDS = 300 # 5 minutes
async def _handle_approve_command(self, event: MessageEvent) -> Optional[str]:
async def _handle_approve_command(self, event: MessageEvent) -> str:
"""Handle /approve command — execute a pending dangerous command.
After execution, re-invokes the agent with the command result so it
can continue its multi-step task (fixes the "dead agent" bug where
the agent loop exited on approval_required and never resumed).
Usage:
/approve approve and execute the pending command
/approve session approve and remember for this session
@@ -4776,57 +4770,8 @@ class GatewayRunner:
logger.info("User approved dangerous command via /approve: %s...%s", cmd[:60], scope_msg)
from tools.terminal_tool import terminal_tool
result = await asyncio.to_thread(terminal_tool, command=cmd, force=True)
# Send immediate feedback so the user sees the command output right away
immediate_msg = f"✅ Command approved and executed{scope_msg}.\n\n```\n{result[:3500]}\n```"
adapter = self.adapters.get(source.platform)
if adapter:
try:
await adapter.send(source.chat_id, immediate_msg)
except Exception as e:
logger.warning("Failed to send approval feedback: %s", e)
# Re-invoke the agent with the command result so it can continue its task.
# The agent's conversation history (persisted in SQLite) already contains
# the tool call that returned approval_required — the continuation message
# provides the actual execution output so the agent can pick up where it
# left off.
continuation_text = (
f"[System: The user approved the previously blocked command and it has been executed.\n"
f"Command: {cmd}\n"
f"<command_output>\n{result[:3500]}\n</command_output>\n\n"
f"Continue with the task you were working on.]"
)
synthetic_event = MessageEvent(
text=continuation_text,
source=source,
message_id=f"approve-continuation-{uuid.uuid4().hex}",
)
async def _continue_agent():
try:
response = await self._handle_message(synthetic_event)
if response and adapter:
await adapter.send(source.chat_id, response)
except Exception as e:
logger.error("Failed to continue agent after /approve: %s", e)
if adapter:
try:
await adapter.send(
source.chat_id,
f"⚠️ Failed to resume agent after approval: {e}"
)
except Exception:
pass
_task = asyncio.create_task(_continue_agent())
self._background_tasks.add(_task)
_task.add_done_callback(self._background_tasks.discard)
# Return None — we already sent the immediate feedback and the agent
# continuation is running in the background.
return None
result = terminal_tool(command=cmd, force=True)
return f"✅ Command approved and executed{scope_msg}.\n\n```\n{result[:3500]}\n```"
async def _handle_deny_command(self, event: MessageEvent) -> str:
"""Handle /deny command — reject a pending dangerous command."""
-8
View File
@@ -247,13 +247,6 @@ DEFAULT_CONFIG = {
"command_timeout": 30, # Timeout for browser commands in seconds (screenshot, navigate, etc.)
"record_sessions": False, # Auto-record browser sessions as WebM videos
"allow_private_urls": False, # Allow navigating to private/internal IPs (localhost, 192.168.x.x, etc.)
"camofox": {
# When true, Hermes sends a stable profile-scoped userId to Camofox
# so the server can map it to a persistent browser profile directory.
# Requires Camofox server to be configured with CAMOFOX_PROFILE_DIR.
# When false (default), each session gets a random userId (ephemeral).
"managed_persistence": False,
},
},
# Filesystem checkpoints — automatic snapshots before destructive file ops.
@@ -359,7 +352,6 @@ DEFAULT_CONFIG = {
"bell_on_complete": False,
"show_reasoning": False,
"streaming": False,
"inline_diffs": True, # Show inline diff previews for write actions (write_file, patch, skill_manage)
"show_cost": False, # Show $ cost in the status bar (off by default)
"skin": "default",
"tool_progress_command": False, # Enable /verbose command in messaging gateway
+2 -28
View File
@@ -463,32 +463,6 @@ def _build_user_local_paths(home: Path, path_entries: list[str]) -> list[str]:
return [p for p in candidates if p not in path_entries and Path(p).exists()]
def _hermes_home_for_target_user(target_home_dir: str) -> str:
"""Remap the current HERMES_HOME to the equivalent under a target user's home.
When installing a system service via sudo, get_hermes_home() resolves to
root's home. This translates it to the target user's equivalent path:
/root/.hermes /home/alice/.hermes
/root/.hermes/profiles/coder /home/alice/.hermes/profiles/coder
/opt/custom-hermes /opt/custom-hermes (kept as-is)
"""
current_hermes = get_hermes_home().resolve()
current_default = (Path.home() / ".hermes").resolve()
target_default = Path(target_home_dir) / ".hermes"
# Default ~/.hermes → remap to target user's default
if current_hermes == current_default:
return str(target_default)
# Profile or subdir of ~/.hermes → preserve the relative structure
try:
relative = current_hermes.relative_to(current_default)
return str(target_default / relative)
except ValueError:
# Completely custom path (not under ~/.hermes) — keep as-is
return str(current_hermes)
def generate_systemd_unit(system: bool = False, run_as_user: str | None = None) -> str:
python_path = get_python_path()
working_dir = str(PROJECT_ROOT)
@@ -504,11 +478,12 @@ def generate_systemd_unit(system: bool = False, run_as_user: str | None = None)
if resolved_node_dir not in path_entries:
path_entries.append(resolved_node_dir)
hermes_home = str(get_hermes_home().resolve())
common_bin_paths = ["/usr/local/sbin", "/usr/local/bin", "/usr/sbin", "/usr/bin", "/sbin", "/bin"]
if system:
username, group_name, home_dir = _system_service_identity(run_as_user)
hermes_home = _hermes_home_for_target_user(home_dir)
path_entries.extend(_build_user_local_paths(Path(home_dir), path_entries))
path_entries.extend(common_bin_paths)
sane_path = ":".join(path_entries)
@@ -543,7 +518,6 @@ StandardError=journal
WantedBy=multi-user.target
"""
hermes_home = str(get_hermes_home().resolve())
path_entries.extend(_build_user_local_paths(Path.home(), path_entries))
path_entries.extend(common_bin_paths)
sane_path = ":".join(path_entries)
-76
View File
@@ -58,32 +58,6 @@ _CLONE_ALL_STRIP = [
"processes.json",
]
# Directories/files to exclude when exporting the default (~/.hermes) profile.
# The default profile contains infrastructure (repo checkout, worktrees, DBs,
# caches, binaries) that named profiles don't have. We exclude those so the
# export is a portable, reasonable-size archive of actual profile data.
_DEFAULT_EXPORT_EXCLUDE_ROOT = frozenset({
# Infrastructure
"hermes-agent", # repo checkout (multi-GB)
".worktrees", # git worktrees
"profiles", # other profiles — never recursive-export
"bin", # installed binaries (tirith, etc.)
"node_modules", # npm packages
# Databases & runtime state
"state.db", "state.db-shm", "state.db-wal",
"hermes_state.db",
"response_store.db", "response_store.db-shm", "response_store.db-wal",
"gateway.pid", "gateway_state.json", "processes.json",
"auth.lock", "active_profile", ".update_check",
"errors.log",
".hermes_history",
# Caches (regenerated on use)
"image_cache", "audio_cache", "document_cache",
"browser_screenshots", "checkpoints",
"sandboxes",
"logs", # gateway logs
})
# Names that cannot be used as profile aliases
_RESERVED_NAMES = frozenset({
"hermes", "default", "test", "tmp", "root", "sudo",
@@ -711,37 +685,11 @@ def get_active_profile_name() -> str:
# Export / Import
# ---------------------------------------------------------------------------
def _default_export_ignore(root_dir: Path):
"""Return an *ignore* callable for :func:`shutil.copytree`.
At the root level it excludes everything in ``_DEFAULT_EXPORT_EXCLUDE_ROOT``.
At all levels it excludes ``__pycache__``, sockets, and temp files.
"""
def _ignore(directory: str, contents: list) -> set:
ignored: set = set()
for entry in contents:
# Universal exclusions (any depth)
if entry == "__pycache__" or entry.endswith((".sock", ".tmp")):
ignored.add(entry)
# npm lockfiles can appear at root
elif entry in ("package.json", "package-lock.json"):
ignored.add(entry)
# Root-level exclusions
if Path(directory) == root_dir:
ignored.update(c for c in contents if c in _DEFAULT_EXPORT_EXCLUDE_ROOT)
return ignored
return _ignore
def export_profile(name: str, output_path: str) -> Path:
"""Export a profile to a tar.gz archive.
Returns the output file path.
"""
import tempfile
validate_profile_name(name)
profile_dir = get_profile_dir(name)
if not profile_dir.is_dir():
@@ -750,21 +698,6 @@ def export_profile(name: str, output_path: str) -> Path:
output = Path(output_path)
# shutil.make_archive wants the base name without extension
base = str(output).removesuffix(".tar.gz").removesuffix(".tgz")
if name == "default":
# The default profile IS ~/.hermes itself — its parent is ~/ and its
# directory name is ".hermes", not "default". We stage a clean copy
# under a temp dir so the archive contains ``default/...``.
with tempfile.TemporaryDirectory() as tmpdir:
staged = Path(tmpdir) / "default"
shutil.copytree(
profile_dir,
staged,
ignore=_default_export_ignore(profile_dir),
)
result = shutil.make_archive(base, "gztar", tmpdir, "default")
return Path(result)
result = shutil.make_archive(base, "gztar", str(profile_dir.parent), name)
return Path(result)
@@ -855,15 +788,6 @@ def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
"Specify it explicitly: hermes profile import <archive> --name <name>"
)
# Archives exported from the default profile have "default/" as top-level
# dir. Importing as "default" would target ~/.hermes itself — disallow
# that and guide the user toward a named profile.
if inferred_name == "default":
raise ValueError(
"Cannot import as 'default' — that is the built-in root profile (~/.hermes). "
"Specify a different name: hermes profile import <archive> --name <name>"
)
validate_profile_name(inferred_name)
profile_dir = get_profile_dir(inferred_name)
if profile_dir.exists():
-2
View File
@@ -72,8 +72,6 @@ rl = [
"wandb>=0.15.0,<1",
]
yc-bench = ["yc-bench @ git+https://github.com/collinear-ai/yc-bench.git ; python_version >= '3.12'"]
taubench = ["tau-bench @ git+https://github.com/sierra-research/tau-bench.git"]
tau2bench = ["tau2 @ git+https://github.com/sierra-research/tau2-bench.git"]
all = [
"hermes-agent[modal]",
"hermes-agent[daytona]",
+9 -68
View File
@@ -320,12 +320,8 @@ def _extract_parallel_scope_path(tool_name: str, function_args: dict) -> Path |
if not isinstance(raw_path, str) or not raw_path.strip():
return None
expanded = Path(raw_path).expanduser()
if expanded.is_absolute():
return Path(os.path.abspath(str(expanded)))
# Avoid resolve(); the file may not exist yet.
return Path(os.path.abspath(str(Path.cwd() / expanded)))
return Path(raw_path).expanduser()
def _paths_overlap(left: Path, right: Path) -> bool:
@@ -490,8 +486,6 @@ class AIAgent:
provider_data_collection: str = None,
session_id: str = None,
tool_progress_callback: callable = None,
tool_start_callback: callable = None,
tool_complete_callback: callable = None,
thinking_callback: callable = None,
reasoning_callback: callable = None,
clarify_callback: callable = None,
@@ -626,8 +620,6 @@ class AIAgent:
).start()
self.tool_progress_callback = tool_progress_callback
self.tool_start_callback = tool_start_callback
self.tool_complete_callback = tool_complete_callback
self.thinking_callback = thinking_callback
self.reasoning_callback = reasoning_callback
self._reasoning_deltas_fired = False # Set by _fire_reasoning_delta, reset per API call
@@ -3494,33 +3486,14 @@ class AIAgent:
@staticmethod
def _is_openai_client_closed(client: Any) -> bool:
"""Check if an OpenAI client is closed.
Handles both property and method forms of is_closed:
- httpx.Client.is_closed is a bool property
- openai.OpenAI.is_closed is a method returning bool
Prior bug: getattr(client, "is_closed", False) returned the bound method,
which is always truthy, causing unnecessary client recreation on every call.
"""
from unittest.mock import Mock
if isinstance(client, Mock):
return False
is_closed_attr = getattr(client, "is_closed", None)
if is_closed_attr is not None:
# Handle method (openai SDK) vs property (httpx)
if callable(is_closed_attr):
if is_closed_attr():
return True
elif bool(is_closed_attr):
return True
if bool(getattr(client, "is_closed", False)):
return True
http_client = getattr(client, "_client", None)
if http_client is not None:
return bool(getattr(http_client, "is_closed", False))
return False
return bool(getattr(http_client, "is_closed", False))
def _create_openai_client(self, client_kwargs: dict, *, reason: str, shared: bool) -> Any:
if self.provider == "copilot-acp" or str(client_kwargs.get("base_url", "")).startswith("acp://copilot"):
@@ -5561,7 +5534,7 @@ class AIAgent:
args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
print(f" 📞 Tool {i}: {name}({list(args.keys())}) - {args_preview}")
for tc, name, args in parsed_calls:
for _, name, args in parsed_calls:
if self.tool_progress_callback:
try:
preview = _build_tool_preview(name, args)
@@ -5569,13 +5542,6 @@ class AIAgent:
except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}")
for tc, name, args in parsed_calls:
if self.tool_start_callback:
try:
self.tool_start_callback(tc.id, name, args)
except Exception as cb_err:
logging.debug(f"Tool start callback error: {cb_err}")
# ── Concurrent execution ─────────────────────────────────────────
# Each slot holds (function_name, function_args, function_result, duration, error_flag)
results = [None] * num_tools
@@ -5646,12 +5612,6 @@ class AIAgent:
response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
print(f" ✅ Tool {i+1} completed in {tool_duration:.2f}s - {response_preview}")
if self.tool_complete_callback:
try:
self.tool_complete_callback(tc.id, name, args, function_result)
except Exception as cb_err:
logging.debug(f"Tool complete callback error: {cb_err}")
# Truncate oversized results
MAX_TOOL_RESULT_CHARS = 100_000
if len(function_result) > MAX_TOOL_RESULT_CHARS:
@@ -5740,12 +5700,6 @@ class AIAgent:
except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}")
if self.tool_start_callback:
try:
self.tool_start_callback(tool_call.id, function_name, function_args)
except Exception as cb_err:
logging.debug(f"Tool start callback error: {cb_err}")
# Checkpoint: snapshot working dir before file-mutating tools
if function_name in ("write_file", "patch") and self._checkpoint_mgr.enabled:
try:
@@ -5910,12 +5864,6 @@ class AIAgent:
logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
logging.debug(f"Tool result ({len(function_result)} chars): {function_result}")
if self.tool_complete_callback:
try:
self.tool_complete_callback(tool_call.id, function_name, function_args, function_result)
except Exception as cb_err:
logging.debug(f"Tool complete callback error: {cb_err}")
# Guard against tools returning absurdly large content that would
# blow up the context window. 100K chars ≈ 25K tokens — generous
# enough for any reasonable tool output but prevents catastrophic
@@ -7230,17 +7178,10 @@ class AIAgent:
or "quota" in error_msg
)
if is_rate_limited and self._fallback_index < len(self._fallback_chain):
# Don't eagerly fallback if credential pool rotation may
# still recover. The pool's retry-then-rotate cycle needs
# at least one more attempt to fire — jumping to a fallback
# provider here short-circuits it.
pool = self._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
self._emit_status("⚠️ Rate limited — switching to fallback provider...")
if self._try_activate_fallback():
retry_count = 0
continue
self._emit_status("⚠️ Rate limited — switching to fallback provider...")
if self._try_activate_fallback():
retry_count = 0
continue
is_payload_too_large = (
status_code == 413
+5 -71
View File
@@ -4,7 +4,6 @@ Verifies that dangerous command approvals require explicit /approve or /deny
slash commands, not bare "yes"/"no" text matching.
"""
import asyncio
import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
@@ -50,7 +49,6 @@ def _make_runner():
runner._running_agents = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._background_tasks = set()
runner._session_db = None
runner._reasoning_config = None
runner._provider_routing = {}
@@ -80,32 +78,20 @@ class TestApproveCommand:
@pytest.mark.asyncio
async def test_approve_executes_pending_command(self):
"""Basic /approve executes the pending command and sends feedback."""
"""Basic /approve executes the pending command."""
runner = _make_runner()
source = _make_source()
session_key = runner._session_key_for_source(source)
runner._pending_approvals[session_key] = _make_pending_approval()
event = _make_event("/approve")
with (
patch("tools.terminal_tool.terminal_tool", return_value="done") as mock_term,
patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value="agent continued"),
):
with patch("tools.terminal_tool.terminal_tool", return_value="done") as mock_term:
result = await runner._handle_approve_command(event)
# Yield to let the background continuation task run.
# This works because mocks return immediately (no real await points).
await asyncio.sleep(0)
# Returns None because feedback is sent directly via adapter
assert result is None
assert "✅ Command approved and executed" in result
mock_term.assert_called_once_with(command="sudo rm -rf /tmp/test", force=True)
assert session_key not in runner._pending_approvals
# Immediate feedback sent via adapter
adapter = runner.adapters[Platform.TELEGRAM]
sent_text = adapter.send.call_args_list[0][0][1]
assert "Command approved and executed" in sent_text
@pytest.mark.asyncio
async def test_approve_session_remembers_pattern(self):
"""/approve session approves the pattern for the session."""
@@ -118,21 +104,12 @@ class TestApproveCommand:
with (
patch("tools.terminal_tool.terminal_tool", return_value="done"),
patch("tools.approval.approve_session") as mock_session,
patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value=None),
):
result = await runner._handle_approve_command(event)
# Yield to let the background continuation task run.
# This works because mocks return immediately (no real await points).
await asyncio.sleep(0)
assert result is None
assert "pattern approved for this session" in result
mock_session.assert_called_once_with(session_key, "sudo")
# Verify scope message in adapter feedback
adapter = runner.adapters[Platform.TELEGRAM]
sent_text = adapter.send.call_args_list[0][0][1]
assert "pattern approved for this session" in sent_text
@pytest.mark.asyncio
async def test_approve_always_approves_permanently(self):
"""/approve always approves the pattern permanently."""
@@ -145,21 +122,12 @@ class TestApproveCommand:
with (
patch("tools.terminal_tool.terminal_tool", return_value="done"),
patch("tools.approval.approve_permanent") as mock_perm,
patch.object(runner, "_handle_message", new_callable=AsyncMock, return_value=None),
):
result = await runner._handle_approve_command(event)
# Yield to let the background continuation task run.
# This works because mocks return immediately (no real await points).
await asyncio.sleep(0)
assert result is None
assert "pattern approved permanently" in result
mock_perm.assert_called_once_with("sudo")
# Verify scope message in adapter feedback
adapter = runner.adapters[Platform.TELEGRAM]
sent_text = adapter.send.call_args_list[0][0][1]
assert "pattern approved permanently" in sent_text
@pytest.mark.asyncio
async def test_approve_no_pending(self):
"""/approve with no pending approval returns helpful message."""
@@ -184,40 +152,6 @@ class TestApproveCommand:
assert "expired" in result
assert session_key not in runner._pending_approvals
@pytest.mark.asyncio
async def test_approve_reinvokes_agent_with_result(self):
"""After executing, /approve re-invokes the agent with command output."""
runner = _make_runner()
source = _make_source()
session_key = runner._session_key_for_source(source)
runner._pending_approvals[session_key] = _make_pending_approval()
event = _make_event("/approve")
mock_handle = AsyncMock(return_value="I continued the task.")
with (
patch("tools.terminal_tool.terminal_tool", return_value="file deleted"),
patch.object(runner, "_handle_message", mock_handle),
):
await runner._handle_approve_command(event)
# Yield to let the background continuation task run.
# This works because mocks return immediately (no real await points).
await asyncio.sleep(0)
# Agent was re-invoked via _handle_message with a synthetic event
mock_handle.assert_called_once()
synthetic_event = mock_handle.call_args[0][0]
assert "approved" in synthetic_event.text.lower()
assert "file deleted" in synthetic_event.text
assert "sudo rm -rf /tmp/test" in synthetic_event.text
# The continuation response was sent to the user
adapter = runner.adapters[Platform.TELEGRAM]
# First call: immediate feedback, second call: agent continuation
assert adapter.send.call_count == 2
continuation_response = adapter.send.call_args_list[1][0][1]
assert continuation_response == "I continued the task."
# ------------------------------------------------------------------
# /deny command
-96
View File
@@ -339,102 +339,6 @@ class TestDetectVenvDir:
assert result is None
class TestSystemUnitHermesHome:
"""HERMES_HOME in system units must reference the target user, not root."""
def test_system_unit_uses_target_user_home_not_calling_user(self, monkeypatch):
# Simulate sudo: Path.home() returns /root, target user is alice
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.delenv("HERMES_HOME", raising=False)
monkeypatch.setattr(
gateway_cli, "_system_service_identity",
lambda run_as_user=None: ("alice", "alice", "/home/alice"),
)
monkeypatch.setattr(
gateway_cli, "_build_user_local_paths",
lambda home, existing: [],
)
unit = gateway_cli.generate_systemd_unit(system=True, run_as_user="alice")
assert 'HERMES_HOME=/home/alice/.hermes' in unit
assert '/root/.hermes' not in unit
def test_system_unit_remaps_profile_to_target_user(self, monkeypatch):
# Simulate sudo with a profile: HERMES_HOME was resolved under root
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.setenv("HERMES_HOME", "/root/.hermes/profiles/coder")
monkeypatch.setattr(
gateway_cli, "_system_service_identity",
lambda run_as_user=None: ("alice", "alice", "/home/alice"),
)
monkeypatch.setattr(
gateway_cli, "_build_user_local_paths",
lambda home, existing: [],
)
unit = gateway_cli.generate_systemd_unit(system=True, run_as_user="alice")
assert 'HERMES_HOME=/home/alice/.hermes/profiles/coder' in unit
assert '/root/' not in unit
def test_system_unit_preserves_custom_hermes_home(self, monkeypatch):
# Custom HERMES_HOME not under any user's home — keep as-is
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.setenv("HERMES_HOME", "/opt/hermes-shared")
monkeypatch.setattr(
gateway_cli, "_system_service_identity",
lambda run_as_user=None: ("alice", "alice", "/home/alice"),
)
monkeypatch.setattr(
gateway_cli, "_build_user_local_paths",
lambda home, existing: [],
)
unit = gateway_cli.generate_systemd_unit(system=True, run_as_user="alice")
assert 'HERMES_HOME=/opt/hermes-shared' in unit
def test_user_unit_unaffected_by_change(self):
# User-scope units should still use the calling user's HERMES_HOME
unit = gateway_cli.generate_systemd_unit(system=False)
hermes_home = str(gateway_cli.get_hermes_home().resolve())
assert f'HERMES_HOME={hermes_home}' in unit
class TestHermesHomeForTargetUser:
"""Unit tests for _hermes_home_for_target_user()."""
def test_remaps_default_home(self, monkeypatch):
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.delenv("HERMES_HOME", raising=False)
result = gateway_cli._hermes_home_for_target_user("/home/alice")
assert result == "/home/alice/.hermes"
def test_remaps_profile_path(self, monkeypatch):
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.setenv("HERMES_HOME", "/root/.hermes/profiles/coder")
result = gateway_cli._hermes_home_for_target_user("/home/alice")
assert result == "/home/alice/.hermes/profiles/coder"
def test_keeps_custom_path(self, monkeypatch):
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/root")))
monkeypatch.setenv("HERMES_HOME", "/opt/hermes")
result = gateway_cli._hermes_home_for_target_user("/home/alice")
assert result == "/opt/hermes"
def test_noop_when_same_user(self, monkeypatch):
monkeypatch.setattr(Path, "home", staticmethod(lambda: Path("/home/alice")))
monkeypatch.delenv("HERMES_HOME", raising=False)
result = gateway_cli._hermes_home_for_target_user("/home/alice")
assert result == "/home/alice/.hermes"
class TestGeneratedUnitUsesDetectedVenv:
def test_systemd_unit_uses_dot_venv_when_detected(self, tmp_path, monkeypatch):
dot_venv = tmp_path / ".venv"
-143
View File
@@ -488,149 +488,6 @@ class TestExportImport:
with pytest.raises(FileNotFoundError):
export_profile("nonexistent", str(tmp_path / "out.tar.gz"))
# ---------------------------------------------------------------
# Default profile export / import
# ---------------------------------------------------------------
def test_export_default_creates_valid_archive(self, profile_env, tmp_path):
"""Exporting the default profile produces a valid tar.gz."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("model: test")
output = tmp_path / "export" / "default.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
result = export_profile("default", str(output))
assert Path(result).exists()
assert tarfile.is_tarfile(str(result))
def test_export_default_includes_profile_data(self, profile_env, tmp_path):
"""Profile data files end up in the archive."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("model: test")
(default_dir / ".env").write_text("KEY=val")
(default_dir / "SOUL.md").write_text("Be nice.")
mem_dir = default_dir / "memories"
mem_dir.mkdir(exist_ok=True)
(mem_dir / "MEMORY.md").write_text("remember this")
output = tmp_path / "export" / "default.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(output))
with tarfile.open(str(output), "r:gz") as tf:
names = tf.getnames()
assert "default/config.yaml" in names
assert "default/.env" in names
assert "default/SOUL.md" in names
assert "default/memories/MEMORY.md" in names
def test_export_default_excludes_infrastructure(self, profile_env, tmp_path):
"""Repo checkout, worktrees, profiles, databases are excluded."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("ok")
# Create dirs/files that should be excluded
for d in ("hermes-agent", ".worktrees", "profiles", "bin",
"image_cache", "logs", "sandboxes", "checkpoints"):
sub = default_dir / d
sub.mkdir(exist_ok=True)
(sub / "marker.txt").write_text("excluded")
for f in ("state.db", "gateway.pid", "gateway_state.json",
"processes.json", "errors.log", ".hermes_history",
"active_profile", ".update_check", "auth.lock"):
(default_dir / f).write_text("excluded")
output = tmp_path / "export" / "default.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(output))
with tarfile.open(str(output), "r:gz") as tf:
names = tf.getnames()
# Config is present
assert "default/config.yaml" in names
# Infrastructure excluded
excluded_prefixes = [
"default/hermes-agent", "default/.worktrees", "default/profiles",
"default/bin", "default/image_cache", "default/logs",
"default/sandboxes", "default/checkpoints",
]
for prefix in excluded_prefixes:
assert not any(n.startswith(prefix) for n in names), \
f"Expected {prefix} to be excluded but found it in archive"
excluded_files = [
"default/state.db", "default/gateway.pid",
"default/gateway_state.json", "default/processes.json",
"default/errors.log", "default/.hermes_history",
"default/active_profile", "default/.update_check",
"default/auth.lock",
]
for f in excluded_files:
assert f not in names, f"Expected {f} to be excluded"
def test_export_default_excludes_pycache_at_any_depth(self, profile_env, tmp_path):
"""__pycache__ dirs are excluded even inside nested directories."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("ok")
nested = default_dir / "skills" / "my-skill" / "__pycache__"
nested.mkdir(parents=True)
(nested / "cached.pyc").write_text("bytecode")
output = tmp_path / "export" / "default.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(output))
with tarfile.open(str(output), "r:gz") as tf:
names = tf.getnames()
assert not any("__pycache__" in n for n in names)
def test_import_default_without_name_raises(self, profile_env, tmp_path):
"""Importing a default export without --name gives clear guidance."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("ok")
archive = tmp_path / "export" / "default.tar.gz"
archive.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(archive))
with pytest.raises(ValueError, match="Cannot import as 'default'"):
import_profile(str(archive))
def test_import_default_with_explicit_default_name_raises(self, profile_env, tmp_path):
"""Explicitly importing as 'default' is also rejected."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("ok")
archive = tmp_path / "export" / "default.tar.gz"
archive.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(archive))
with pytest.raises(ValueError, match="Cannot import as 'default'"):
import_profile(str(archive), name="default")
def test_import_default_export_with_new_name_roundtrip(self, profile_env, tmp_path):
"""Export default → import under a different name → data preserved."""
default_dir = get_profile_dir("default")
(default_dir / "config.yaml").write_text("model: opus")
mem_dir = default_dir / "memories"
mem_dir.mkdir(exist_ok=True)
(mem_dir / "MEMORY.md").write_text("important fact")
archive = tmp_path / "export" / "default.tar.gz"
archive.parent.mkdir(parents=True, exist_ok=True)
export_profile("default", str(archive))
imported = import_profile(str(archive), name="backup")
assert imported.is_dir()
assert (imported / "config.yaml").read_text() == "model: opus"
assert (imported / "memories" / "MEMORY.md").read_text() == "important fact"
# ===================================================================
# TestProfileIsolation
-14
View File
@@ -32,8 +32,6 @@ def cli_obj(_isolate):
obj.session_id = None
obj.api_key = "test"
obj.base_url = ""
obj.provider = "test"
obj._provider_source = None
# Mock agent with context compressor
obj.agent = SimpleNamespace(
context_compressor=SimpleNamespace(context_length=None)
@@ -147,15 +145,3 @@ class TestLowContextWarning:
calls = [str(c) for c in cli_obj.console.print.call_args_list]
warning_calls = [c for c in calls if "too low" in c]
assert len(warning_calls) == 0
def test_compact_banner_does_not_crash_on_narrow_terminal(self, cli_obj):
"""Compact mode should still have ctx_len defined for warning logic."""
cli_obj.agent.context_compressor.context_length = 4096
with patch("shutil.get_terminal_size", return_value=os.terminal_size((70, 40))), \
patch("cli._build_compact_banner", return_value="compact banner"):
cli_obj.show_banner()
calls = [str(c) for c in cli_obj.console.print.call_args_list]
warning_calls = [c for c in calls if "too low" in c]
assert len(warning_calls) == 1
-350
View File
@@ -1,350 +0,0 @@
"""Tests for credential pool preservation through smart routing and 429 recovery.
Covers:
1. credential_pool flows through resolve_turn_route (no-route and fallback paths)
2. CLI _resolve_turn_agent_config passes credential_pool to primary dict
3. Gateway _resolve_turn_agent_config passes credential_pool to primary dict
4. Eager fallback deferred when credential pool has credentials
5. Eager fallback fires when no credential pool exists
6. Full 429 rotation cycle: retry-same rotate exhaust fallback
"""
import os
import time
from types import SimpleNamespace
from unittest.mock import MagicMock, patch, PropertyMock
import pytest
# ---------------------------------------------------------------------------
# 1. smart_model_routing: credential_pool preserved in no-route path
# ---------------------------------------------------------------------------
class TestSmartRoutingPoolPreservation:
def test_no_route_preserves_credential_pool(self):
from agent.smart_model_routing import resolve_turn_route
fake_pool = MagicMock(name="CredentialPool")
primary = {
"model": "gpt-5.4",
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": fake_pool,
}
# routing disabled
result = resolve_turn_route("hello", None, primary)
assert result["runtime"]["credential_pool"] is fake_pool
def test_no_route_none_pool(self):
from agent.smart_model_routing import resolve_turn_route
primary = {
"model": "gpt-5.4",
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
}
result = resolve_turn_route("hello", None, primary)
assert result["runtime"]["credential_pool"] is None
def test_routing_disabled_preserves_pool(self):
from agent.smart_model_routing import resolve_turn_route
fake_pool = MagicMock(name="CredentialPool")
primary = {
"model": "gpt-5.4",
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": fake_pool,
}
# routing explicitly disabled
result = resolve_turn_route("hello", {"enabled": False}, primary)
assert result["runtime"]["credential_pool"] is fake_pool
def test_route_fallback_on_resolve_error_preserves_pool(self, monkeypatch):
"""When smart routing picks a cheap model but resolve_runtime_provider
fails, the fallback to primary must still include credential_pool."""
from agent.smart_model_routing import resolve_turn_route
fake_pool = MagicMock(name="CredentialPool")
primary = {
"model": "gpt-5.4",
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": fake_pool,
}
routing_config = {
"enabled": True,
"cheap_model": "openai/gpt-4.1-mini",
"cheap_provider": "openrouter",
"max_tokens": 200,
"patterns": ["^(hi|hello|hey)"],
}
# Force resolve_runtime_provider to fail so it falls back to primary
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
MagicMock(side_effect=RuntimeError("no credentials")),
)
result = resolve_turn_route("hi", routing_config, primary)
assert result["runtime"]["credential_pool"] is fake_pool
# ---------------------------------------------------------------------------
# 2 & 3. CLI and Gateway _resolve_turn_agent_config include credential_pool
# ---------------------------------------------------------------------------
class TestCliTurnRoutePool:
def test_resolve_turn_includes_pool(self, monkeypatch, tmp_path):
"""CLI's _resolve_turn_agent_config must pass credential_pool to primary."""
from agent.smart_model_routing import resolve_turn_route
captured = {}
def spy_resolve(user_message, routing_config, primary):
captured["primary"] = primary
return resolve_turn_route(user_message, routing_config, primary)
monkeypatch.setattr(
"agent.smart_model_routing.resolve_turn_route", spy_resolve
)
# Build a minimal HermesCLI-like object with the method
shell = SimpleNamespace(
model="gpt-5.4",
api_key="sk-test",
base_url=None,
provider="openai-codex",
api_mode="codex_responses",
acp_command=None,
acp_args=[],
_credential_pool=MagicMock(name="FakePool"),
_smart_model_routing={"enabled": False},
)
# Import and bind the real method
from cli import HermesCLI
bound = HermesCLI._resolve_turn_agent_config.__get__(shell)
bound("test message")
assert "credential_pool" in captured["primary"]
assert captured["primary"]["credential_pool"] is shell._credential_pool
class TestGatewayTurnRoutePool:
def test_resolve_turn_includes_pool(self, monkeypatch):
"""Gateway's _resolve_turn_agent_config must pass credential_pool."""
from agent.smart_model_routing import resolve_turn_route
captured = {}
def spy_resolve(user_message, routing_config, primary):
captured["primary"] = primary
return resolve_turn_route(user_message, routing_config, primary)
monkeypatch.setattr(
"agent.smart_model_routing.resolve_turn_route", spy_resolve
)
from gateway.run import GatewayRunner
runner = SimpleNamespace(
_smart_model_routing={"enabled": False},
)
runtime_kwargs = {
"api_key": "sk-test",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": MagicMock(name="FakePool"),
}
bound = GatewayRunner._resolve_turn_agent_config.__get__(runner)
bound("test message", "gpt-5.4", runtime_kwargs)
assert "credential_pool" in captured["primary"]
assert captured["primary"]["credential_pool"] is runtime_kwargs["credential_pool"]
# ---------------------------------------------------------------------------
# 4 & 5. Eager fallback deferred/fires based on credential pool
# ---------------------------------------------------------------------------
class TestEagerFallbackWithPool:
"""Test the eager fallback guard in run_agent.py's error handling loop."""
def _make_agent(self, has_pool=True, pool_has_creds=True, has_fallback=True):
"""Create a minimal AIAgent mock with the fields needed."""
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
agent._credential_pool = None
if has_pool:
pool = MagicMock()
pool.has_available.return_value = pool_has_creds
agent._credential_pool = pool
agent._fallback_chain = [{"model": "fallback/model"}] if has_fallback else []
agent._fallback_index = 0
agent._try_activate_fallback = MagicMock(return_value=True)
agent._emit_status = MagicMock()
return agent
def test_eager_fallback_deferred_when_pool_has_credentials(self):
"""429 with active pool should NOT trigger eager fallback."""
agent = self._make_agent(has_pool=True, pool_has_creds=True, has_fallback=True)
# Simulate the check from run_agent.py lines 7180-7191
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_not_called()
def test_eager_fallback_fires_when_no_pool(self):
"""429 without pool should trigger eager fallback."""
agent = self._make_agent(has_pool=False, has_fallback=True)
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_called_once()
def test_eager_fallback_fires_when_pool_exhausted(self):
"""429 with exhausted pool should trigger eager fallback."""
agent = self._make_agent(has_pool=True, pool_has_creds=False, has_fallback=True)
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_called_once()
# ---------------------------------------------------------------------------
# 6. Full 429 rotation cycle via _recover_with_credential_pool
# ---------------------------------------------------------------------------
class TestPoolRotationCycle:
"""Verify the retry-same → rotate → exhaust flow in _recover_with_credential_pool."""
def _make_agent_with_pool(self, pool_entries=3):
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
entries = []
for i in range(pool_entries):
e = MagicMock(name=f"entry_{i}")
e.id = f"cred-{i}"
entries.append(e)
pool = MagicMock()
pool.has_credentials.return_value = True
# mark_exhausted_and_rotate returns next entry until exhausted
self._rotation_index = 0
def rotate(status_code=None):
self._rotation_index += 1
if self._rotation_index < pool_entries:
return entries[self._rotation_index]
pool.has_credentials.return_value = False
return None
pool.mark_exhausted_and_rotate = MagicMock(side_effect=rotate)
agent._credential_pool = pool
agent._swap_credential = MagicMock()
agent.log_prefix = ""
return agent, pool, entries
def test_first_429_sets_retry_flag_no_rotation(self):
"""First 429 should just set has_retried_429=True, no rotation."""
agent, pool, _ = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert recovered is False
assert has_retried is True
pool.mark_exhausted_and_rotate.assert_not_called()
def test_second_429_rotates_to_next(self):
"""Second consecutive 429 should rotate to next credential."""
agent, pool, entries = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=True
)
assert recovered is True
assert has_retried is False # reset after rotation
pool.mark_exhausted_and_rotate.assert_called_once_with(status_code=429)
agent._swap_credential.assert_called_once_with(entries[1])
def test_pool_exhaustion_returns_false(self):
"""When all credentials exhausted, recovery should return False."""
agent, pool, _ = self._make_agent_with_pool(1)
# First 429 sets flag
_, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert has_retried is True
# Second 429 tries to rotate but pool is exhausted (only 1 entry)
recovered, _ = agent._recover_with_credential_pool(
status_code=429, has_retried_429=True
)
assert recovered is False
def test_402_immediate_rotation(self):
"""402 (billing) should immediately rotate, no retry-first."""
agent, pool, entries = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=402, has_retried_429=False
)
assert recovered is True
assert has_retried is False
pool.mark_exhausted_and_rotate.assert_called_once_with(status_code=402)
def test_no_pool_returns_false(self):
"""No pool should return (False, unchanged)."""
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
agent._credential_pool = None
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert recovered is False
assert has_retried is False
+2 -119
View File
@@ -1,17 +1,7 @@
"""Tests for agent/display.py — build_tool_preview() and inline diff previews."""
"""Tests for agent/display.py — build_tool_preview()."""
import os
import pytest
from unittest.mock import MagicMock, patch
from agent.display import (
build_tool_preview,
capture_local_edit_snapshot,
extract_edit_diff,
_render_inline_unified_diff,
_summarize_rendered_diff_sections,
render_edit_diff_with_delta,
)
from agent.display import build_tool_preview
class TestBuildToolPreview:
@@ -93,110 +83,3 @@ class TestBuildToolPreview:
assert build_tool_preview("terminal", 0) is None
assert build_tool_preview("terminal", "") is None
assert build_tool_preview("terminal", []) is None
class TestEditDiffPreview:
def test_extract_edit_diff_for_patch(self):
diff = extract_edit_diff("patch", '{"success": true, "diff": "--- a/x\\n+++ b/x\\n"}')
assert diff is not None
assert "+++ b/x" in diff
def test_render_inline_unified_diff_colors_added_and_removed_lines(self):
rendered = _render_inline_unified_diff(
"--- a/cli.py\n"
"+++ b/cli.py\n"
"@@ -1,2 +1,2 @@\n"
"-old line\n"
"+new line\n"
" context\n"
)
assert "a/cli.py" in rendered[0]
assert "b/cli.py" in rendered[0]
assert any("old line" in line for line in rendered)
assert any("new line" in line for line in rendered)
assert any("48;2;" in line for line in rendered)
def test_extract_edit_diff_ignores_non_edit_tools(self):
assert extract_edit_diff("web_search", '{"diff": "--- a\\n+++ b\\n"}') is None
def test_extract_edit_diff_uses_local_snapshot_for_write_file(self, tmp_path):
target = tmp_path / "note.txt"
target.write_text("old\n", encoding="utf-8")
snapshot = capture_local_edit_snapshot("write_file", {"path": str(target)})
target.write_text("new\n", encoding="utf-8")
diff = extract_edit_diff(
"write_file",
'{"bytes_written": 4}',
function_args={"path": str(target)},
snapshot=snapshot,
)
assert diff is not None
assert "--- a/" in diff
assert "+++ b/" in diff
assert "-old" in diff
assert "+new" in diff
def test_render_edit_diff_with_delta_invokes_printer(self):
printer = MagicMock()
rendered = render_edit_diff_with_delta(
"patch",
'{"diff": "--- a/x\\n+++ b/x\\n@@ -1 +1 @@\\n-old\\n+new\\n"}',
print_fn=printer,
)
assert rendered is True
assert printer.call_count >= 2
calls = [call.args[0] for call in printer.call_args_list]
assert any("a/x" in line and "b/x" in line for line in calls)
assert any("old" in line for line in calls)
assert any("new" in line for line in calls)
def test_render_edit_diff_with_delta_skips_without_diff(self):
rendered = render_edit_diff_with_delta(
"patch",
'{"success": true}',
)
assert rendered is False
def test_render_edit_diff_with_delta_handles_renderer_errors(self, monkeypatch):
printer = MagicMock()
monkeypatch.setattr("agent.display._summarize_rendered_diff_sections", MagicMock(side_effect=RuntimeError("boom")))
rendered = render_edit_diff_with_delta(
"patch",
'{"diff": "--- a/x\\n+++ b/x\\n"}',
print_fn=printer,
)
assert rendered is False
assert printer.call_count == 0
def test_summarize_rendered_diff_sections_truncates_large_diff(self):
diff = "--- a/x.py\n+++ b/x.py\n" + "".join(f"+line{i}\n" for i in range(120))
rendered = _summarize_rendered_diff_sections(diff, max_lines=20)
assert len(rendered) == 21
assert "omitted" in rendered[-1]
def test_summarize_rendered_diff_sections_limits_file_count(self):
diff = "".join(
f"--- a/file{i}.py\n+++ b/file{i}.py\n+line{i}\n"
for i in range(8)
)
rendered = _summarize_rendered_diff_sections(diff, max_files=3, max_lines=50)
assert any("a/file0.py" in line for line in rendered)
assert any("a/file1.py" in line for line in rendered)
assert any("a/file2.py" in line for line in rendered)
assert not any("a/file7.py" in line for line in rendered)
assert "additional file" in rendered[-1]
-108
View File
@@ -1239,42 +1239,6 @@ class TestConcurrentToolExecution:
)
assert result == "result"
def test_sequential_tool_callbacks_fire_in_order(self, agent):
tool_call = _mock_tool_call(name="web_search", arguments='{"query":"hello"}', call_id="c1")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tool_call])
messages = []
starts = []
completes = []
agent.tool_start_callback = lambda tool_call_id, function_name, function_args: starts.append((tool_call_id, function_name, function_args))
agent.tool_complete_callback = lambda tool_call_id, function_name, function_args, function_result: completes.append((tool_call_id, function_name, function_args, function_result))
with patch("run_agent.handle_function_call", return_value='{"success": true}'):
agent._execute_tool_calls_sequential(mock_msg, messages, "task-1")
assert starts == [("c1", "web_search", {"query": "hello"})]
assert completes == [("c1", "web_search", {"query": "hello"}, '{"success": true}')]
def test_concurrent_tool_callbacks_fire_for_each_tool(self, agent):
tc1 = _mock_tool_call(name="web_search", arguments='{"query":"one"}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments='{"query":"two"}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
starts = []
completes = []
agent.tool_start_callback = lambda tool_call_id, function_name, function_args: starts.append((tool_call_id, function_name, function_args))
agent.tool_complete_callback = lambda tool_call_id, function_name, function_args, function_result: completes.append((tool_call_id, function_name, function_args, function_result))
with patch("run_agent.handle_function_call", side_effect=['{"id":1}', '{"id":2}']):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert starts == [
("c1", "web_search", {"query": "one"}),
("c2", "web_search", {"query": "two"}),
]
assert len(completes) == 2
assert {entry[0] for entry in completes} == {"c1", "c2"}
assert {entry[3] for entry in completes} == {'{"id":1}', '{"id":2}'}
def test_invoke_tool_handles_agent_level_tools(self, agent):
"""_invoke_tool should handle todo tool directly."""
with patch("tools.todo_tool.todo_tool", return_value='{"ok":true}') as mock_todo:
@@ -1316,38 +1280,6 @@ class TestPathsOverlap:
assert not _paths_overlap(Path("src/a.py"), Path(""))
class TestParallelScopePathNormalization:
def test_extract_parallel_scope_path_normalizes_relative_to_cwd(self, tmp_path, monkeypatch):
from run_agent import _extract_parallel_scope_path
monkeypatch.chdir(tmp_path)
scoped = _extract_parallel_scope_path("write_file", {"path": "./notes.txt"})
assert scoped == tmp_path / "notes.txt"
def test_extract_parallel_scope_path_treats_relative_and_absolute_same_file_as_same_scope(self, tmp_path, monkeypatch):
from run_agent import _extract_parallel_scope_path, _paths_overlap
monkeypatch.chdir(tmp_path)
abs_path = tmp_path / "notes.txt"
rel_scoped = _extract_parallel_scope_path("write_file", {"path": "notes.txt"})
abs_scoped = _extract_parallel_scope_path("write_file", {"path": str(abs_path)})
assert rel_scoped == abs_scoped
assert _paths_overlap(rel_scoped, abs_scoped)
def test_should_parallelize_tool_batch_rejects_same_file_with_mixed_path_spellings(self, tmp_path, monkeypatch):
from run_agent import _should_parallelize_tool_batch
monkeypatch.chdir(tmp_path)
tc1 = _mock_tool_call(name="write_file", arguments='{"path":"notes.txt","content":"one"}', call_id="c1")
tc2 = _mock_tool_call(name="write_file", arguments=f'{{"path":"{tmp_path / "notes.txt"}","content":"two"}}', call_id="c2")
assert not _should_parallelize_tool_batch([tc1, tc2])
class TestHandleMaxIterations:
def test_returns_summary(self, agent):
resp = _mock_response(content="Here is a summary of what I did.")
@@ -2809,46 +2741,6 @@ def test_is_openai_client_closed_honors_custom_client_flag():
assert AIAgent._is_openai_client_closed(SimpleNamespace(is_closed=False)) is False
def test_is_openai_client_closed_handles_method_form():
"""Fix for issue #4377: is_closed as method (openai SDK) vs property (httpx).
The openai SDK's is_closed is a method, not a property. Prior to this fix,
getattr(client, "is_closed", False) returned the bound method object, which
is always truthy, causing the function to incorrectly report all clients as
closed and triggering unnecessary client recreation on every API call.
"""
class MethodFormClient:
"""Mimics openai.OpenAI where is_closed() is a method."""
def __init__(self, closed: bool):
self._closed = closed
def is_closed(self) -> bool:
return self._closed
# Method returning False - client is open
open_client = MethodFormClient(closed=False)
assert AIAgent._is_openai_client_closed(open_client) is False
# Method returning True - client is closed
closed_client = MethodFormClient(closed=True)
assert AIAgent._is_openai_client_closed(closed_client) is True
def test_is_openai_client_closed_falls_back_to_http_client():
"""Verify fallback to _client.is_closed when top-level is_closed is None."""
class ClientWithHttpClient:
is_closed = None # No top-level is_closed
def __init__(self, http_closed: bool):
self._client = SimpleNamespace(is_closed=http_closed)
assert AIAgent._is_openai_client_closed(ClientWithHttpClient(http_closed=False)) is False
assert AIAgent._is_openai_client_closed(ClientWithHttpClient(http_closed=True)) is True
class TestAnthropicBaseUrlPassthrough:
"""Bug fix: base_url was filtered with 'anthropic in base_url', blocking proxies."""
@@ -1,242 +0,0 @@
"""Persistence tests for the Camofox browser backend.
Tests that managed persistence uses stable identity while default mode
uses random identity. The actual browser profile persistence is handled
by the Camofox server (when CAMOFOX_PROFILE_DIR is set).
"""
import json
from unittest.mock import MagicMock, patch
import pytest
from tools.browser_camofox import (
_drop_session,
_get_session,
_managed_persistence_enabled,
camofox_close,
camofox_navigate,
check_camofox_available,
cleanup_all_camofox_sessions,
get_vnc_url,
)
from tools.browser_camofox_state import get_camofox_identity
def _mock_response(status=200, json_data=None):
resp = MagicMock()
resp.status_code = status
resp.json.return_value = json_data or {}
resp.raise_for_status = MagicMock()
return resp
def _enable_persistence():
"""Return a patch context that enables managed persistence via config."""
config = {"browser": {"camofox": {"managed_persistence": True}}}
return patch("tools.browser_camofox.load_config", return_value=config)
@pytest.fixture(autouse=True)
def _clear_session_state():
import tools.browser_camofox as mod
yield
with mod._sessions_lock:
mod._sessions.clear()
mod._vnc_url = None
mod._vnc_url_checked = False
class TestManagedPersistenceToggle:
def test_disabled_by_default(self):
config = {"browser": {"camofox": {"managed_persistence": False}}}
with patch("tools.browser_camofox.load_config", return_value=config):
assert _managed_persistence_enabled() is False
def test_enabled_via_config_yaml(self):
config = {"browser": {"camofox": {"managed_persistence": True}}}
with patch("tools.browser_camofox.load_config", return_value=config):
assert _managed_persistence_enabled() is True
def test_disabled_when_key_missing(self):
config = {"browser": {}}
with patch("tools.browser_camofox.load_config", return_value=config):
assert _managed_persistence_enabled() is False
def test_disabled_on_config_load_error(self):
with patch("tools.browser_camofox.load_config", side_effect=Exception("fail")):
assert _managed_persistence_enabled() is False
class TestEphemeralMode:
"""Default behavior: random userId, no persistence."""
def test_session_gets_random_user_id(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
session = _get_session("task-1")
assert session["user_id"].startswith("hermes_")
assert session["managed"] is False
def test_different_tasks_get_different_user_ids(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
s1 = _get_session("task-1")
s2 = _get_session("task-2")
assert s1["user_id"] != s2["user_id"]
def test_session_reuse_within_same_task(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
s1 = _get_session("task-1")
s2 = _get_session("task-1")
assert s1 is s2
class TestManagedPersistenceMode:
"""With managed_persistence: stable userId derived from Hermes profile."""
def test_session_gets_stable_user_id(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
with _enable_persistence():
session = _get_session("task-1")
expected = get_camofox_identity("task-1")
assert session["user_id"] == expected["user_id"]
assert session["session_key"] == expected["session_key"]
assert session["managed"] is True
def test_same_user_id_after_session_drop(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
with _enable_persistence():
s1 = _get_session("task-1")
uid1 = s1["user_id"]
_drop_session("task-1")
s2 = _get_session("task-1")
assert s2["user_id"] == uid1
def test_same_user_id_across_tasks(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
with _enable_persistence():
s1 = _get_session("task-a")
s2 = _get_session("task-b")
# Same profile = same userId, different session keys
assert s1["user_id"] == s2["user_id"]
assert s1["session_key"] != s2["session_key"]
def test_different_profiles_get_different_user_ids(self, tmp_path, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
with _enable_persistence():
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "profile-a"))
s1 = _get_session("task-1")
uid_a = s1["user_id"]
_drop_session("task-1")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "profile-b"))
s2 = _get_session("task-1")
assert s2["user_id"] != uid_a
def test_navigate_uses_stable_identity(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
requests_seen = []
def _capture_post(url, json=None, timeout=None):
requests_seen.append(json)
return _mock_response(
json_data={"tabId": "tab-1", "url": "https://example.com"}
)
with _enable_persistence(), \
patch("tools.browser_camofox.requests.post", side_effect=_capture_post):
result = json.loads(camofox_navigate("https://example.com", task_id="task-1"))
assert result["success"] is True
expected = get_camofox_identity("task-1")
assert requests_seen[0]["userId"] == expected["user_id"]
def test_navigate_reuses_identity_after_close(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
requests_seen = []
def _capture_post(url, json=None, timeout=None):
requests_seen.append(json)
return _mock_response(
json_data={"tabId": f"tab-{len(requests_seen)}", "url": "https://example.com"}
)
with (
_enable_persistence(),
patch("tools.browser_camofox.requests.post", side_effect=_capture_post),
patch("tools.browser_camofox.requests.delete", return_value=_mock_response()),
):
first = json.loads(camofox_navigate("https://example.com", task_id="task-1"))
camofox_close("task-1")
second = json.loads(camofox_navigate("https://example.com", task_id="task-1"))
assert first["success"] is True
assert second["success"] is True
tab_requests = [req for req in requests_seen if "userId" in req]
assert len(tab_requests) == 2
assert tab_requests[0]["userId"] == tab_requests[1]["userId"]
class TestVncUrlDiscovery:
"""VNC URL is derived from the Camofox health endpoint."""
def test_vnc_url_from_health_port(self, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://myhost:9377")
health_resp = _mock_response(json_data={"ok": True, "vncPort": 6080})
with patch("tools.browser_camofox.requests.get", return_value=health_resp):
assert check_camofox_available() is True
assert get_vnc_url() == "http://myhost:6080"
def test_vnc_url_none_when_headless(self, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
health_resp = _mock_response(json_data={"ok": True})
with patch("tools.browser_camofox.requests.get", return_value=health_resp):
check_camofox_available()
assert get_vnc_url() is None
def test_vnc_url_rejects_invalid_port(self, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
health_resp = _mock_response(json_data={"ok": True, "vncPort": "bad"})
with patch("tools.browser_camofox.requests.get", return_value=health_resp):
check_camofox_available()
assert get_vnc_url() is None
def test_vnc_url_only_probed_once(self, monkeypatch):
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
health_resp = _mock_response(json_data={"ok": True, "vncPort": 6080})
with patch("tools.browser_camofox.requests.get", return_value=health_resp) as mock_get:
check_camofox_available()
check_camofox_available()
# Second call still hits /health for availability but doesn't re-parse vncPort
assert get_vnc_url() == "http://localhost:6080"
def test_navigate_includes_vnc_hint(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
import tools.browser_camofox as mod
mod._vnc_url = "http://localhost:6080"
mod._vnc_url_checked = True
with patch("tools.browser_camofox.requests.post", return_value=_mock_response(
json_data={"tabId": "t1", "url": "https://example.com"}
)):
result = json.loads(camofox_navigate("https://example.com", task_id="vnc-test"))
assert result["vnc_url"] == "http://localhost:6080"
assert "vnc_hint" in result
-66
View File
@@ -1,66 +0,0 @@
"""Tests for Hermes-managed Camofox state helpers."""
from unittest.mock import patch
import pytest
def _load_module():
from tools import browser_camofox_state as state
return state
class TestCamofoxStatePaths:
def test_paths_are_profile_scoped(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path):
assert state.get_camofox_state_dir() == tmp_path / "browser_auth" / "camofox"
class TestCamofoxIdentity:
def test_identity_is_deterministic(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path):
first = state.get_camofox_identity("task-1")
second = state.get_camofox_identity("task-1")
assert first == second
def test_identity_differs_by_task(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path):
a = state.get_camofox_identity("task-a")
b = state.get_camofox_identity("task-b")
# Same user (same profile), different session keys
assert a["user_id"] == b["user_id"]
assert a["session_key"] != b["session_key"]
def test_identity_differs_by_profile(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path / "profile-a"):
a = state.get_camofox_identity("task-1")
with patch.object(state, "get_hermes_home", return_value=tmp_path / "profile-b"):
b = state.get_camofox_identity("task-1")
assert a["user_id"] != b["user_id"]
def test_default_task_id(self, tmp_path):
state = _load_module()
with patch.object(state, "get_hermes_home", return_value=tmp_path):
identity = state.get_camofox_identity()
assert "user_id" in identity
assert "session_key" in identity
assert identity["user_id"].startswith("hermes_")
assert identity["session_key"].startswith("task_")
class TestCamofoxConfigDefaults:
def test_default_config_includes_managed_persistence_toggle(self):
from hermes_cli.config import DEFAULT_CONFIG
browser_cfg = DEFAULT_CONFIG["browser"]
assert browser_cfg["camofox"]["managed_persistence"] is False
def test_config_version_unchanged(self):
from hermes_cli.config import DEFAULT_CONFIG
# managed_persistence is auto-merged by _deep_merge, no version bump needed
assert DEFAULT_CONFIG["_config_version"] == 11
+2 -2
View File
@@ -221,7 +221,7 @@ class TestCheckFileStalenessHelper(unittest.TestCase):
_read_tracker["t1"] = {
"last_key": None, "consecutive": 0,
"read_history": set(), "dedup": {},
"read_timestamps": {"/tmp/other.py": 12345.0},
"file_mtimes": {"/tmp/other.py": 12345.0},
}
self.assertIsNone(_check_file_staleness("/tmp/x.py", "t1"))
@@ -231,7 +231,7 @@ class TestCheckFileStalenessHelper(unittest.TestCase):
_read_tracker["t1"] = {
"last_key": None, "consecutive": 0,
"read_history": set(), "dedup": {},
"read_timestamps": {"/nonexistent/path": 99999.0},
"file_mtimes": {"/nonexistent/path": 99999.0},
}
# File doesn't exist → stat fails → returns None (let write handle it)
self.assertIsNone(_check_file_staleness("/nonexistent/path", "t1"))
-174
View File
@@ -1,174 +0,0 @@
"""Tests for skill fuzzy patching via tools.fuzzy_match."""
import json
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from tools.skill_manager_tool import (
_create_skill,
_patch_skill,
_write_file,
skill_manage,
)
SKILL_CONTENT = """\
---
name: test-skill
description: A test skill for unit testing.
---
# Test Skill
Step 1: Do the thing.
Step 2: Do another thing.
Step 3: Final step.
"""
# ---------------------------------------------------------------------------
# Fuzzy patching
# ---------------------------------------------------------------------------
class TestFuzzyPatchSkill:
@pytest.fixture(autouse=True)
def setup_skills(self, tmp_path, monkeypatch):
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
monkeypatch.setattr("tools.skill_manager_tool.SKILLS_DIR", skills_dir)
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
self.skills_dir = skills_dir
def test_exact_match_still_works(self):
_create_skill("test-skill", SKILL_CONTENT)
result = _patch_skill("test-skill", "Step 1: Do the thing.", "Step 1: Done!")
assert result["success"] is True
content = (self.skills_dir / "test-skill" / "SKILL.md").read_text()
assert "Step 1: Done!" in content
def test_whitespace_trimmed_match(self):
"""Patch with extra leading whitespace should still find the target."""
skill = """\
---
name: ws-skill
description: Whitespace test
---
# Commands
def hello():
print("hi")
"""
_create_skill("ws-skill", skill)
# Agent sends patch with no leading whitespace (common LLM behaviour)
result = _patch_skill("ws-skill", "def hello():\n print(\"hi\")", "def hello():\n print(\"hello world\")")
assert result["success"] is True
content = (self.skills_dir / "ws-skill" / "SKILL.md").read_text()
assert 'print("hello world")' in content
def test_indentation_flexible_match(self):
"""Patch where only indentation differs should succeed."""
skill = """\
---
name: indent-skill
description: Indentation test
---
# Steps
1. First step
2. Second step
3. Third step
"""
_create_skill("indent-skill", skill)
# Agent sends with different indentation
result = _patch_skill(
"indent-skill",
"1. First step\n2. Second step",
"1. Updated first\n2. Updated second"
)
assert result["success"] is True
content = (self.skills_dir / "indent-skill" / "SKILL.md").read_text()
assert "Updated first" in content
def test_multiple_matches_blocked_without_replace_all(self):
"""Multiple fuzzy matches should return an error without replace_all."""
skill = """\
---
name: dup-skill
description: Duplicate test
---
# Steps
word word word
"""
_create_skill("dup-skill", skill)
result = _patch_skill("dup-skill", "word", "replaced")
assert result["success"] is False
assert "match" in result["error"].lower()
def test_replace_all_with_fuzzy(self):
skill = """\
---
name: dup-skill
description: Duplicate test
---
# Steps
word word word
"""
_create_skill("dup-skill", skill)
result = _patch_skill("dup-skill", "word", "replaced", replace_all=True)
assert result["success"] is True
content = (self.skills_dir / "dup-skill" / "SKILL.md").read_text()
assert "word" not in content
assert "replaced" in content
def test_no_match_returns_preview(self):
_create_skill("test-skill", SKILL_CONTENT)
result = _patch_skill("test-skill", "this does not exist anywhere", "replacement")
assert result["success"] is False
assert "file_preview" in result
def test_fuzzy_patch_on_supporting_file(self):
"""Fuzzy matching should also work on supporting files."""
_create_skill("test-skill", SKILL_CONTENT)
ref_content = " function hello() {\n console.log('hi');\n }"
_write_file("test-skill", "references/code.js", ref_content)
# Patch with stripped indentation
result = _patch_skill(
"test-skill",
"function hello() {\nconsole.log('hi');\n}",
"function hello() {\nconsole.log('hello world');\n}",
file_path="references/code.js"
)
assert result["success"] is True
content = (self.skills_dir / "test-skill" / "references" / "code.js").read_text()
assert "hello world" in content
def test_patch_preserves_frontmatter_validation(self):
"""Fuzzy matching should still run frontmatter validation on SKILL.md."""
_create_skill("test-skill", SKILL_CONTENT)
# Try to destroy the frontmatter via patch
result = _patch_skill("test-skill", "---\nname: test-skill", "BROKEN")
assert result["success"] is False
assert "structure" in result["error"].lower() or "frontmatter" in result["error"].lower()
def test_skill_manage_patch_uses_fuzzy(self):
"""The dispatcher should route to the fuzzy-matching patch."""
_create_skill("test-skill", SKILL_CONTENT)
raw = skill_manage(
action="patch",
name="test-skill",
old_string=" Step 1: Do the thing.", # extra leading space
new_string="Step 1: Updated.",
)
result = json.loads(raw)
# Should succeed via line-trimmed or indentation-flexible matching
assert result["success"] is True
+2 -2
View File
@@ -271,7 +271,7 @@ class TestPatchSkill:
_create_skill("my-skill", VALID_SKILL_CONTENT)
result = _patch_skill("my-skill", "this text does not exist", "replacement")
assert result["success"] is False
assert "not found" in result["error"].lower() or "could not find" in result["error"].lower()
assert "not found" in result["error"]
def test_patch_ambiguous_match_rejected(self, tmp_path):
content = """\
@@ -288,7 +288,7 @@ word word
_create_skill("my-skill", content)
result = _patch_skill("my-skill", "word", "replaced")
assert result["success"] is False
assert "match" in result["error"].lower()
assert "matched" in result["error"]
def test_patch_replace_all(self, tmp_path):
content = """\
-215
View File
@@ -1,215 +0,0 @@
"""Tests for skill content size limits.
Agent writes (create/edit/patch/write_file) are constrained to
MAX_SKILL_CONTENT_CHARS (100k) and MAX_SKILL_FILE_BYTES (1 MiB).
Hand-placed and hub-installed skills have no hard limit.
"""
import json
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from tools.skill_manager_tool import (
MAX_SKILL_CONTENT_CHARS,
MAX_SKILL_FILE_BYTES,
_validate_content_size,
skill_manage,
)
@pytest.fixture(autouse=True)
def isolate_skills(tmp_path, monkeypatch):
"""Redirect SKILLS_DIR to a temp directory."""
skills_dir = tmp_path / "skills"
skills_dir.mkdir()
monkeypatch.setattr("tools.skill_manager_tool.SKILLS_DIR", skills_dir)
monkeypatch.setattr("tools.skills_tool.SKILLS_DIR", skills_dir)
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
return skills_dir
def _make_skill_content(body_chars: int) -> str:
"""Generate valid SKILL.md content with a body of the given character count."""
frontmatter = (
"---\n"
"name: test-skill\n"
"description: A test skill\n"
"---\n"
)
body = "# Test Skill\n\n" + ("x" * max(0, body_chars - 15))
return frontmatter + body
class TestValidateContentSize:
"""Unit tests for _validate_content_size."""
def test_within_limit(self):
assert _validate_content_size("a" * 1000) is None
def test_at_limit(self):
assert _validate_content_size("a" * MAX_SKILL_CONTENT_CHARS) is None
def test_over_limit(self):
err = _validate_content_size("a" * (MAX_SKILL_CONTENT_CHARS + 1))
assert err is not None
assert "100,001" in err
assert "100,000" in err
def test_custom_label(self):
err = _validate_content_size("a" * (MAX_SKILL_CONTENT_CHARS + 1), label="references/api.md")
assert "references/api.md" in err
class TestCreateSkillSizeLimit:
"""create action rejects oversized content."""
def test_create_within_limit(self, isolate_skills):
content = _make_skill_content(5000)
result = json.loads(skill_manage(action="create", name="small-skill", content=content))
assert result["success"] is True
def test_create_over_limit(self, isolate_skills):
content = _make_skill_content(MAX_SKILL_CONTENT_CHARS + 100)
result = json.loads(skill_manage(action="create", name="huge-skill", content=content))
assert result["success"] is False
assert "100,000" in result["error"]
def test_create_at_limit(self, isolate_skills):
# Content at exactly the limit should succeed
frontmatter = "---\nname: edge-skill\ndescription: Edge case\n---\n# Edge\n\n"
body_budget = MAX_SKILL_CONTENT_CHARS - len(frontmatter)
content = frontmatter + ("x" * body_budget)
assert len(content) == MAX_SKILL_CONTENT_CHARS
result = json.loads(skill_manage(action="create", name="edge-skill", content=content))
assert result["success"] is True
class TestEditSkillSizeLimit:
"""edit action rejects oversized content."""
def test_edit_over_limit(self, isolate_skills):
# Create a small skill first
small = _make_skill_content(1000)
json.loads(skill_manage(action="create", name="grow-me", content=small))
# Try to edit it to be oversized
big = _make_skill_content(MAX_SKILL_CONTENT_CHARS + 100)
# Fix the name in frontmatter
big = big.replace("name: test-skill", "name: grow-me")
result = json.loads(skill_manage(action="edit", name="grow-me", content=big))
assert result["success"] is False
assert "100,000" in result["error"]
class TestPatchSkillSizeLimit:
"""patch action checks resulting size, not just the new_string."""
def test_patch_that_would_exceed_limit(self, isolate_skills):
# Create a skill near the limit
near_limit = _make_skill_content(MAX_SKILL_CONTENT_CHARS - 50)
json.loads(skill_manage(action="create", name="near-limit", content=near_limit))
# Patch that adds enough to go over
result = json.loads(skill_manage(
action="patch",
name="near-limit",
old_string="# Test Skill",
new_string="# Test Skill\n" + ("y" * 200),
))
assert result["success"] is False
assert "100,000" in result["error"]
def test_patch_that_reduces_size_on_oversized_skill(self, isolate_skills, tmp_path):
"""Patches that shrink an already-oversized skill should succeed."""
# Manually create an oversized skill (simulating hand-placed)
skill_dir = tmp_path / "skills" / "bloated"
skill_dir.mkdir(parents=True)
oversized = _make_skill_content(MAX_SKILL_CONTENT_CHARS + 5000)
oversized = oversized.replace("name: test-skill", "name: bloated")
(skill_dir / "SKILL.md").write_text(oversized, encoding="utf-8")
assert len(oversized) > MAX_SKILL_CONTENT_CHARS
# Patch that removes content to bring it under the limit.
# Use replace_all to replace the repeated x's with a shorter string.
result = json.loads(skill_manage(
action="patch",
name="bloated",
old_string="x" * 100,
new_string="y",
replace_all=True,
))
# Should succeed because the result is well within limits
assert result["success"] is True
def test_patch_supporting_file_size_limit(self, isolate_skills):
"""Patch on a supporting file also checks size."""
small = _make_skill_content(1000)
json.loads(skill_manage(action="create", name="with-ref", content=small))
# Create a supporting file
json.loads(skill_manage(
action="write_file",
name="with-ref",
file_path="references/data.md",
file_content="# Data\n\nSmall content.",
))
# Try to patch it to be oversized
result = json.loads(skill_manage(
action="patch",
name="with-ref",
old_string="Small content.",
new_string="x" * (MAX_SKILL_CONTENT_CHARS + 100),
file_path="references/data.md",
))
assert result["success"] is False
assert "references/data.md" in result["error"]
class TestWriteFileSizeLimit:
"""write_file action enforces both char and byte limits."""
def test_write_file_over_char_limit(self, isolate_skills):
small = _make_skill_content(1000)
json.loads(skill_manage(action="create", name="file-test", content=small))
result = json.loads(skill_manage(
action="write_file",
name="file-test",
file_path="references/huge.md",
file_content="x" * (MAX_SKILL_CONTENT_CHARS + 1),
))
assert result["success"] is False
assert "100,000" in result["error"]
def test_write_file_within_limit(self, isolate_skills):
small = _make_skill_content(1000)
json.loads(skill_manage(action="create", name="file-ok", content=small))
result = json.loads(skill_manage(
action="write_file",
name="file-ok",
file_path="references/normal.md",
file_content="# Normal\n\n" + ("x" * 5000),
))
assert result["success"] is True
class TestHandPlacedSkillsNoLimit:
"""Skills dropped directly on disk are not constrained."""
def test_oversized_handplaced_skill_loads(self, isolate_skills, tmp_path):
"""A hand-placed 200k skill can still be read via skill_view."""
from tools.skills_tool import skill_view
skill_dir = tmp_path / "skills" / "manual-giant"
skill_dir.mkdir(parents=True)
huge = _make_skill_content(200_000)
huge = huge.replace("name: test-skill", "name: manual-giant")
(skill_dir / "SKILL.md").write_text(huge, encoding="utf-8")
result = json.loads(skill_view("manual-giant"))
assert "content" in result
# The full content is returned — no truncation at the storage layer
assert len(result["content"]) > MAX_SKILL_CONTENT_CHARS
+9 -74
View File
@@ -34,9 +34,6 @@ from typing import Any, Dict, Optional
import requests
from hermes_cli.config import load_config
from tools.browser_camofox_state import get_camofox_identity
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
@@ -45,8 +42,6 @@ logger = logging.getLogger(__name__)
_DEFAULT_TIMEOUT = 30 # seconds per HTTP request
_SNAPSHOT_MAX_CHARS = 80_000 # camofox paginates at this limit
_vnc_url: Optional[str] = None # cached from /health response
_vnc_url_checked = False # only probe once per process
def get_camofox_url() -> str:
@@ -61,52 +56,16 @@ def is_camofox_mode() -> bool:
def check_camofox_available() -> bool:
"""Verify the Camofox server is reachable."""
global _vnc_url, _vnc_url_checked
url = get_camofox_url()
if not url:
return False
try:
resp = requests.get(f"{url}/health", timeout=5)
if resp.status_code == 200 and not _vnc_url_checked:
try:
data = resp.json()
vnc_port = data.get("vncPort")
if isinstance(vnc_port, int) and 1 <= vnc_port <= 65535:
from urllib.parse import urlparse
parsed = urlparse(url)
host = parsed.hostname or "localhost"
_vnc_url = f"http://{host}:{vnc_port}"
except (ValueError, KeyError):
pass
_vnc_url_checked = True
return resp.status_code == 200
except Exception:
return False
def get_vnc_url() -> Optional[str]:
"""Return the VNC URL if the Camofox server exposes one, or None."""
if not _vnc_url_checked:
check_camofox_available()
return _vnc_url
def _managed_persistence_enabled() -> bool:
"""Return whether Hermes-managed persistence is enabled for Camofox.
When enabled, sessions use a stable profile-scoped userId so the
Camofox server can map it to a persistent browser profile directory.
When disabled (default), each session gets a random userId (ephemeral).
Controlled by ``browser.camofox.managed_persistence`` in config.yaml.
"""
try:
camofox_cfg = load_config().get("browser", {}).get("camofox", {})
except Exception:
return False
return bool(camofox_cfg.get("managed_persistence"))
# ---------------------------------------------------------------------------
# Session management
# ---------------------------------------------------------------------------
@@ -116,31 +75,16 @@ _sessions_lock = threading.Lock()
def _get_session(task_id: Optional[str]) -> Dict[str, Any]:
"""Get or create a camofox session for the given task.
When managed persistence is enabled, uses a deterministic userId
derived from the Hermes profile so the Camofox server can map it
to the same persistent browser profile across restarts.
"""
"""Get or create a camofox session for the given task."""
task_id = task_id or "default"
with _sessions_lock:
if task_id in _sessions:
return _sessions[task_id]
if _managed_persistence_enabled():
identity = get_camofox_identity(task_id)
session = {
"user_id": identity["user_id"],
"tab_id": None,
"session_key": identity["session_key"],
"managed": True,
}
else:
session = {
"user_id": f"hermes_{uuid.uuid4().hex[:10]}",
"tab_id": None,
"session_key": f"task_{task_id[:16]}",
"managed": False,
}
session = {
"user_id": f"hermes_{uuid.uuid4().hex[:10]}",
"tab_id": None,
"session_key": f"task_{task_id[:16]}",
}
_sessions[task_id] = session
return session
@@ -228,19 +172,11 @@ def camofox_navigate(url: str, task_id: Optional[str] = None) -> str:
{"userId": session["user_id"], "url": url},
timeout=60,
)
result = {
return json.dumps({
"success": True,
"url": data.get("url", url),
"title": data.get("title", ""),
}
vnc = get_vnc_url()
if vnc:
result["vnc_url"] = vnc
result["vnc_hint"] = (
"Browser is visible via VNC. "
"Share this link with the user so they can watch the browser live."
)
return json.dumps(result)
})
except requests.HTTPError as e:
return json.dumps({"success": False, "error": f"Navigation failed: {e}"})
except requests.ConnectionError:
@@ -500,7 +436,7 @@ def camofox_vision(question: str, annotate: bool = False,
except Exception:
_vision_timeout = 120
response = call_llm(
analysis = call_llm(
messages=[{
"role": "user",
"content": [
@@ -516,7 +452,6 @@ def camofox_vision(question: str, annotate: bool = False,
task="vision",
timeout=_vision_timeout,
)
analysis = response.choices[0].message.content if response.choices else ""
return json.dumps({
"success": True,
-47
View File
@@ -1,47 +0,0 @@
"""Hermes-managed Camofox state helpers.
Provides profile-scoped identity and state directory paths for Camofox
persistent browser profiles. When managed persistence is enabled, Hermes
sends a deterministic userId derived from the active profile so that
Camofox can map it to the same persistent browser profile directory
across restarts.
"""
from __future__ import annotations
import uuid
from pathlib import Path
from typing import Dict, Optional
from hermes_constants import get_hermes_home
CAMOFOX_STATE_DIR_NAME = "browser_auth"
CAMOFOX_STATE_SUBDIR = "camofox"
def get_camofox_state_dir() -> Path:
"""Return the profile-scoped root directory for Camofox persistence."""
return get_hermes_home() / CAMOFOX_STATE_DIR_NAME / CAMOFOX_STATE_SUBDIR
def get_camofox_identity(task_id: Optional[str] = None) -> Dict[str, str]:
"""Return the stable Hermes-managed Camofox identity for this profile.
The user identity is profile-scoped (same Hermes profile = same userId).
The session key is scoped to the logical browser task so newly created
tabs within the same profile reuse the same identity contract.
"""
scope_root = str(get_camofox_state_dir())
logical_scope = task_id or "default"
user_digest = uuid.uuid5(
uuid.NAMESPACE_URL,
f"camofox-user:{scope_root}",
).hex[:10]
session_digest = uuid.uuid5(
uuid.NAMESPACE_URL,
f"camofox-session:{scope_root}:{logical_scope}",
).hex[:16]
return {
"user_id": f"hermes_{user_digest}",
"session_key": f"task_{session_digest}",
}
-8
View File
@@ -596,14 +596,6 @@ def execute_code(
stdout_text = strip_ansi(stdout_text)
stderr_text = strip_ansi(stderr_text)
# Redact secrets (API keys, tokens, etc.) from sandbox output.
# The sandbox env-var filter (lines 434-454) blocks os.environ access,
# but scripts can still read secrets from disk (e.g. open('~/.hermes/.env')).
# This ensures leaked secrets never enter the model context.
from agent.redact import redact_sensitive_text
stdout_text = redact_sensitive_text(stdout_text)
stderr_text = redact_sensitive_text(stderr_text)
# Build response
result: Dict[str, Any] = {
"status": status,
+5 -34
View File
@@ -136,12 +136,9 @@ _file_ops_cache: dict = {}
# Used to skip re-reads of unchanged files. Reset on
# context compression (the original content is summarised
# away so the model needs the full content again).
# "read_timestamps": dict mapping resolved_path → modification-time float
# recorded when the file was last read (or written) by
# this task. Used by write_file and patch to detect
# external changes between the agent's read and write.
# Updated after successful writes so consecutive edits
# by the same task don't trigger false warnings.
# "file_mtimes": dict mapping resolved_path → mtime float at last read.
# Used by write_file and patch to detect when a file was
# modified externally between the agent's read and write.
_read_tracker_lock = threading.Lock()
_read_tracker: dict = {}
@@ -404,7 +401,7 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
try:
_mtime_now = os.path.getmtime(resolved_str)
task_data["dedup"][dedup_key] = _mtime_now
task_data.setdefault("read_timestamps", {})[resolved_str] = _mtime_now
task_data.setdefault("file_mtimes", {})[resolved_str] = _mtime_now
except OSError:
pass # Can't stat — skip tracking for this entry
@@ -503,24 +500,6 @@ def notify_other_tool_call(task_id: str = "default"):
task_data["consecutive"] = 0
def _update_read_timestamp(filepath: str, task_id: str) -> None:
"""Record the file's current modification time after a successful write.
Called after write_file and patch so that consecutive edits by the
same task don't trigger false staleness warnings — each write
refreshes the stored timestamp to match the file's new state.
"""
try:
resolved = str(Path(filepath).expanduser().resolve())
current_mtime = os.path.getmtime(resolved)
except (OSError, ValueError):
return
with _read_tracker_lock:
task_data = _read_tracker.get(task_id)
if task_data is not None:
task_data.setdefault("read_timestamps", {})[resolved] = current_mtime
def _check_file_staleness(filepath: str, task_id: str) -> str | None:
"""Check whether a file was modified since the agent last read it.
@@ -536,7 +515,7 @@ def _check_file_staleness(filepath: str, task_id: str) -> str | None:
task_data = _read_tracker.get(task_id)
if not task_data:
return None
read_mtime = task_data.get("read_timestamps", {}).get(resolved)
read_mtime = task_data.get("file_mtimes", {}).get(resolved)
if read_mtime is None:
return None # File was never read — nothing to compare against
try:
@@ -564,9 +543,6 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
result_dict = result.to_dict()
if stale_warning:
result_dict["_warning"] = stale_warning
# Refresh the stored timestamp so consecutive writes by this
# task don't trigger false staleness warnings.
_update_read_timestamp(path, task_id)
return json.dumps(result_dict, ensure_ascii=False)
except Exception as e:
if _is_expected_write_exception(e):
@@ -618,11 +594,6 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
result_dict = result.to_dict()
if stale_warnings:
result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings)
# Refresh stored timestamps for all successfully-patched paths so
# consecutive edits by this task don't trigger false warnings.
if not result_dict.get("error"):
for _p in _paths_to_check:
_update_read_timestamp(_p, task_id)
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when old_string not found — saves iterations where the agent
# retries with stale content instead of re-reading the file.
+16 -57
View File
@@ -82,8 +82,6 @@ SKILLS_DIR = HERMES_HOME / "skills"
MAX_NAME_LENGTH = 64
MAX_DESCRIPTION_LENGTH = 1024
MAX_SKILL_CONTENT_CHARS = 100_000 # ~36k tokens at 2.75 chars/token
MAX_SKILL_FILE_BYTES = 1_048_576 # 1 MiB per supporting file
# Characters allowed in skill names (filesystem-safe, URL-friendly)
VALID_NAME_RE = re.compile(r'^[a-z0-9][a-z0-9._-]*$')
@@ -179,21 +177,6 @@ def _validate_frontmatter(content: str) -> Optional[str]:
return None
def _validate_content_size(content: str, label: str = "SKILL.md") -> Optional[str]:
"""Check that content doesn't exceed the character limit for agent writes.
Returns an error message or None if within bounds.
"""
if len(content) > MAX_SKILL_CONTENT_CHARS:
return (
f"{label} content is {len(content):,} characters "
f"(limit: {MAX_SKILL_CONTENT_CHARS:,}). "
f"Consider splitting into a smaller SKILL.md with supporting files "
f"in references/ or templates/."
)
return None
def _resolve_skill_dir(name: str, category: str = None) -> Path:
"""Build the directory path for a new skill, optionally under a category."""
if category:
@@ -292,10 +275,6 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An
if err:
return {"success": False, "error": err}
err = _validate_content_size(content)
if err:
return {"success": False, "error": err}
# Check for name collisions across all directories
existing = _find_skill(name)
if existing:
@@ -339,10 +318,6 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]:
if err:
return {"success": False, "error": err}
err = _validate_content_size(content)
if err:
return {"success": False, "error": err}
existing = _find_skill(name)
if not existing:
return {"success": False, "error": f"Skill '{name}' not found. Use skills_list() to see available skills."}
@@ -404,29 +379,27 @@ def _patch_skill(
content = target.read_text(encoding="utf-8")
# Use the same fuzzy matching engine as the file patch tool.
# This handles whitespace normalization, indentation differences,
# escape sequences, and block-anchor matching — saving the agent
# from exact-match failures on minor formatting mismatches.
from tools.fuzzy_match import fuzzy_find_and_replace
new_content, match_count, match_error = fuzzy_find_and_replace(
content, old_string, new_string, replace_all
)
if match_error:
count = content.count(old_string)
if count == 0:
# Show a short preview of the file so the model can self-correct
preview = content[:500] + ("..." if len(content) > 500 else "")
return {
"success": False,
"error": match_error,
"error": "old_string not found in the file.",
"file_preview": preview,
}
# Check size limit on the result
target_label = "SKILL.md" if not file_path else file_path
err = _validate_content_size(new_content, label=target_label)
if err:
return {"success": False, "error": err}
if count > 1 and not replace_all:
return {
"success": False,
"error": (
f"old_string matched {count} times. Provide more surrounding context "
f"to make the match unique, or set replace_all=true to replace all occurrences."
),
"match_count": count,
}
new_content = content.replace(old_string, new_string) if replace_all else content.replace(old_string, new_string, 1)
# If patching SKILL.md, validate frontmatter is still intact
if not file_path:
@@ -446,9 +419,10 @@ def _patch_skill(
_atomic_write_text(target, original_content)
return {"success": False, "error": scan_error}
replacements = count if replace_all else 1
return {
"success": True,
"message": f"Patched {'SKILL.md' if not file_path else file_path} in skill '{name}' ({match_count} replacement{'s' if match_count > 1 else ''}).",
"message": f"Patched {'SKILL.md' if not file_path else file_path} in skill '{name}' ({replacements} replacement{'s' if replacements > 1 else ''}).",
}
@@ -481,21 +455,6 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]:
if not file_content and file_content != "":
return {"success": False, "error": "file_content is required."}
# Check size limits
content_bytes = len(file_content.encode("utf-8"))
if content_bytes > MAX_SKILL_FILE_BYTES:
return {
"success": False,
"error": (
f"File content is {content_bytes:,} bytes "
f"(limit: {MAX_SKILL_FILE_BYTES:,} bytes / 1 MiB). "
f"Consider splitting into smaller files."
),
}
err = _validate_content_size(file_content, label=file_path)
if err:
return {"success": False, "error": err}
existing = _find_skill(name)
if not existing:
return {"success": False, "error": f"Skill '{name}' not found. Create it first with action='create'."}
-16
View File
@@ -2525,22 +2525,6 @@ def install_from_quarantine(
if install_dir.exists():
shutil.rmtree(install_dir)
# Warn (but don't block) if SKILL.md is very large
skill_md = quarantine_path / "SKILL.md"
if skill_md.exists():
try:
skill_size = skill_md.stat().st_size
if skill_size > 100_000:
logger.warning(
"Skill '%s' has a large SKILL.md (%s chars). "
"Large skills consume significant context when loaded. "
"Consider asking the author to split it into smaller files.",
safe_skill_name,
f"{skill_size:,}",
)
except OSError:
pass
install_dir.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(quarantine_path), str(install_dir))
@@ -85,7 +85,6 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
| `BROWSERBASE_PROJECT_ID` | Browserbase project ID |
| `BROWSER_USE_API_KEY` | Browser Use cloud browser API key ([browser-use.com](https://browser-use.com/)) |
| `BROWSER_CDP_URL` | Chrome DevTools Protocol URL for local browser (set via `/browser connect`, e.g. `ws://localhost:9222`) |
| `CAMOFOX_URL` | Camofox local anti-detection browser URL (default: `http://localhost:9377`) |
| `BROWSER_INACTIVITY_TIMEOUT` | Browser session inactivity timeout in seconds |
| `FAL_KEY` | Image generation ([fal.ai](https://fal.ai/)) |
| `GROQ_API_KEY` | Groq Whisper STT API key ([groq.com](https://groq.com/)) |
-2
View File
@@ -1016,8 +1016,6 @@ browser:
inactivity_timeout: 120 # Seconds before auto-closing idle sessions
command_timeout: 30 # Timeout in seconds for browser commands (screenshot, navigate, etc.)
record_sessions: false # Auto-record browser sessions as WebM videos to ~/.hermes/browser_recordings/
camofox:
managed_persistence: false # When true, Camofox sessions persist cookies/logins across restarts
```
The browser toolset supports multiple providers. See the [Browser feature page](/docs/user-guide/features/browser) for details on Browserbase, Browser Use, and local Chrome CDP setup.
@@ -11,7 +11,6 @@ Hermes Agent includes a full browser automation toolset with multiple backend op
- **Browserbase cloud mode** via [Browserbase](https://browserbase.com) for managed cloud browsers and anti-bot tooling
- **Browser Use cloud mode** via [Browser Use](https://browser-use.com) as an alternative cloud browser provider
- **Camofox local mode** via [Camofox](https://github.com/jo-inc/camofox-browser) for local anti-detection browsing (Firefox-based fingerprint spoofing)
- **Local Chrome via CDP** — connect browser tools to your own Chrome instance using `/browser connect`
- **Local browser mode** via the `agent-browser` CLI and a local Chromium installation
@@ -55,50 +54,6 @@ BROWSER_USE_API_KEY=***
Get your API key at [browser-use.com](https://browser-use.com). Browser Use provides a cloud browser via its REST API. If both Browserbase and Browser Use credentials are set, Browserbase takes priority.
### Camofox local mode
[Camofox](https://github.com/jo-inc/camofox-browser) is a self-hosted Node.js server wrapping Camoufox (a Firefox fork with C++ fingerprint spoofing). It provides local anti-detection browsing without cloud dependencies.
```bash
# Install and run
git clone https://github.com/jo-inc/camofox-browser && cd camofox-browser
npm install && npm start # downloads Camoufox (~300MB) on first run
# Or via Docker
docker run -d --network host -e CAMOFOX_PORT=9377 jo-inc/camofox-browser
```
Then set in `~/.hermes/.env`:
```bash
CAMOFOX_URL=http://localhost:9377
```
Or configure via `hermes tools` → Browser Automation → Camofox.
When `CAMOFOX_URL` is set, all browser tools automatically route through Camofox instead of Browserbase or agent-browser.
#### Persistent browser sessions
By default, each Camofox session gets a random identity — cookies and logins don't survive across agent restarts. To enable persistent browser sessions:
```yaml
# In ~/.hermes/config.yaml
browser:
camofox:
managed_persistence: true
```
When enabled, Hermes sends a stable profile-scoped identity to Camofox. The Camofox server maps this identity to a persistent browser profile directory, so cookies, logins, and localStorage survive across restarts. Different Hermes profiles get different browser profiles (profile isolation).
:::note
The Camofox server must also be configured with `CAMOFOX_PROFILE_DIR` on the server side for persistence to work.
:::
#### VNC live view
When Camofox runs in headed mode (with a visible browser window), it exposes a VNC port in its health check response. Hermes automatically discovers this and includes the VNC URL in navigation responses, so the agent can share a link for you to watch the browser live.
### Local Chrome via CDP (`/browser connect`)
Instead of a cloud provider, you can attach Hermes browser tools to your own running Chrome instance via the Chrome DevTools Protocol (CDP). This is useful when you want to see what the agent is doing in real-time, interact with pages that require your own cookies/sessions, or avoid cloud browser costs.
-8
View File
@@ -67,14 +67,6 @@
border-bottom: 1px solid rgba(255, 215, 0, 0.08);
}
/* backdrop-filter creates a stacking context that hides
.navbar-sidebar menu content (Docusaurus #6996). Remove it
while the mobile sidebar is open both classes live on the
same <nav> element. */
.navbar.navbar-sidebar--show {
backdrop-filter: none;
}
.navbar__title {
font-weight: 600;
letter-spacing: -0.02em;