Compare commits

...

34 Commits

Author SHA1 Message Date
kshitijk4poor fba9e02474 fix(gemini): honor explicit aux endpoints and harden SSE parsing
- preserve explicit Gemini base_url/api_key in auxiliary auto routing
- make native SSE parsing handle multiline data frames correctly
- add regression tests for auxiliary base_url precedence and SSE parsing
- document native Gemini defaults and explicit OpenAI-compatible overrides
2026-04-20 01:01:58 +05:30
kshitijk4poor 7aa1d37598 fix(gemini): tighten native routing and streaming replay
- only use the native adapter for the canonical Gemini native endpoint
- keep custom and /openai base URLs on the OpenAI-compatible path
- preserve Hermes keepalive transport injection for native Gemini clients
- stabilize streaming tool-call replay across repeated SSE events
- add follow-up tests for base_url precedence, async streaming, and duplicate tool-call chunks
2026-04-20 00:41:20 +05:30
kshitijk4poor cc6d295503 feat(providers): route gemini through the native AI Studio API
- add a native Gemini adapter over generateContent/streamGenerateContent
- switch the built-in gemini provider off the OpenAI-compatible endpoint
- preserve thought signatures and native functionResponse replay
- route auxiliary Gemini clients through the same adapter
- add focused unit coverage plus native-provider integration checks
2026-04-20 00:00:50 +05:30
Teknium 13294c2d18 feat(compression): summaries now respect the conversation's language
Context compaction summaries were always produced in English regardless
of the conversation language, which injected English context into
non-English conversations and muddied the continuation experience.

Adds a one-sentence instruction to the shared `_summarizer_preamble`
used by both the initial-compaction and iterative-update prompt paths.
Placing it in the preamble (rather than adding it separately to each
prompt) means both code paths stay in sync with one edit.

Ported from anomalyco/opencode#20581. The original PR (#4670) landed
before main's prompt templates were refactored to share the
`_summarizer_preamble` and `_template_sections` blocks, so the
cherry-pick conflicted on the now-obsolete inline sections; re-applied
the essential one-line change on top of the current structure.

Verified: 48/48 existing compressor tests pass.
2026-04-19 11:05:14 -07:00
kshitijk4poor 7bd1a3a4b1 test(compression): cover real init feasibility override 2026-04-19 10:40:26 -07:00
kshitijk4poor 045b28733e fix(compression): resolve missing config attribute in feasibility check
Commit 4a9c3565 added a reference to `self.config` in
`_check_compression_model_feasibility()` to pass the user-configured
`auxiliary.compression.context_length` to `get_model_context_length()`.
However, `AIAgent` never stores the loaded config dict as an instance
attribute — the config is loaded into a local variable `_agent_cfg` in
`__init__()` and discarded after init.

This causes an `AttributeError: 'AIAgent' object has no attribute
'config'` on every session start when compression is enabled, caught by
the try/except and logged as a non-fatal DEBUG message.

Fix: store the loaded config as `self._config` in `__init__()` and
update the reference in the feasibility check to use `self._config`.
2026-04-19 10:40:26 -07:00
brooklyn! 6af04474a3 Merge pull request #12560 from NousResearch/bb/tui-gateway-rpc-pool
fix(tui-gateway): dispatch slow RPC handlers on a thread pool (#12546)
2026-04-19 09:49:39 -05:00
Brooklyn Nicholson d32e8d2ace fix(tui): drain message queue on every busy → false transition
Previously the queue only drained inside the message.complete event
handler, so anything enqueued while a shell.exec (!sleep, !cmd) or a
failed agent turn was running would stay stuck forever — neither of
those paths emits message.complete. After Ctrl+C an interrupted
session would also orphan the queue because idle() flips busy=false
locally without going through message.complete.

Single source of truth: a useEffect that watches ui.busy. When the
session is settled (sid present, busy false, not editing a queue
item), pull one message and send it. Covers agent turn end,
interrupt, shell.exec completion, error recovery, and the original
startup hydration (first-sid case) all at once.

Dropped the now-redundant dequeue/sendQueued from
createGatewayEventHandler.message.complete and the accompanying
GatewayEventHandlerContext.composer field — the effect handles it.
2026-04-19 08:56:29 -05:00
Brooklyn Nicholson 393175e60c chore(tui-gateway): inline _run_and_emit — one-off wrapper, belongs inside dispatch 2026-04-19 07:58:33 -05:00
Brooklyn Nicholson 596280a40b chore(tui): /clean pass — inline one-off locals, tighten ConfirmPrompt
- providers.ts: drop the `dup` intermediate, fold the ternary inline
- paths.ts (fmtCwdBranch): inline `b` into the `tag` template
- prompts.tsx (ConfirmPrompt): hoist a single `lower = ch.toLowerCase()`,
  collapse the three early-return branches into two, drop the
  redundant bounds checks on arrow-key handlers (setSel is idempotent
  at 0/1), inline the `confirmLabel`/`cancelLabel` defaults at the
  use site
- modelPicker.tsx / config/env.ts / providers.test.ts: auto-formatter
  reflows picked up by `npm run fix`
- useInputHandlers.ts: drop the stray blank line that was tripping
  perfectionist/sort-imports (pre-existing lint error)
2026-04-19 07:55:38 -05:00
Brooklyn Nicholson ab6eaaff26 chore(tui-gateway): inline one-off RPC_POOL_WORKERS, compact _LONG_HANDLERS 2026-04-19 07:53:01 -05:00
Brooklyn Nicholson a6fe5d0872 fix(tui-gateway): dispatch slow RPC handlers on a thread pool (#12546)
The stdin-read loop in entry.py calls handle_request() inline, so the
five handlers that can block for seconds to minutes
(slash.exec, cli.exec, shell.exec, session.resume, session.branch)
freeze the dispatcher. While one is running, any inbound RPC —
notably approval.respond and session.interrupt — sits unread in the
pipe buffer and lands only after the slow handler returns.

Route only those five onto a small ThreadPoolExecutor; every other
handler stays on the main thread so the fast-path ordering is
unchanged and the audit surface stays small. write_json is already
_stdout_lock-guarded, so concurrent response writes are safe. Pool
size defaults to 4 (overridable via HERMES_TUI_RPC_POOL_WORKERS).

- add _LONG_HANDLERS set + ThreadPoolExecutor + atexit shutdown
- new dispatch(req) function: pool for long handlers, inline for rest
- _run_and_emit wraps pool work in a try/except so a misbehaving
  handler still surfaces as a JSON-RPC error instead of silently
  dying in a worker
- entry.py swaps handle_request → dispatch
- 5 new tests: sync path still inline, long handlers emit via stdout,
  fast handler not blocked behind slow one, handler exceptions map to
  error responses, non-long methods always take the sync path

Manual repro confirms the fix: shell.exec(sleep 3) + terminal.resize
sent back-to-back now returns the resize response at t=0s while the
sleep finishes independently at t=3s. Before, both landed together
at t=3s.

Fixes #12546.
2026-04-19 07:47:15 -05:00
Teknium a521005fe5 fix(discord): close two low-severity adapter races (#12558)
Two small races in gateway/platforms/discord.py, bundled together
since they're adjacent in the adapter and both narrow in impact.

1. on_message vs _resolve_allowed_usernames (startup window)
   DISCORD_ALLOWED_USERS accepts both numeric IDs and raw usernames.
   At connect-time, _resolve_allowed_usernames walks the bot's guilds
   (fetch_members can take multiple seconds) to swap usernames for IDs.
   on_message can fire during that window; _is_allowed_user compares
   the numeric author.id against a set that may still contain raw
   usernames — legitimate users get silently rejected for a few
   seconds after every reconnect.

   Fix: on_message awaits _ready_event (with a 30s timeout) when it
   isn't already set.  on_ready sets the event after the resolve
   completes.  In steady state this is a no-op (event already set);
   only the startup / reconnect window ever blocks.

2. join_voice_channel check-and-connect
   The existing-connection check at _voice_clients.get() and the
   channel.connect() call straddled an await boundary with no lock.
   Two concurrent /voice channel invocations could both see None and
   both call connect(); discord.py raises ClientException
   ("Already connected") on the loser.  Same race class for leave
   running concurrently with _voice_timeout_handler.

   Fix: per-guild asyncio.Lock (_voice_locks dict with lazy alloc via
   _voice_lock_for).  join_voice_channel and leave_voice_channel both
   run their body under the lock.  Sequential within a guild, still
   fully concurrent across guilds.

Both: LOW severity.  The first only affects username-based allowlists
on fast-follow-up messages at startup; the second is a narrow
exception on simultaneous voice commands.  Bundled so the adapter
gets a single coherent polish pass.

Tests (tests/gateway/test_discord_race_polish.py): 2 regression cases.
- test_concurrent_joins_do_not_double_connect: two concurrent
  join_voice_channel calls on the same guild result in exactly one
  channel.connect() invocation.
- test_on_message_blocks_until_ready_event_set: asserts the expected
  wait pattern is present in on_message (source inspection, since
  full discord.py client setup isn't practical here).

Regression-guard validated: against unpatched gateway/platforms/discord.py
both tests fail.  With the fix they pass.  Full Discord suite (118
tests) green.
2026-04-19 05:45:59 -07:00
Teknium c567adb58a fix(tui): session.create build thread must clean up if session.close races (#12555)
When a user hits /new or /resume before the previous session finishes
initializing, session.close runs while the previous session.create's
_build thread is still constructing the agent.  session.close pops
_sessions[sid] and closes whatever slash_worker it finds (None at that
point — _build hasn't installed it yet), then returns.  _build keeps
running in the background, installs the slash_worker subprocess and
registers an approval-notify callback on a session dict that's now
unreachable via _sessions.  The subprocess leaks until process exit;
the notify callback lingers in the global registry.

Fix: _build now tracks what it allocates (worker, notify_registered)
and checks in its finally block whether _sessions[sid] still points
to the session it's building for.  If not, the build was orphaned by
a racing close, so clean up the subprocess and unregister the notify
ourselves.

tui_gateway/server.py:
- _build reads _sessions.get(sid) safely (returns early if already gone)
- tracks allocated worker + notify registration
- finally checks orphan status and cleans up

Tests (tests/test_tui_gateway_server.py): 2 new cases.
- test_session_create_close_race_does_not_orphan_worker: slow
  _make_agent, close mid-build, verify worker.close() and
  unregister_gateway_notify both fire from the build thread's
  cleanup path.
- test_session_create_no_race_keeps_worker_alive: regression guard —
  happy path does NOT over-eagerly clean up a live worker.

Validated: against the unpatched code, the race test fails with
'orphan worker was not cleaned up — closed_workers=[]'.  Live E2E
against the live Python environment confirmed the cleanup fires
exactly when the race happens.
2026-04-19 05:35:45 -07:00
Teknium 37524a574e docs: add PR review guides, rework quickstart, slim down installation
Adds two complementary GitHub PR review guides from contest submissions:
- Cron-based PR review agent (from PR #5836 by @dieutx) — polls on a
  schedule, no server needed, teaches skills + memory authoring
- Webhook-based PR review (from PR #6503 by @gaijinkush) — real-time via
  GitHub webhooks, documents previously undocumented webhook feature
Both guides are cross-linked so users can pick the approach that fits.

Reworks quickstart.md by integrating the best content from PR #5744
by @aidil2105:
- Opinionated decision table ('The fastest path')
- Common failure modes table with causes and fixes
- Recovery toolkit sequence
- Session lifecycle verification step
- Better first-chat guidance with example prompts

Slims down installation.md:
- Removes 10-step manual/dev install section (already covered in
  developer-guide/contributing.md)
- Links to Contributing guide for dev setup
- Keeps focused on the automated installer + prerequisites + troubleshooting
2026-04-19 05:30:50 -07:00
Teknium d5fc8a5e00 fix(tui): reject /model and agent-mutating slash passthroughs while running (#12548)
agent.switch_model() mutates self.model, self.provider, self.base_url,
self.api_key, self.api_mode, and rebuilds self.client / self._anthropic_client
in place.  The worker thread running agent.run_conversation reads those
fields on every iteration.  A concurrent config.set key=model or slash-
worker-mirrored /model / /personality / /prompt / /compress can send an
HTTP request with mismatched model + base_url (or the old client keeps
running against a new endpoint) — 400/404s the user never asked for.

Fix: same pattern as the session.undo / session.compress guards
(PR #12416) and the gateway runner's running-agent /model guard (PR
#12334).  Reject with 4009 'session busy' when session.running is True.

Two call sites guarded:
- config.set with key=model: primary /model entry point from Ink
- _mirror_slash_side_effects for model / personality / prompt /
  compress: slash-worker passthrough path that applies live-agent
  side effects

Idle sessions still switch models normally — regression guard test
verifies this.

Tests (tests/test_tui_gateway_server.py): 4 new cases.
- test_config_set_model_rejects_while_running
- test_config_set_model_allowed_when_idle (regression guard)
- test_mirror_slash_side_effects_rejects_mutating_commands_while_running
- test_mirror_slash_side_effects_allowed_when_idle (regression guard)

Validated: against unpatched server.py, the two 'rejects_while_running'
tests fail with the exact race they assert against.  With the fix all
4 pass.  Live E2E against the live Python environment confirmed both
guards enforce 4009 / 'session busy' exactly as designed.
2026-04-19 05:19:57 -07:00
Teknium a3b76ae36d chore(attribution): add AUTHOR_MAP entry for Mibayy
Adds the Mibayy noreply email to the AUTHOR_MAP so CI attribution checks
pass for the #3884 maps skill feat commit (7fa01faf).
2026-04-19 05:19:51 -07:00
Teknium ea0bd81b84 feat(skills): consolidate find-nearby into maps as a single location skill
find-nearby and the (new) maps optional skill both used OpenStreetMap's
Overpass + Nominatim to answer the same question — 'what's near this
location?' — so shipping both would be duplicate code for overlapping
capability. Consolidate into one active-by-default skill at
skills/productivity/maps/ that is a strict superset of find-nearby.

Moves + deletions:
- optional-skills/productivity/maps/ → skills/productivity/maps/ (active,
  no install step needed)
- skills/leisure/find-nearby/ → DELETED (fully superseded)

Upgrades to maps_client.py so it covers everything find-nearby did:
- Overpass server failover — tries overpass-api.de then
  overpass.kumi.systems so a single-mirror outage doesn't break the skill
  (new overpass_query helper, used by both nearby and bbox)
- nearby now accepts --near "<address>" as a shortcut that auto-geocodes,
  so one command replaces the old 'search → copy coords → nearby' chain
- nearby now accepts --category (repeatable) for multi-type queries in
  one call (e.g. --category restaurant --category bar), results merged
  and deduped by (osm_type, osm_id), sorted by distance, capped at --limit
- Each nearby result now includes maps_url (clickable Google Maps search
  link) and directions_url (Google Maps directions from the search point
  — only when a ref point is known)
- Promoted commonly-useful OSM tags to top-level fields on each result:
  cuisine, hours (opening_hours), phone, website — instead of forcing
  callers to dig into the raw tags dict

SKILL.md:
- Version bumped 1.1.0 → 1.2.0, description rewritten to lead with
  capability surface
- New 'Working With Telegram Location Pins' section replacing
  find-nearby's equivalent workflow
- metadata.hermes.supersedes: [find-nearby] so tooling can flag any
  lingering references to the old skill

External references updated:
- optional-skills/productivity/telephony/SKILL.md — related_skills
  find-nearby → maps
- website/docs/reference/skills-catalog.md — removed the (now-empty)
  'leisure' section, added 'maps' row under productivity
- website/docs/user-guide/features/cron.md — find-nearby example
  usages swapped to maps
- tests/tools/test_cronjob_tools.py, tests/hermes_cli/test_cron.py,
  tests/cron/test_scheduler.py — fixture string values swapped
- cli.py:5290 — /cron help-hint example swapped

Not touched:
- RELEASE_v0.2.0.md — historical record, left intact

E2E-verified live (Nominatim + Overpass, one query each):
- nearby --near "Times Square" --category restaurant --category bar → 3 results,
  sorted by distance, all with maps_url, directions_url, cuisine, phone, website
  where OSM had the tags

All 111 targeted tests pass across tests/cron/, tests/tools/, tests/hermes_cli/.
2026-04-19 05:19:22 -07:00
Teknium de491fdf0e chore: remove unit tests from maps skill
Skills are self-contained scripts — they don't need test suites in
the repo.
2026-04-19 05:19:22 -07:00
Mibayy 7fa01fafa5 feat: add maps skill (OpenStreetMap + Overpass + OSRM, no API key)
Adds a maps optional skill with 8 commands, 44 POI categories, and
zero external dependencies. Uses free open data: Nominatim, Overpass
API, OSRM, and TimeAPI.io.

Commands: search, reverse, nearby, distance, directions, timezone,
area, bbox.

Improvements over original PR #2015:
- Fixed directory structure (optional-skills/productivity/maps/)
- Fixed distance argparse (--to flag instead of broken dual nargs=+)
- Fixed timezone (TimeAPI.io instead of broken worldtimeapi heuristic)
- Expanded POI categories from 12 to 44
- Added directions command with turn-by-turn OSRM steps
- Added area command (bounding box + dimensions for a named place)
- Added bbox command (POI search within a geographic rectangle)
- Added 23 unit tests
- Improved haversine (atan2 for numerical stability)
- Comprehensive SKILL.md with workflow examples

Co-authored-by: Mibayy <Mibayy@users.noreply.github.com>
2026-04-19 05:19:22 -07:00
Teknium 206a449b29 feat(webhook): direct delivery mode for zero-LLM push notifications (#12473)
External services can now push plain-text notifications to a user's chat
via the webhook adapter without invoking the agent. Set deliver_only=true
on a route and the rendered prompt template becomes the literal message
body — dispatched directly to the configured target (Telegram, Discord,
Slack, GitHub PR comment, etc.).

Reuses all existing webhook infrastructure: HMAC-SHA256 signature
validation, per-route rate limiting, idempotency cache, body-size limits,
template rendering with dot-notation, home-channel fallback. No new HTTP
server, no new auth scheme, no new port.

Use cases: Supabase/Firebase webhooks → user notifications, monitoring
alert forwarding, inter-agent pings, background job completion alerts.

Changes:
- gateway/platforms/webhook.py: new _direct_deliver() helper + early
  dispatch branch in _handle_webhook when deliver_only=true. Startup
  validation rejects deliver_only with deliver=log.
- hermes_cli/main.py + hermes_cli/webhook.go: --deliver-only flag on
  subscribe; list/show output marks direct-delivery routes.
- website/docs/user-guide/messaging/webhooks.md: new Direct Delivery
  Mode section with config example, CLI example, response codes.
- skills/devops/webhook-subscriptions/SKILL.md: document --deliver-only
  with use cases (bumped to v1.1.0).
- tests/gateway/test_webhook_deliver_only.py: 14 new tests covering
  agent bypass, template rendering, status codes, HMAC still enforced,
  idempotency still applies, rate limit still applies, startup
  validation, and direct-deliver dispatch.

Validation: 78 webhook tests pass (64 existing + 14 new). E2E verified
with real aiohttp server + real urllib POST — agent not invoked, target
adapter.send() called with rendered template, duplicate delivery_id
suppressed.

Closes the gap identified in PR #12117 (thanks to @H1an1 / Antenna team)
without adding a second HTTP ingress server.
2026-04-19 05:18:19 -07:00
Teknium 66ee081dc1 skills: move 7 niche mlops/mcp skills to optional (#12474)
Built-in → optional-skills/:
  mlops/training/peft         → optional-skills/mlops/peft
  mlops/training/pytorch-fsdp → optional-skills/mlops/pytorch-fsdp
  mlops/models/clip           → optional-skills/mlops/clip
  mlops/models/stable-diffusion → optional-skills/mlops/stable-diffusion
  mlops/models/whisper        → optional-skills/mlops/whisper
  mlops/cloud/modal           → optional-skills/mlops/modal
  mcp/mcporter                → optional-skills/mcp/mcporter

Built-in mlops training kept: axolotl, trl-fine-tuning, unsloth.
Built-in mlops models kept: audiocraft, segment-anything.
Built-in mlops evaluation/research/huggingface-hub/inference all kept.
native-mcp stays built-in (documents the native MCP tool); mcporter was a
redundant alternative CLI.

Also: removed now-empty skills/mlops/cloud/ dir, refreshed
skills/mlops/models/DESCRIPTION.md and skills/mcp/DESCRIPTION.md to match
what's left, and synchronized both catalog pages (skills-catalog.md,
optional-skills-catalog.md).
2026-04-19 05:14:17 -07:00
kshitijk4poor 957ca79e8e fix(feishu): drop dead helper and cover repeated fenced blocks 2026-04-19 03:30:36 -07:00
kshitijk4poor a9debf10ff fix(feishu): harden fenced post row splitting 2026-04-19 03:30:36 -07:00
sgaofen cc59d133dc fix(feishu): split fenced code blocks in post payload 2026-04-19 03:30:36 -07:00
kshitijk4poor 4f0e49dc7b chore: add sgaofen to AUTHOR_MAP 2026-04-19 03:30:03 -07:00
kshitijk4poor 4b6ff0eb7f fix: tighten gateway interrupt salvage follow-ups
Follow-up on top of the helix4u #12388 cherry-picks:
- make deferred post-delivery callbacks generation-aware end-to-end so
  stale runs cannot clear callbacks registered by a fresher run for the
  same session
- bind callback ownership to the active session event at run start and
  snapshot that generation inside base adapter processing so later event
  mutation cannot retarget cleanup
- pass run_generation through proxy mode and drop stale proxy streams /
  final results the same way local runs are dropped
- centralize stop/new interrupt cleanup into one helper and replace the
  open-coded branches with shared logic
- unify internal control interrupt reason strings via shared constants
- remove the return from base.py's finally block so cleanup no longer
  swallows cancellation/exception flow
- add focused regressions for generation forwarding, proxy stale
  suppression, and newer-callback preservation

This addresses all review findings from the initial #12388 review while
keeping the fix scoped to stale-output/typing-loop interrupt handling.
2026-04-19 03:03:57 -07:00
helix4u 8466268ca5 fix(gateway): keep typing loop overrides backward-compatible 2026-04-19 03:03:57 -07:00
helix4u 150382e8b7 fix(gateway): stop typing loops on session interrupt 2026-04-19 03:03:57 -07:00
helix4u b05d30418d docs: clarify profiles vs workspaces 2026-04-19 02:00:46 -07:00
kshitijk4poor ff63e2e005 fix: tighten telegram docker-media salvage follow-ups
Follow-up on top of the helix4u #6392 cherry-pick:
- reuse one helper for actionable Docker-local file-not-found errors
  across document/image/video/audio local-media send paths
- include /outputs/... alongside /output/... in the container-local
  path hint
- soften the gateway startup warning so it does not imply custom
  host-visible mounts are broken; the warning now targets the specific
  risky pattern of emitting container-local MEDIA paths without an
  explicit export mount
- add focused regressions for /outputs/... and non-document media hint
  coverage

This keeps the salvage aligned with the actual MEDIA delivery problem on
current main while reducing false-positive operator messaging.
2026-04-19 01:55:33 -07:00
helix4u 588333908c fix(telegram): warn on docker-only media paths 2026-04-19 01:55:33 -07:00
Tranquil-Flow b668c09ab2 fix(gateway): strip cursor from frozen message on empty fallback continuation (#7183)
When _send_fallback_final() is called with nothing new to deliver
(the visible partial already matches final_text), the last edit may
still show the cursor character because fallback mode was entered
after a failed edit.  Before this fix the early-return path left
_already_sent = True without attempting to strip the cursor, so the
message stayed frozen with a visible ▉ permanently.

Adds a best-effort edit inside the empty-continuation branch to clean
the cursor off the last-sent text.  Harmless when fallback mode
wasn't actually armed or when the cursor isn't present.  If the strip
edit itself fails (flood still active), we return without crashing
and without corrupting _last_sent_text.

Adapted from PR #7429 onto current main — the surrounding fallback
block grew the #10807 stale-prefix handling since #7429 was written,
so the cursor strip lives in the new else-branch where we still
return early.

3 unit tests covering: cursor stripped on empty continuation, no edit
attempted when cursor is not configured, cursor-strip edit failure
handled without crash.

Originally proposed as PR #7429.
2026-04-19 01:51:12 -07:00
Teknium 62ce6a38ae fix(gateway): cancel_background_tasks must drain late-arrivals (#12471)
During gateway shutdown, a message arriving while
cancel_background_tasks is mid-await (inside asyncio.gather) spawns
a fresh _process_message_background task via handle_message and adds
it to self._background_tasks.  The original implementation's
_background_tasks.clear() at the end of cancel_background_tasks
dropped the reference; the task ran untracked against a disconnecting
adapter, logged send-failures, and lingered until it completed on
its own.

Fix: wrap the cancel+gather in a bounded loop (MAX_DRAIN_ROUNDS=5).
If new tasks appeared during the gather, cancel them in the next
round.  The .clear() at the end is preserved as a safety net for
any task that appeared after MAX_DRAIN_ROUNDS — but in practice the
drain stabilizes in 1-2 rounds.

Tests: tests/gateway/test_cancel_background_drain.py — 3 cases.
- test_cancel_background_tasks_drains_late_arrivals: spawn M1, start
  cancel, inject M2 during M1's shielded cleanup, verify M2 is
  cancelled.
- test_cancel_background_tasks_handles_no_tasks: no-op path still
  terminates cleanly.
- test_cancel_background_tasks_bounded_rounds: baseline — single
  task cancels in one round, loop terminates.

Regression-guard validated: against the unpatched implementation,
the late-arrival test fails with exactly the expected message
('task leaked').  With the fix it passes.

Blast radius is shutdown-only; the audit classified this as MED.
Shipping because the fix is small and the hygiene is worth it.

While investigating the audit's other MEDs (busy-handler double-ack,
Discord ExecApprovalView double-resolve, UpdatePromptView
double-resolve), I verified all three were false positives — the
check-and-set patterns have no await between them, so they're
atomic on single-threaded asyncio.  No fix needed for those.
2026-04-19 01:48:42 -07:00
94 changed files with 6517 additions and 874 deletions
+36 -4
View File
@@ -775,6 +775,11 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
if model is None:
continue # skip provider if we don't know a valid aux model
logger.debug("Auxiliary text client: %s (%s) via pool", pconfig.name, model)
if provider_id == "gemini":
from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url
if is_native_gemini_base_url(base_url):
return GeminiNativeClient(api_key=api_key, base_url=base_url), model
extra = {}
if "api.kimi.com" in base_url.lower():
extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
@@ -796,6 +801,11 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
if model is None:
continue # skip provider if we don't know a valid aux model
logger.debug("Auxiliary text client: %s (%s)", pconfig.name, model)
if provider_id == "gemini":
from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url
if is_native_gemini_base_url(base_url):
return GeminiNativeClient(api_key=api_key, base_url=base_url), model
extra = {}
if "api.kimi.com" in base_url.lower():
extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
@@ -1298,6 +1308,13 @@ def _resolve_auto(main_runtime: Optional[Dict[str, Any]] = None) -> Tuple[Option
resolved_provider = "custom"
explicit_base_url = runtime_base_url
explicit_api_key = runtime_api_key or None
elif runtime_base_url and main_provider == "gemini":
# Preserve explicit Gemini endpoint selection from the live runtime.
# If the main agent is intentionally pinned to Google's OpenAI-
# compatible route (or another explicit Gemini endpoint), auxiliary
# tasks must honor that instead of silently switching transports.
explicit_base_url = runtime_base_url
explicit_api_key = runtime_api_key or None
client, resolved = resolve_provider_client(
resolved_provider,
main_model,
@@ -1348,6 +1365,13 @@ def _to_async_client(sync_client, model: str):
return AsyncCodexAuxiliaryClient(sync_client), model
if isinstance(sync_client, AnthropicAuxiliaryClient):
return AsyncAnthropicAuxiliaryClient(sync_client), model
try:
from agent.gemini_native_adapter import GeminiNativeClient, AsyncGeminiNativeClient
if isinstance(sync_client, GeminiNativeClient):
return AsyncGeminiNativeClient(sync_client), model
except ImportError:
pass
try:
from agent.copilot_acp_client import CopilotACPClient
if isinstance(sync_client, CopilotACPClient):
@@ -1623,7 +1647,7 @@ def resolve_provider_client(
return (_to_async_client(client, final_model) if async_mode else (client, final_model))
creds = resolve_api_key_provider_credentials(provider)
api_key = str(creds.get("api_key", "")).strip()
api_key = str((explicit_api_key or "")).strip() or str(creds.get("api_key", "")).strip()
if not api_key:
tried_sources = list(pconfig.api_key_env_vars)
if provider == "copilot":
@@ -1633,13 +1657,21 @@ def resolve_provider_client(
provider, ", ".join(tried_sources))
return None, None
base_url = _to_openai_base_url(
str(creds.get("base_url", "")).strip().rstrip("/") or pconfig.inference_base_url
)
configured_base = str(creds.get("base_url", "")).strip().rstrip("/") or pconfig.inference_base_url
base_url = _to_openai_base_url((explicit_base_url or configured_base).strip().rstrip("/"))
default_model = _API_KEY_PROVIDER_AUX_MODELS.get(provider, "")
final_model = _normalize_resolved_model(model or default_model, provider)
if provider == "gemini":
from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url
if is_native_gemini_base_url(base_url):
client = GeminiNativeClient(api_key=api_key, base_url=base_url)
logger.debug("resolve_provider_client: %s (%s)", provider, final_model)
return (_to_async_client(client, final_model) if async_mode
else (client, final_model))
# Provider-specific headers
headers = {}
if "api.kimi.com" in base_url.lower():
+3 -1
View File
@@ -633,7 +633,9 @@ class ContextCompressor(ContextEngine):
"assistant that continues the conversation. "
"Do NOT respond to any questions or requests in the conversation — "
"only output the structured summary. "
"Do NOT include any preamble, greeting, or prefix."
"Do NOT include any preamble, greeting, or prefix. "
"Write the summary in the same language the user was using in the "
"conversation — do not translate or switch to English."
)
# Shared structured template (used by both paths).
+867
View File
@@ -0,0 +1,867 @@
"""OpenAI-compatible facade over Google AI Studio's native Gemini API.
Hermes keeps ``api_mode='chat_completions'`` for the ``gemini`` provider so the
main agent loop can keep using its existing OpenAI-shaped message flow.
This adapter is the transport shim that converts those OpenAI-style
``messages[]`` / ``tools[]`` requests into Gemini's native
``models/{model}:generateContent`` schema and converts the responses back.
Why this exists
---------------
Google's OpenAI-compatible endpoint has been brittle for Hermes's multi-turn
agent/tool loop (auth churn, tool-call replay quirks, thought-signature
requirements). The native Gemini API is the canonical path and avoids the
OpenAI-compat layer entirely.
"""
from __future__ import annotations
import asyncio
import base64
import json
import logging
import time
import uuid
from types import SimpleNamespace
from typing import Any, Dict, Iterator, List, Optional
import httpx
logger = logging.getLogger(__name__)
DEFAULT_GEMINI_BASE_URL = "https://generativelanguage.googleapis.com/v1beta"
def is_native_gemini_base_url(base_url: str) -> bool:
"""Return True when the endpoint speaks Gemini's native REST API."""
normalized = str(base_url or "").strip().rstrip("/").lower()
if not normalized:
return False
if "generativelanguage.googleapis.com" not in normalized:
return False
return not normalized.endswith("/openai")
class GeminiAPIError(Exception):
"""Error shape compatible with Hermes retry/error classification."""
def __init__(
self,
message: str,
*,
code: str = "gemini_api_error",
status_code: Optional[int] = None,
response: Optional[httpx.Response] = None,
retry_after: Optional[float] = None,
details: Optional[Dict[str, Any]] = None,
) -> None:
super().__init__(message)
self.code = code
self.status_code = status_code
self.response = response
self.retry_after = retry_after
self.details = details or {}
def _coerce_content_to_text(content: Any) -> str:
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
pieces: List[str] = []
for part in content:
if isinstance(part, str):
pieces.append(part)
elif isinstance(part, dict) and part.get("type") == "text":
text = part.get("text")
if isinstance(text, str):
pieces.append(text)
return "\n".join(pieces)
return str(content)
def _extract_multimodal_parts(content: Any) -> List[Dict[str, Any]]:
if not isinstance(content, list):
text = _coerce_content_to_text(content)
return [{"text": text}] if text else []
parts: List[Dict[str, Any]] = []
for item in content:
if isinstance(item, str):
parts.append({"text": item})
continue
if not isinstance(item, dict):
continue
ptype = item.get("type")
if ptype == "text":
text = item.get("text")
if isinstance(text, str) and text:
parts.append({"text": text})
elif ptype == "image_url":
url = ((item.get("image_url") or {}).get("url") or "")
if not isinstance(url, str) or not url.startswith("data:"):
continue
try:
header, encoded = url.split(",", 1)
mime = header.split(":", 1)[1].split(";", 1)[0]
raw = base64.b64decode(encoded)
except Exception:
continue
parts.append(
{
"inlineData": {
"mimeType": mime,
"data": base64.b64encode(raw).decode("ascii"),
}
}
)
return parts
def _tool_call_extra_signature(tool_call: Dict[str, Any]) -> Optional[str]:
extra = tool_call.get("extra_content") or {}
if not isinstance(extra, dict):
return None
google = extra.get("google") or extra.get("thought_signature")
if isinstance(google, dict):
sig = google.get("thought_signature") or google.get("thoughtSignature")
return str(sig) if isinstance(sig, str) and sig else None
if isinstance(google, str) and google:
return google
return None
def _translate_tool_call_to_gemini(tool_call: Dict[str, Any]) -> Dict[str, Any]:
fn = tool_call.get("function") or {}
args_raw = fn.get("arguments", "")
try:
args = json.loads(args_raw) if isinstance(args_raw, str) and args_raw else {}
except json.JSONDecodeError:
args = {"_raw": args_raw}
if not isinstance(args, dict):
args = {"_value": args}
part: Dict[str, Any] = {
"functionCall": {
"name": str(fn.get("name") or ""),
"args": args,
}
}
thought_signature = _tool_call_extra_signature(tool_call)
if thought_signature:
part["thoughtSignature"] = thought_signature
return part
def _translate_tool_result_to_gemini(
message: Dict[str, Any],
tool_name_by_call_id: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
tool_name_by_call_id = tool_name_by_call_id or {}
tool_call_id = str(message.get("tool_call_id") or "")
name = str(
message.get("name")
or tool_name_by_call_id.get(tool_call_id)
or tool_call_id
or "tool"
)
content = _coerce_content_to_text(message.get("content"))
try:
parsed = json.loads(content) if content.strip().startswith(("{", "[")) else None
except json.JSONDecodeError:
parsed = None
response = parsed if isinstance(parsed, dict) else {"output": content}
return {
"functionResponse": {
"name": name,
"response": response,
}
}
def _build_gemini_contents(messages: List[Dict[str, Any]]) -> tuple[List[Dict[str, Any]], Optional[Dict[str, Any]]]:
system_text_parts: List[str] = []
contents: List[Dict[str, Any]] = []
tool_name_by_call_id: Dict[str, str] = {}
for msg in messages:
if not isinstance(msg, dict):
continue
role = str(msg.get("role") or "user")
if role == "system":
system_text_parts.append(_coerce_content_to_text(msg.get("content")))
continue
if role in {"tool", "function"}:
contents.append(
{
"role": "user",
"parts": [
_translate_tool_result_to_gemini(
msg,
tool_name_by_call_id=tool_name_by_call_id,
)
],
}
)
continue
gemini_role = "model" if role == "assistant" else "user"
parts: List[Dict[str, Any]] = []
content_parts = _extract_multimodal_parts(msg.get("content"))
parts.extend(content_parts)
tool_calls = msg.get("tool_calls") or []
if isinstance(tool_calls, list):
for tool_call in tool_calls:
if isinstance(tool_call, dict):
tool_call_id = str(tool_call.get("id") or tool_call.get("call_id") or "")
tool_name = str(((tool_call.get("function") or {}).get("name") or ""))
if tool_call_id and tool_name:
tool_name_by_call_id[tool_call_id] = tool_name
parts.append(_translate_tool_call_to_gemini(tool_call))
if parts:
contents.append({"role": gemini_role, "parts": parts})
system_instruction = None
joined_system = "\n".join(part for part in system_text_parts if part).strip()
if joined_system:
system_instruction = {"parts": [{"text": joined_system}]}
return contents, system_instruction
def _translate_tools_to_gemini(tools: Any) -> List[Dict[str, Any]]:
if not isinstance(tools, list):
return []
declarations: List[Dict[str, Any]] = []
for tool in tools:
if not isinstance(tool, dict):
continue
fn = tool.get("function") or {}
if not isinstance(fn, dict):
continue
name = fn.get("name")
if not isinstance(name, str) or not name:
continue
decl: Dict[str, Any] = {"name": name}
description = fn.get("description")
if isinstance(description, str) and description:
decl["description"] = description
parameters = fn.get("parameters")
if isinstance(parameters, dict):
decl["parameters"] = parameters
declarations.append(decl)
return [{"functionDeclarations": declarations}] if declarations else []
def _translate_tool_choice_to_gemini(tool_choice: Any) -> Optional[Dict[str, Any]]:
if tool_choice is None:
return None
if isinstance(tool_choice, str):
if tool_choice == "auto":
return {"functionCallingConfig": {"mode": "AUTO"}}
if tool_choice == "required":
return {"functionCallingConfig": {"mode": "ANY"}}
if tool_choice == "none":
return {"functionCallingConfig": {"mode": "NONE"}}
if isinstance(tool_choice, dict):
fn = tool_choice.get("function") or {}
name = fn.get("name")
if isinstance(name, str) and name:
return {"functionCallingConfig": {"mode": "ANY", "allowedFunctionNames": [name]}}
return None
def _normalize_thinking_config(config: Any) -> Optional[Dict[str, Any]]:
if not isinstance(config, dict) or not config:
return None
budget = config.get("thinkingBudget", config.get("thinking_budget"))
include = config.get("includeThoughts", config.get("include_thoughts"))
level = config.get("thinkingLevel", config.get("thinking_level"))
normalized: Dict[str, Any] = {}
if isinstance(budget, (int, float)):
normalized["thinkingBudget"] = int(budget)
if isinstance(include, bool):
normalized["includeThoughts"] = include
if isinstance(level, str) and level.strip():
normalized["thinkingLevel"] = level.strip().lower()
return normalized or None
def build_gemini_request(
*,
messages: List[Dict[str, Any]],
tools: Any = None,
tool_choice: Any = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
top_p: Optional[float] = None,
stop: Any = None,
thinking_config: Any = None,
) -> Dict[str, Any]:
contents, system_instruction = _build_gemini_contents(messages)
request: Dict[str, Any] = {"contents": contents}
if system_instruction:
request["systemInstruction"] = system_instruction
gemini_tools = _translate_tools_to_gemini(tools)
if gemini_tools:
request["tools"] = gemini_tools
tool_config = _translate_tool_choice_to_gemini(tool_choice)
if tool_config:
request["toolConfig"] = tool_config
generation_config: Dict[str, Any] = {}
if temperature is not None:
generation_config["temperature"] = temperature
if max_tokens is not None:
generation_config["maxOutputTokens"] = max_tokens
if top_p is not None:
generation_config["topP"] = top_p
if stop:
generation_config["stopSequences"] = stop if isinstance(stop, list) else [str(stop)]
normalized_thinking = _normalize_thinking_config(thinking_config)
if normalized_thinking:
generation_config["thinkingConfig"] = normalized_thinking
if generation_config:
request["generationConfig"] = generation_config
return request
def _map_gemini_finish_reason(reason: str) -> str:
mapping = {
"STOP": "stop",
"MAX_TOKENS": "length",
"SAFETY": "content_filter",
"RECITATION": "content_filter",
"OTHER": "stop",
}
return mapping.get(str(reason or "").upper(), "stop")
def _tool_call_extra_from_part(part: Dict[str, Any]) -> Optional[Dict[str, Any]]:
sig = part.get("thoughtSignature")
if isinstance(sig, str) and sig:
return {"google": {"thought_signature": sig}}
return None
def _empty_response(model: str) -> SimpleNamespace:
message = SimpleNamespace(
role="assistant",
content="",
tool_calls=None,
reasoning=None,
reasoning_content=None,
reasoning_details=None,
)
choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
usage = SimpleNamespace(
prompt_tokens=0,
completion_tokens=0,
total_tokens=0,
prompt_tokens_details=SimpleNamespace(cached_tokens=0),
)
return SimpleNamespace(
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
object="chat.completion",
created=int(time.time()),
model=model,
choices=[choice],
usage=usage,
)
def translate_gemini_response(resp: Dict[str, Any], model: str) -> SimpleNamespace:
candidates = resp.get("candidates") or []
if not isinstance(candidates, list) or not candidates:
return _empty_response(model)
cand = candidates[0] if isinstance(candidates[0], dict) else {}
content_obj = cand.get("content") if isinstance(cand, dict) else {}
parts = content_obj.get("parts") if isinstance(content_obj, dict) else []
text_pieces: List[str] = []
reasoning_pieces: List[str] = []
tool_calls: List[SimpleNamespace] = []
for index, part in enumerate(parts or []):
if not isinstance(part, dict):
continue
if part.get("thought") is True and isinstance(part.get("text"), str):
reasoning_pieces.append(part["text"])
continue
if isinstance(part.get("text"), str):
text_pieces.append(part["text"])
continue
fc = part.get("functionCall")
if isinstance(fc, dict) and fc.get("name"):
try:
args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False)
except (TypeError, ValueError):
args_str = "{}"
tool_call = SimpleNamespace(
id=f"call_{uuid.uuid4().hex[:12]}",
type="function",
index=index,
function=SimpleNamespace(name=str(fc["name"]), arguments=args_str),
)
extra_content = _tool_call_extra_from_part(part)
if extra_content:
tool_call.extra_content = extra_content
tool_calls.append(tool_call)
finish_reason = "tool_calls" if tool_calls else _map_gemini_finish_reason(str(cand.get("finishReason") or ""))
usage_meta = resp.get("usageMetadata") or {}
usage = SimpleNamespace(
prompt_tokens=int(usage_meta.get("promptTokenCount") or 0),
completion_tokens=int(usage_meta.get("candidatesTokenCount") or 0),
total_tokens=int(usage_meta.get("totalTokenCount") or 0),
prompt_tokens_details=SimpleNamespace(
cached_tokens=int(usage_meta.get("cachedContentTokenCount") or 0),
),
)
reasoning = "".join(reasoning_pieces) or None
message = SimpleNamespace(
role="assistant",
content="".join(text_pieces) if text_pieces else None,
tool_calls=tool_calls or None,
reasoning=reasoning,
reasoning_content=reasoning,
reasoning_details=None,
)
choice = SimpleNamespace(index=0, message=message, finish_reason=finish_reason)
return SimpleNamespace(
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
object="chat.completion",
created=int(time.time()),
model=model,
choices=[choice],
usage=usage,
)
class _GeminiStreamChunk(SimpleNamespace):
pass
def _make_stream_chunk(
*,
model: str,
content: str = "",
tool_call_delta: Optional[Dict[str, Any]] = None,
finish_reason: Optional[str] = None,
reasoning: str = "",
) -> _GeminiStreamChunk:
delta_kwargs: Dict[str, Any] = {
"role": "assistant",
"content": None,
"tool_calls": None,
"reasoning": None,
"reasoning_content": None,
}
if content:
delta_kwargs["content"] = content
if tool_call_delta is not None:
tool_delta = SimpleNamespace(
index=tool_call_delta.get("index", 0),
id=tool_call_delta.get("id") or f"call_{uuid.uuid4().hex[:12]}",
type="function",
function=SimpleNamespace(
name=tool_call_delta.get("name") or "",
arguments=tool_call_delta.get("arguments") or "",
),
)
extra_content = tool_call_delta.get("extra_content")
if isinstance(extra_content, dict):
tool_delta.extra_content = extra_content
delta_kwargs["tool_calls"] = [tool_delta]
if reasoning:
delta_kwargs["reasoning"] = reasoning
delta_kwargs["reasoning_content"] = reasoning
delta = SimpleNamespace(**delta_kwargs)
choice = SimpleNamespace(index=0, delta=delta, finish_reason=finish_reason)
return _GeminiStreamChunk(
id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
object="chat.completion.chunk",
created=int(time.time()),
model=model,
choices=[choice],
usage=None,
)
def _iter_sse_events(response: httpx.Response) -> Iterator[Dict[str, Any]]:
buffer = ""
data_lines: List[str] = []
def _flush_event() -> Iterator[Dict[str, Any]]:
nonlocal data_lines
if not data_lines:
return iter(())
data = "\n".join(data_lines)
data_lines = []
if data == "[DONE]":
return iter(({"__done__": True},))
try:
payload = json.loads(data)
except json.JSONDecodeError:
logger.debug("Non-JSON Gemini SSE event: %s", data[:200])
return iter(())
if isinstance(payload, dict):
return iter((payload,))
return iter(())
for chunk in response.iter_text():
if not chunk:
continue
buffer += chunk
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.rstrip("\r")
if line == "":
for payload in _flush_event():
if payload.get("__done__"):
return
yield payload
continue
if line.startswith(":"):
continue
if line.startswith("data:"):
value = line[5:]
if value.startswith(" "):
value = value[1:]
data_lines.append(value)
for payload in _flush_event():
if payload.get("__done__"):
return
yield payload
def translate_stream_event(event: Dict[str, Any], model: str, tool_call_indices: Dict[str, Dict[str, Any]]) -> List[_GeminiStreamChunk]:
candidates = event.get("candidates") or []
if not candidates:
return []
cand = candidates[0] if isinstance(candidates[0], dict) else {}
parts = ((cand.get("content") or {}).get("parts") or []) if isinstance(cand, dict) else []
chunks: List[_GeminiStreamChunk] = []
for part_index, part in enumerate(parts):
if not isinstance(part, dict):
continue
if part.get("thought") is True and isinstance(part.get("text"), str):
chunks.append(_make_stream_chunk(model=model, reasoning=part["text"]))
continue
if isinstance(part.get("text"), str) and part["text"]:
chunks.append(_make_stream_chunk(model=model, content=part["text"]))
fc = part.get("functionCall")
if isinstance(fc, dict) and fc.get("name"):
name = str(fc["name"])
try:
args_str = json.dumps(fc.get("args") or {}, ensure_ascii=False, sort_keys=True)
except (TypeError, ValueError):
args_str = "{}"
thought_signature = part.get("thoughtSignature") if isinstance(part.get("thoughtSignature"), str) else ""
call_key = json.dumps(
{
"part_index": part_index,
"name": name,
"thought_signature": thought_signature,
},
sort_keys=True,
)
slot = tool_call_indices.get(call_key)
if slot is None:
slot = {
"index": len(tool_call_indices),
"id": f"call_{uuid.uuid4().hex[:12]}",
"last_arguments": "",
}
tool_call_indices[call_key] = slot
emitted_arguments = args_str
last_arguments = str(slot.get("last_arguments") or "")
if last_arguments:
if args_str == last_arguments:
emitted_arguments = ""
elif args_str.startswith(last_arguments):
emitted_arguments = args_str[len(last_arguments):]
slot["last_arguments"] = args_str
chunks.append(
_make_stream_chunk(
model=model,
tool_call_delta={
"index": slot["index"],
"id": slot["id"],
"name": name,
"arguments": emitted_arguments,
"extra_content": _tool_call_extra_from_part(part),
},
)
)
finish_reason_raw = str(cand.get("finishReason") or "")
if finish_reason_raw:
mapped = "tool_calls" if tool_call_indices else _map_gemini_finish_reason(finish_reason_raw)
chunks.append(_make_stream_chunk(model=model, finish_reason=mapped))
return chunks
def gemini_http_error(response: httpx.Response) -> GeminiAPIError:
status = response.status_code
body_text = ""
body_json: Dict[str, Any] = {}
try:
body_text = response.text
except Exception:
body_text = ""
if body_text:
try:
parsed = json.loads(body_text)
if isinstance(parsed, dict):
body_json = parsed
except (ValueError, TypeError):
body_json = {}
err_obj = body_json.get("error") if isinstance(body_json, dict) else None
if not isinstance(err_obj, dict):
err_obj = {}
err_status = str(err_obj.get("status") or "").strip()
err_message = str(err_obj.get("message") or "").strip()
details_list = err_obj.get("details") if isinstance(err_obj.get("details"), list) else []
reason = ""
retry_after: Optional[float] = None
metadata: Dict[str, Any] = {}
for detail in details_list:
if not isinstance(detail, dict):
continue
type_url = str(detail.get("@type") or "")
if not reason and type_url.endswith("/google.rpc.ErrorInfo"):
reason_value = detail.get("reason")
if isinstance(reason_value, str):
reason = reason_value
md = detail.get("metadata")
if isinstance(md, dict):
metadata = md
header_retry = response.headers.get("Retry-After") or response.headers.get("retry-after")
if header_retry:
try:
retry_after = float(header_retry)
except (TypeError, ValueError):
retry_after = None
code = f"gemini_http_{status}"
if status == 401:
code = "gemini_unauthorized"
elif status == 429:
code = "gemini_rate_limited"
elif status == 404:
code = "gemini_model_not_found"
if err_message:
message = f"Gemini HTTP {status} ({err_status or 'error'}): {err_message}"
else:
message = f"Gemini returned HTTP {status}: {body_text[:500]}"
return GeminiAPIError(
message,
code=code,
status_code=status,
response=response,
retry_after=retry_after,
details={
"status": err_status,
"reason": reason,
"metadata": metadata,
"message": err_message,
},
)
class _GeminiChatCompletions:
def __init__(self, client: "GeminiNativeClient"):
self._client = client
def create(self, **kwargs: Any) -> Any:
return self._client._create_chat_completion(**kwargs)
class _AsyncGeminiChatCompletions:
def __init__(self, client: "AsyncGeminiNativeClient"):
self._client = client
async def create(self, **kwargs: Any) -> Any:
return await self._client._create_chat_completion(**kwargs)
class _GeminiChatNamespace:
def __init__(self, client: "GeminiNativeClient"):
self.completions = _GeminiChatCompletions(client)
class _AsyncGeminiChatNamespace:
def __init__(self, client: "AsyncGeminiNativeClient"):
self.completions = _AsyncGeminiChatCompletions(client)
class GeminiNativeClient:
"""Minimal OpenAI-SDK-compatible facade over Gemini's native REST API."""
def __init__(
self,
*,
api_key: str,
base_url: Optional[str] = None,
default_headers: Optional[Dict[str, str]] = None,
timeout: Any = None,
http_client: Optional[httpx.Client] = None,
**_: Any,
) -> None:
self.api_key = api_key
normalized_base = (base_url or DEFAULT_GEMINI_BASE_URL).rstrip("/")
if normalized_base.endswith("/openai"):
normalized_base = normalized_base[: -len("/openai")]
self.base_url = normalized_base
self._default_headers = dict(default_headers or {})
self.chat = _GeminiChatNamespace(self)
self.is_closed = False
self._http = http_client or httpx.Client(
timeout=timeout or httpx.Timeout(connect=15.0, read=600.0, write=30.0, pool=30.0)
)
def close(self) -> None:
self.is_closed = True
try:
self._http.close()
except Exception:
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
def _headers(self) -> Dict[str, str]:
headers = {
"Content-Type": "application/json",
"Accept": "application/json",
"x-goog-api-key": self.api_key,
"User-Agent": "hermes-agent (gemini-native)",
}
headers.update(self._default_headers)
return headers
@staticmethod
def _advance_stream_iterator(iterator: Iterator[_GeminiStreamChunk]) -> tuple[bool, Optional[_GeminiStreamChunk]]:
try:
return False, next(iterator)
except StopIteration:
return True, None
def _create_chat_completion(
self,
*,
model: str = "gemini-2.5-flash",
messages: Optional[List[Dict[str, Any]]] = None,
stream: bool = False,
tools: Any = None,
tool_choice: Any = None,
temperature: Optional[float] = None,
max_tokens: Optional[int] = None,
top_p: Optional[float] = None,
stop: Any = None,
extra_body: Optional[Dict[str, Any]] = None,
timeout: Any = None,
**_: Any,
) -> Any:
thinking_config = None
if isinstance(extra_body, dict):
thinking_config = extra_body.get("thinking_config") or extra_body.get("thinkingConfig")
request = build_gemini_request(
messages=messages or [],
tools=tools,
tool_choice=tool_choice,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
stop=stop,
thinking_config=thinking_config,
)
if stream:
return self._stream_completion(model=model, request=request, timeout=timeout)
url = f"{self.base_url}/models/{model}:generateContent"
response = self._http.post(url, json=request, headers=self._headers(), timeout=timeout)
if response.status_code != 200:
raise gemini_http_error(response)
try:
payload = response.json()
except ValueError as exc:
raise GeminiAPIError(
f"Invalid JSON from Gemini native API: {exc}",
code="gemini_invalid_json",
status_code=response.status_code,
response=response,
) from exc
return translate_gemini_response(payload, model=model)
def _stream_completion(self, *, model: str, request: Dict[str, Any], timeout: Any = None) -> Iterator[_GeminiStreamChunk]:
url = f"{self.base_url}/models/{model}:streamGenerateContent?alt=sse"
stream_headers = dict(self._headers())
stream_headers["Accept"] = "text/event-stream"
def _generator() -> Iterator[_GeminiStreamChunk]:
try:
with self._http.stream("POST", url, json=request, headers=stream_headers, timeout=timeout) as response:
if response.status_code != 200:
response.read()
raise gemini_http_error(response)
tool_call_indices: Dict[str, Dict[str, Any]] = {}
for event in _iter_sse_events(response):
for chunk in translate_stream_event(event, model, tool_call_indices):
yield chunk
except httpx.HTTPError as exc:
raise GeminiAPIError(
f"Gemini streaming request failed: {exc}",
code="gemini_stream_error",
) from exc
return _generator()
class AsyncGeminiNativeClient:
"""Async wrapper used by auxiliary_client for native Gemini calls."""
def __init__(self, sync_client: GeminiNativeClient):
self._sync = sync_client
self.api_key = sync_client.api_key
self.base_url = sync_client.base_url
self.chat = _AsyncGeminiChatNamespace(self)
async def _create_chat_completion(self, **kwargs: Any) -> Any:
stream = bool(kwargs.get("stream"))
result = await asyncio.to_thread(self._sync.chat.completions.create, **kwargs)
if not stream:
return result
async def _async_stream() -> Any:
while True:
done, chunk = await asyncio.to_thread(self._sync._advance_stream_iterator, result)
if done:
break
yield chunk
return _async_stream()
async def close(self) -> None:
await asyncio.to_thread(self._sync.close)
+1 -1
View File
@@ -5287,7 +5287,7 @@ class HermesCLI:
print(" /cron list")
print(' /cron add "every 2h" "Check server status" [--skill blogwatcher]')
print(' /cron edit <job_id> --schedule "every 4h" --prompt "New task"')
print(" /cron edit <job_id> --skill blogwatcher --skill find-nearby")
print(" /cron edit <job_id> --skill blogwatcher --skill maps")
print(" /cron edit <job_id> --remove-skill blogwatcher")
print(" /cron edit <job_id> --clear-skills")
print(" /cron pause <job_id>")
+121 -17
View File
@@ -6,6 +6,7 @@ and implement the required methods.
"""
import asyncio
import inspect
import ipaddress
import logging
import os
@@ -880,10 +881,11 @@ class BasePlatformAdapter(ABC):
# working on a task after --replace or manual restarts.
self._background_tasks: set[asyncio.Task] = set()
# One-shot callbacks to fire after the main response is delivered.
# Keyed by session_key. GatewayRunner uses this to defer
# background-review notifications ("💾 Skill created") until the
# primary reply has been sent.
self._post_delivery_callbacks: Dict[str, Callable] = {}
# Keyed by session_key. Values are either a bare callback (legacy) or
# a ``(generation, callback)`` tuple so GatewayRunner can make deferred
# deliveries generation-aware and avoid stale runs clearing callbacks
# registered by a fresher run for the same session.
self._post_delivery_callbacks: Dict[str, Any] = {}
self._expected_cancelled_tasks: set[asyncio.Task] = set()
self._busy_session_handler: Optional[Callable[[MessageEvent, str], Awaitable[bool]]] = None
# Chats where auto-TTS on voice input is disabled (set by /voice off)
@@ -1401,7 +1403,13 @@ class BasePlatformAdapter(ABC):
return paths, cleaned
async def _keep_typing(self, chat_id: str, interval: float = 2.0, metadata=None) -> None:
async def _keep_typing(
self,
chat_id: str,
interval: float = 2.0,
metadata=None,
stop_event: asyncio.Event | None = None,
) -> None:
"""
Continuously send typing indicator until cancelled.
@@ -1415,9 +1423,18 @@ class BasePlatformAdapter(ABC):
"""
try:
while True:
if stop_event is not None and stop_event.is_set():
return
if chat_id not in self._typing_paused:
await self.send_typing(chat_id, metadata=metadata)
await asyncio.sleep(interval)
if stop_event is None:
await asyncio.sleep(interval)
continue
try:
await asyncio.wait_for(stop_event.wait(), timeout=interval)
except asyncio.TimeoutError:
continue
return
except asyncio.CancelledError:
pass # Normal cancellation when handler completes
finally:
@@ -1444,6 +1461,59 @@ class BasePlatformAdapter(ABC):
"""Resume typing indicator for a chat after approval resolves."""
self._typing_paused.discard(chat_id)
async def interrupt_session_activity(self, session_key: str, chat_id: str) -> None:
"""Signal the active session loop to stop and clear typing immediately."""
if session_key:
interrupt_event = self._active_sessions.get(session_key)
if interrupt_event is not None:
interrupt_event.set()
try:
await self.stop_typing(chat_id)
except Exception:
pass
def register_post_delivery_callback(
self,
session_key: str,
callback: Callable,
*,
generation: int | None = None,
) -> None:
"""Register a deferred callback to fire after the main response.
``generation`` lets callers tie the callback to a specific gateway run
generation so stale runs cannot clear callbacks owned by a fresher run.
"""
if not session_key or not callable(callback):
return
if generation is None:
self._post_delivery_callbacks[session_key] = callback
else:
self._post_delivery_callbacks[session_key] = (int(generation), callback)
def pop_post_delivery_callback(
self,
session_key: str,
*,
generation: int | None = None,
) -> Callable | None:
"""Pop a deferred callback, optionally requiring generation ownership."""
if not session_key:
return None
entry = self._post_delivery_callbacks.get(session_key)
if entry is None:
return None
if isinstance(entry, tuple) and len(entry) == 2:
entry_generation, callback = entry
if generation is not None and int(entry_generation) != int(generation):
return None
self._post_delivery_callbacks.pop(session_key, None)
return callback if callable(callback) else None
if generation is not None:
return None
self._post_delivery_callbacks.pop(session_key, None)
return entry if callable(entry) else None
# ── Processing lifecycle hooks ──────────────────────────────────────────
# Subclasses override these to react to message processing events
# (e.g. Discord adds 👀/✅/❌ reactions).
@@ -1714,10 +1784,23 @@ class BasePlatformAdapter(ABC):
# Fall back to a new Event only if the entry was removed externally.
interrupt_event = self._active_sessions.get(session_key) or asyncio.Event()
self._active_sessions[session_key] = interrupt_event
callback_generation = getattr(interrupt_event, "_hermes_run_generation", None)
# Start continuous typing indicator (refreshes every 2 seconds)
_thread_metadata = {"thread_id": event.source.thread_id} if event.source.thread_id else None
typing_task = asyncio.create_task(self._keep_typing(event.source.chat_id, metadata=_thread_metadata))
_keep_typing_kwargs = {"metadata": _thread_metadata}
try:
_keep_typing_sig = inspect.signature(self._keep_typing)
except (TypeError, ValueError):
_keep_typing_sig = None
if _keep_typing_sig is None or "stop_event" in _keep_typing_sig.parameters:
_keep_typing_kwargs["stop_event"] = interrupt_event
typing_task = asyncio.create_task(
self._keep_typing(
event.source.chat_id,
**_keep_typing_kwargs,
)
)
try:
await self._run_processing_hook("on_processing_start", event)
@@ -1976,7 +2059,14 @@ class BasePlatformAdapter(ABC):
finally:
# Fire any one-shot post-delivery callback registered for this
# session (e.g. deferred background-review notifications).
_post_cb = getattr(self, "_post_delivery_callbacks", {}).pop(session_key, None)
_callback_generation = callback_generation
if hasattr(self, "pop_post_delivery_callback"):
_post_cb = self.pop_post_delivery_callback(
session_key,
generation=_callback_generation,
)
else:
_post_cb = getattr(self, "_post_delivery_callbacks", {}).pop(session_key, None)
if callable(_post_cb):
try:
_post_cb()
@@ -2022,10 +2112,10 @@ class BasePlatformAdapter(ABC):
pass
# Leave _active_sessions[session_key] populated — the drain
# task's own lifecycle will clean it up.
return
# Clean up session tracking
if session_key in self._active_sessions:
del self._active_sessions[session_key]
else:
# Clean up session tracking
if session_key in self._active_sessions:
del self._active_sessions[session_key]
async def cancel_background_tasks(self) -> None:
"""Cancel any in-flight background message-processing tasks.
@@ -2033,12 +2123,26 @@ class BasePlatformAdapter(ABC):
Used during gateway shutdown/replacement so active sessions from the old
process do not keep running after adapters are being torn down.
"""
tasks = [task for task in self._background_tasks if not task.done()]
for task in tasks:
self._expected_cancelled_tasks.add(task)
task.cancel()
if tasks:
# Loop until no new tasks appear. Without this, a message
# arriving during the `await asyncio.gather` below would spawn
# a fresh _process_message_background task (added to
# self._background_tasks at line ~1668 via handle_message),
# and the _background_tasks.clear() at the end of this method
# would drop the reference — the task runs untracked against a
# disconnecting adapter, logs send-failures, and may linger
# until it completes on its own. Retrying the drain until the
# task set stabilizes closes the window.
MAX_DRAIN_ROUNDS = 5
for _ in range(MAX_DRAIN_ROUNDS):
tasks = [task for task in self._background_tasks if not task.done()]
if not tasks:
break
for task in tasks:
self._expected_cancelled_tasks.add(task)
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
# Loop: late-arrival tasks spawned during the gather above
# will be in self._background_tasks now. Re-check.
self._background_tasks.clear()
self._expected_cancelled_tasks.clear()
self._pending_messages.clear()
+79 -37
View File
@@ -498,6 +498,7 @@ class DiscordAdapter(BasePlatformAdapter):
self._allowed_role_ids: set = set() # For DISCORD_ALLOWED_ROLES filtering
# Voice channel state (per-guild)
self._voice_clients: Dict[int, Any] = {} # guild_id -> VoiceClient
self._voice_locks: Dict[int, asyncio.Lock] = {} # guild_id -> serialize join/leave
# Text batching: merge rapid successive messages (Telegram-style)
self._text_batch_delay_seconds = float(os.getenv("HERMES_DISCORD_TEXT_BATCH_DELAY_SECONDS", "0.6"))
self._text_batch_split_delay_seconds = float(os.getenv("HERMES_DISCORD_TEXT_BATCH_SPLIT_DELAY_SECONDS", "2.0"))
@@ -636,6 +637,30 @@ class DiscordAdapter(BasePlatformAdapter):
@self._client.event
async def on_message(message: DiscordMessage):
# Wait for on_ready to finish resolving username-based
# allowlist entries. Without this block, messages
# arriving between Discord's READY event and the end
# of _resolve_allowed_usernames compare author IDs
# (numeric) against a set that may still contain raw
# usernames (strings) from DISCORD_ALLOWED_USERS —
# legitimate users get silently rejected for the first
# few seconds after every reconnect. The wait is a
# near-instant no-op in steady state (_ready_event is
# already set); only the startup / reconnect window
# ever blocks.
if not adapter_self._ready_event.is_set():
try:
await asyncio.wait_for(
adapter_self._ready_event.wait(),
timeout=30.0,
)
except asyncio.TimeoutError:
logger.warning(
"[%s] on_message timed out waiting for _ready_event; "
"allowlist check may use pre-resolved entries",
adapter_self.name,
)
# Dedup: Discord RESUME replays events after reconnects (#4777)
if adapter_self._dedup.is_duplicate(str(message.id)):
return
@@ -1231,57 +1256,74 @@ class DiscordAdapter(BasePlatformAdapter):
# Voice channel methods (join / leave / play)
# ------------------------------------------------------------------
def _voice_lock_for(self, guild_id: int) -> "asyncio.Lock":
"""Return the per-guild lock, creating it on first use.
Voice join/leave/move must be serialized per guild without
this, two concurrent /voice channel invocations both see
_voice_clients.get(guild_id) return None, both call
channel.connect(), and discord.py raises ClientException
('Already connected') on the loser.
"""
lock = self._voice_locks.get(guild_id)
if lock is None:
lock = asyncio.Lock()
self._voice_locks[guild_id] = lock
return lock
async def join_voice_channel(self, channel) -> bool:
"""Join a Discord voice channel. Returns True on success."""
if not self._client or not DISCORD_AVAILABLE:
return False
guild_id = channel.guild.id
# Already connected in this guild?
existing = self._voice_clients.get(guild_id)
if existing and existing.is_connected():
if existing.channel.id == channel.id:
async with self._voice_lock_for(guild_id):
# Already connected in this guild?
existing = self._voice_clients.get(guild_id)
if existing and existing.is_connected():
if existing.channel.id == channel.id:
self._reset_voice_timeout(guild_id)
return True
await existing.move_to(channel)
self._reset_voice_timeout(guild_id)
return True
await existing.move_to(channel)
vc = await channel.connect()
self._voice_clients[guild_id] = vc
self._reset_voice_timeout(guild_id)
# Start voice receiver (Phase 2: listen to users)
try:
receiver = VoiceReceiver(vc, allowed_user_ids=self._allowed_user_ids)
receiver.start()
self._voice_receivers[guild_id] = receiver
self._voice_listen_tasks[guild_id] = asyncio.ensure_future(
self._voice_listen_loop(guild_id)
)
except Exception as e:
logger.warning("Voice receiver failed to start: %s", e)
return True
vc = await channel.connect()
self._voice_clients[guild_id] = vc
self._reset_voice_timeout(guild_id)
# Start voice receiver (Phase 2: listen to users)
try:
receiver = VoiceReceiver(vc, allowed_user_ids=self._allowed_user_ids)
receiver.start()
self._voice_receivers[guild_id] = receiver
self._voice_listen_tasks[guild_id] = asyncio.ensure_future(
self._voice_listen_loop(guild_id)
)
except Exception as e:
logger.warning("Voice receiver failed to start: %s", e)
return True
async def leave_voice_channel(self, guild_id: int) -> None:
"""Disconnect from the voice channel in a guild."""
# Stop voice receiver first
receiver = self._voice_receivers.pop(guild_id, None)
if receiver:
receiver.stop()
listen_task = self._voice_listen_tasks.pop(guild_id, None)
if listen_task:
listen_task.cancel()
async with self._voice_lock_for(guild_id):
# Stop voice receiver first
receiver = self._voice_receivers.pop(guild_id, None)
if receiver:
receiver.stop()
listen_task = self._voice_listen_tasks.pop(guild_id, None)
if listen_task:
listen_task.cancel()
vc = self._voice_clients.pop(guild_id, None)
if vc and vc.is_connected():
await vc.disconnect()
task = self._voice_timeout_tasks.pop(guild_id, None)
if task:
task.cancel()
self._voice_text_channels.pop(guild_id, None)
self._voice_sources.pop(guild_id, None)
vc = self._voice_clients.pop(guild_id, None)
if vc and vc.is_connected():
await vc.disconnect()
task = self._voice_timeout_tasks.pop(guild_id, None)
if task:
task.cancel()
self._voice_text_channels.pop(guild_id, None)
self._voice_sources.pop(guild_id, None)
# Maximum seconds to wait for voice playback before giving up
PLAYBACK_TIMEOUT = 120
+53 -8
View File
@@ -119,6 +119,8 @@ _MARKDOWN_HINT_RE = re.compile(
re.MULTILINE,
)
_MARKDOWN_LINK_RE = re.compile(r"\[([^\]]+)\]\(([^)]+)\)")
_MARKDOWN_FENCE_OPEN_RE = re.compile(r"^```([^\n`]*)\s*$")
_MARKDOWN_FENCE_CLOSE_RE = re.compile(r"^```\s*$")
_MENTION_RE = re.compile(r"@_user_\d+")
_MULTISPACE_RE = re.compile(r"[ \t]{2,}")
_POST_CONTENT_INVALID_RE = re.compile(r"content format of the post type is incorrect", re.IGNORECASE)
@@ -430,23 +432,66 @@ def _coerce_required_int(value: Any, default: int, min_value: int = 0) -> int:
def _build_markdown_post_payload(content: str) -> str:
rows = _build_markdown_post_rows(content)
return json.dumps(
{
"zh_cn": {
"content": [
[
{
"tag": "md",
"text": content,
}
]
],
"content": rows,
}
},
ensure_ascii=False,
)
def _build_markdown_post_rows(content: str) -> List[List[Dict[str, str]]]:
"""Build Feishu post rows while isolating fenced code blocks.
Feishu's `md` renderer can swallow trailing content when a fenced code block
appears inside one large markdown element. Split the reply at real fence
lines so prose before/after the code block remains visible while code stays
in a dedicated row.
"""
if not content:
return [[{"tag": "md", "text": ""}]]
if "```" not in content:
return [[{"tag": "md", "text": content}]]
rows: List[List[Dict[str, str]]] = []
current: List[str] = []
in_code_block = False
def _flush_current() -> None:
nonlocal current
if not current:
return
segment = "\n".join(current)
if segment.strip():
rows.append([{"tag": "md", "text": segment}])
current = []
for raw_line in content.splitlines():
stripped_line = raw_line.strip()
is_fence = bool(
_MARKDOWN_FENCE_CLOSE_RE.match(stripped_line)
if in_code_block
else _MARKDOWN_FENCE_OPEN_RE.match(stripped_line)
)
if is_fence:
if not in_code_block:
_flush_current()
current.append(raw_line)
in_code_block = not in_code_block
if not in_code_block:
_flush_current()
continue
current.append(raw_line)
_flush_current()
return rows or [[{"tag": "md", "text": content}]]
def parse_feishu_post_payload(payload: Any) -> FeishuPostParseResult:
resolved = _resolve_post_payload(payload)
if not resolved:
+19 -4
View File
@@ -1657,6 +1657,21 @@ class TelegramAdapter(BasePlatformAdapter):
except Exception as exc:
logger.error("Failed to write update response from callback: %s", exc)
def _missing_media_path_error(self, label: str, path: str) -> str:
"""Build an actionable file-not-found error for gateway MEDIA delivery.
Paths like /workspace/... or /output/... often only exist inside the
Docker sandbox, while the gateway process runs on the host.
"""
error = f"{label} file not found: {path}"
if path.startswith(("/workspace/", "/output/", "/outputs/")):
error += (
" (path may only exist inside the Docker sandbox. "
"Bind-mount a host directory and emit the host-visible "
"path in MEDIA: for gateway file delivery.)"
)
return error
async def send_voice(
self,
chat_id: str,
@@ -1673,7 +1688,7 @@ class TelegramAdapter(BasePlatformAdapter):
try:
import os
if not os.path.exists(audio_path):
return SendResult(success=False, error=f"Audio file not found: {audio_path}")
return SendResult(success=False, error=self._missing_media_path_error("Audio", audio_path))
with open(audio_path, "rb") as audio_file:
# .ogg files -> send as voice (round playable bubble)
@@ -1722,7 +1737,7 @@ class TelegramAdapter(BasePlatformAdapter):
try:
import os
if not os.path.exists(image_path):
return SendResult(success=False, error=f"Image file not found: {image_path}")
return SendResult(success=False, error=self._missing_media_path_error("Image", image_path))
_thread = self._metadata_thread_id(metadata)
with open(image_path, "rb") as image_file:
@@ -1759,7 +1774,7 @@ class TelegramAdapter(BasePlatformAdapter):
try:
if not os.path.exists(file_path):
return SendResult(success=False, error=f"File not found: {file_path}")
return SendResult(success=False, error=self._missing_media_path_error("File", file_path))
display_name = file_name or os.path.basename(file_path)
_thread = self._metadata_thread_id(metadata)
@@ -1793,7 +1808,7 @@ class TelegramAdapter(BasePlatformAdapter):
try:
if not os.path.exists(video_path):
return SendResult(success=False, error=f"Video file not found: {video_path}")
return SendResult(success=False, error=self._missing_media_path_error("Video", video_path))
_thread = self._metadata_thread_id(metadata)
with open(video_path, "rb") as f:
+103
View File
@@ -13,6 +13,10 @@ Each route defines:
- skills: optional list of skills to load for the agent
- deliver: where to send the response (github_comment, telegram, etc.)
- deliver_extra: additional delivery config (repo, pr_number, chat_id)
- deliver_only: if true, skip the agent the rendered prompt IS the
message that gets delivered. Use for external push notifications
(Supabase, monitoring alerts, inter-agent pings) where zero LLM cost
and sub-second delivery matter more than agent reasoning.
Security:
- HMAC secret is required per route (validated at startup)
@@ -122,6 +126,19 @@ class WebhookAdapter(BasePlatformAdapter):
f"For testing without auth, set secret to '{_INSECURE_NO_AUTH}'."
)
# deliver_only routes bypass the agent — the POST body becomes a
# direct push notification via the configured delivery target.
# Validate up-front so misconfiguration surfaces at startup rather
# than on the first webhook POST.
if route.get("deliver_only"):
deliver = route.get("deliver", "log")
if not deliver or deliver == "log":
raise ValueError(
f"[webhook] Route '{name}' has deliver_only=true but "
f"deliver is '{deliver}'. Direct delivery requires a "
f"real target (telegram, discord, slack, github_comment, etc.)."
)
app = web.Application()
app.router.add_get("/health", self._handle_health)
app.router.add_post("/webhooks/{route_name}", self._handle_webhook)
@@ -419,6 +436,64 @@ class WebhookAdapter(BasePlatformAdapter):
)
self._seen_deliveries[delivery_id] = now
# ── Direct delivery mode (deliver_only) ─────────────────
# Skip the agent entirely — the rendered prompt IS the message we
# deliver. Use case: external services (Supabase, monitoring,
# cron jobs, other agents) that need to push a plain notification
# to a user's chat with zero LLM cost. Reuses the same HMAC auth,
# rate limiting, idempotency, and template rendering as agent mode.
if route_config.get("deliver_only"):
delivery = {
"deliver": route_config.get("deliver", "log"),
"deliver_extra": self._render_delivery_extra(
route_config.get("deliver_extra", {}), payload
),
"payload": payload,
}
logger.info(
"[webhook] direct-deliver event=%s route=%s target=%s msg_len=%d delivery=%s",
event_type,
route_name,
delivery["deliver"],
len(prompt),
delivery_id,
)
try:
result = await self._direct_deliver(prompt, delivery)
except Exception:
logger.exception(
"[webhook] direct-deliver failed route=%s delivery=%s",
route_name,
delivery_id,
)
return web.json_response(
{"status": "error", "error": "Delivery failed", "delivery_id": delivery_id},
status=502,
)
if result.success:
return web.json_response(
{
"status": "delivered",
"route": route_name,
"target": delivery["deliver"],
"delivery_id": delivery_id,
},
status=200,
)
# Delivery attempted but target rejected it — surface as 502
# with a generic error (don't leak adapter-level detail).
logger.warning(
"[webhook] direct-deliver target rejected route=%s target=%s error=%s",
route_name,
delivery["deliver"],
result.error,
)
return web.json_response(
{"status": "error", "error": "Delivery failed", "delivery_id": delivery_id},
status=502,
)
# Use delivery_id in session key so concurrent webhooks on the
# same route get independent agent runs (not queued/interrupted).
session_chat_id = f"webhook:{route_name}:{delivery_id}"
@@ -572,6 +647,34 @@ class WebhookAdapter(BasePlatformAdapter):
# Response delivery
# ------------------------------------------------------------------
async def _direct_deliver(
self, content: str, delivery: dict
) -> SendResult:
"""Deliver *content* directly without invoking the agent.
Used by ``deliver_only`` routes: the rendered template becomes the
literal message body, and we dispatch to the same delivery helpers
that the agent-mode ``send()`` flow uses. All target types that
work in agent mode work here Telegram, Discord, Slack, GitHub
PR comments, etc.
"""
deliver_type = delivery.get("deliver", "log")
if deliver_type == "log":
# Shouldn't reach here — startup validation rejects deliver_only
# with deliver=log — but guard defensively.
logger.info("[webhook] direct-deliver log-only: %s", content[:200])
return SendResult(success=True)
if deliver_type == "github_comment":
return await self._deliver_github_comment(content, delivery)
# Fall through to the cross-platform dispatcher, which validates the
# target name and routes via the gateway runner.
return await self._deliver_cross_platform(
deliver_type, content, delivery
)
async def _deliver_github_comment(
self, content: str, delivery: dict
) -> SendResult:
+316 -35
View File
@@ -96,6 +96,10 @@ from hermes_cli.env_loader import load_hermes_dotenv
_env_path = _hermes_home / '.env'
load_hermes_dotenv(hermes_home=_hermes_home, project_env=Path(__file__).resolve().parents[1] / '.env')
_DOCKER_VOLUME_SPEC_RE = re.compile(r"^(?P<host>.+):(?P<container>/[^:]+?)(?::(?P<options>[^:]+))?$")
_DOCKER_MEDIA_OUTPUT_CONTAINER_PATHS = {"/output", "/outputs"}
# Bridge config.yaml values into the environment so os.getenv() picks them up.
# config.yaml is authoritative for terminal settings — overrides .env.
_config_path = _hermes_home / 'config.yaml'
@@ -398,6 +402,33 @@ def _dequeue_pending_event(adapter, session_key: str) -> MessageEvent | None:
return adapter.get_pending_message(session_key)
_INTERRUPT_REASON_STOP = "Stop requested"
_INTERRUPT_REASON_RESET = "Session reset requested"
_INTERRUPT_REASON_TIMEOUT = "Execution timed out (inactivity)"
_INTERRUPT_REASON_SSE_DISCONNECT = "SSE client disconnected"
_INTERRUPT_REASON_GATEWAY_SHUTDOWN = "Gateway shutting down"
_INTERRUPT_REASON_GATEWAY_RESTART = "Gateway restarting"
_CONTROL_INTERRUPT_MESSAGES = frozenset(
{
_INTERRUPT_REASON_STOP.lower(),
_INTERRUPT_REASON_RESET.lower(),
_INTERRUPT_REASON_TIMEOUT.lower(),
_INTERRUPT_REASON_SSE_DISCONNECT.lower(),
_INTERRUPT_REASON_GATEWAY_SHUTDOWN.lower(),
_INTERRUPT_REASON_GATEWAY_RESTART.lower(),
}
)
def _is_control_interrupt_message(message: Optional[str]) -> bool:
"""Return True when an interrupt message is internal control flow."""
if not message:
return False
normalized = " ".join(str(message).strip().split()).lower()
return normalized in _CONTROL_INTERRUPT_MESSAGES
def _check_unavailable_skill(command_name: str) -> str | None:
"""Check if a command matches a known-but-inactive skill.
@@ -585,6 +616,7 @@ class GatewayRunner:
def __init__(self, config: Optional[GatewayConfig] = None):
self.config = config or load_gateway_config()
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
self._warn_if_docker_media_delivery_is_risky()
# Load ephemeral config from config.yaml / env vars.
# Both are injected at API-call time only and never persisted.
@@ -625,6 +657,7 @@ class GatewayRunner:
self._running_agents_ts: Dict[str, float] = {} # start timestamp per session
self._pending_messages: Dict[str, str] = {} # Queued messages during interrupt
self._busy_ack_ts: Dict[str, float] = {} # last busy-ack timestamp per session (debounce)
self._session_run_generation: Dict[str, int] = {}
# Cache AIAgent instances per session to preserve prompt caching.
# Without this, a new AIAgent is created per message, rebuilding the
@@ -691,6 +724,53 @@ class GatewayRunner:
self._background_tasks: set = set()
def _warn_if_docker_media_delivery_is_risky(self) -> None:
"""Warn when Docker-backed gateways lack an explicit export mount.
MEDIA delivery happens in the gateway process, so paths emitted by the model
must be readable from the host. A plain container-local path like
`/workspace/report.txt` or `/output/report.txt` often exists only inside
Docker, so users commonly need a dedicated export mount such as
`host-dir:/output`.
"""
if os.getenv("TERMINAL_ENV", "").strip().lower() != "docker":
return
connected = self.config.get_connected_platforms()
messaging_platforms = [p for p in connected if p not in {Platform.LOCAL, Platform.API_SERVER, Platform.WEBHOOK}]
if not messaging_platforms:
return
raw_volumes = os.getenv("TERMINAL_DOCKER_VOLUMES", "").strip()
volumes: List[str] = []
if raw_volumes:
try:
parsed = json.loads(raw_volumes)
if isinstance(parsed, list):
volumes = [str(v) for v in parsed if isinstance(v, str)]
except Exception:
logger.debug("Could not parse TERMINAL_DOCKER_VOLUMES for gateway media warning", exc_info=True)
has_explicit_output_mount = False
for spec in volumes:
match = _DOCKER_VOLUME_SPEC_RE.match(spec)
if not match:
continue
container_path = match.group("container")
if container_path in _DOCKER_MEDIA_OUTPUT_CONTAINER_PATHS:
has_explicit_output_mount = True
break
if has_explicit_output_mount:
return
logger.warning(
"Docker backend is enabled for the messaging gateway but no explicit host-visible "
"output mount (for example '/home/user/.hermes/cache/documents:/output') is configured. "
"This is fine if the model already emits host-visible paths, but MEDIA file delivery can fail "
"for container-local paths like '/workspace/...' or '/output/...'."
)
# -- Setup skill availability ----------------------------------------
@@ -2441,7 +2521,7 @@ class GatewayRunner:
_sk[:20], _e,
)
self._interrupt_running_agents(
"Gateway restarting" if self._restart_requested else "Gateway shutting down"
_INTERRUPT_REASON_GATEWAY_RESTART if self._restart_requested else _INTERRUPT_REASON_GATEWAY_SHUTDOWN
)
interrupt_deadline = asyncio.get_running_loop().time() + 5.0
while self._running_agents and asyncio.get_running_loop().time() < interrupt_deadline:
@@ -3012,6 +3092,10 @@ class GatewayRunner:
_quick_key[:30], _stale_age, _stale_idle,
_raw_stale_timeout, _stale_detail,
)
self._invalidate_session_run_generation(
_quick_key,
reason="stale_running_agent_eviction",
)
self._release_running_agent_state(_quick_key)
if _quick_key in self._running_agents:
@@ -3035,15 +3119,12 @@ class GatewayRunner:
# _interrupt_requested. Force-clean _running_agents so the session
# is unlocked and subsequent messages are processed normally.
if _cmd_def_inner and _cmd_def_inner.name == "stop":
running_agent = self._running_agents.get(_quick_key)
if running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
running_agent.interrupt("Stop requested")
# Force-clean: remove the session lock regardless of agent state
adapter = self.adapters.get(source.platform)
if adapter and hasattr(adapter, 'get_pending_message'):
adapter.get_pending_message(_quick_key) # consume and discard
self._pending_messages.pop(_quick_key, None)
self._release_running_agent_state(_quick_key)
await self._interrupt_and_clear_session(
_quick_key,
source,
interrupt_reason=_INTERRUPT_REASON_STOP,
invalidation_reason="stop_command",
)
logger.info("STOP for session %s — agent interrupted, session lock released", _quick_key[:20])
return "⚡ Stopped. You can continue this session."
@@ -3055,17 +3136,15 @@ class GatewayRunner:
# doesn't get re-processed as a user message after the
# interrupt completes.
if _cmd_def_inner and _cmd_def_inner.name == "new":
running_agent = self._running_agents.get(_quick_key)
if running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
running_agent.interrupt("Session reset requested")
# Clear any pending messages so the old text doesn't replay
adapter = self.adapters.get(source.platform)
if adapter and hasattr(adapter, 'get_pending_message'):
adapter.get_pending_message(_quick_key) # consume and discard
self._pending_messages.pop(_quick_key, None)
await self._interrupt_and_clear_session(
_quick_key,
source,
interrupt_reason=_INTERRUPT_REASON_RESET,
invalidation_reason="new_command",
)
# Clean up the running agent entry so the reset handler
# doesn't think an agent is still active.
self._release_running_agent_state(_quick_key)
return await self._handle_reset_command(event)
# /queue <prompt> — queue without interrupting
@@ -3546,9 +3625,10 @@ class GatewayRunner:
# same session — corrupting the transcript.
self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL
self._running_agents_ts[_quick_key] = time.time()
_run_generation = self._begin_session_run_generation(_quick_key)
try:
return await self._handle_message_with_agent(event, source, _quick_key)
return await self._handle_message_with_agent(event, source, _quick_key, _run_generation)
finally:
# If _run_agent replaced the sentinel with a real agent and
# then cleaned it up, this is a no-op. If we exited early
@@ -3719,7 +3799,7 @@ class GatewayRunner:
return message_text
async def _handle_message_with_agent(self, event, source, _quick_key: str):
async def _handle_message_with_agent(self, event, source, _quick_key: str, run_generation: int):
"""Inner handler that runs under the _running_agents sentinel guard."""
_msg_start_time = time.time()
_platform_name = source.platform.value if hasattr(source.platform, "value") else str(source.platform)
@@ -4176,6 +4256,15 @@ class GatewayRunner:
if message_text is None:
return
# Bind this gateway run generation to the adapter's active-session
# event so deferred post-delivery callbacks can be released by the
# same run that registered them.
self._bind_adapter_run_generation(
self.adapters.get(source.platform),
session_key,
run_generation,
)
try:
# Emit agent:start hook
hook_ctx = {
@@ -4194,6 +4283,7 @@ class GatewayRunner:
source=source,
session_id=session_entry.session_id,
session_key=session_key,
run_generation=run_generation,
event_message_id=event.message_id,
channel_prompt=event.channel_prompt,
)
@@ -4206,6 +4296,22 @@ class GatewayRunner:
except Exception:
pass
if not self._is_session_run_current(_quick_key, run_generation):
logger.info(
"Discarding stale agent result for %s — generation %d is no longer current",
_quick_key[:20] if _quick_key else "?",
run_generation,
)
_stale_adapter = self.adapters.get(source.platform)
if getattr(type(_stale_adapter), "pop_post_delivery_callback", None) is not None:
_stale_adapter.pop_post_delivery_callback(
_quick_key,
generation=run_generation,
)
elif _stale_adapter and hasattr(_stale_adapter, "_post_delivery_callbacks"):
_stale_adapter._post_delivery_callbacks.pop(_quick_key, None)
return None
response = agent_result.get("final_response") or ""
# Convert the agent's internal "(empty)" sentinel into a
@@ -4620,6 +4726,7 @@ class GatewayRunner:
# Get existing session key
session_key = self._session_key_for_source(source)
self._invalidate_session_run_generation(session_key, reason="session_reset")
# Flush memories in the background (fire-and-forget) so the user
# gets the "Session reset!" response immediately.
@@ -4879,14 +4986,23 @@ class GatewayRunner:
agent = self._running_agents.get(session_key)
if agent is _AGENT_PENDING_SENTINEL:
# Force-clean the sentinel so the session is unlocked.
self._release_running_agent_state(session_key)
await self._interrupt_and_clear_session(
session_key,
source,
interrupt_reason=_INTERRUPT_REASON_STOP,
invalidation_reason="stop_command_pending",
)
logger.info("STOP (pending) for session %s — sentinel cleared", session_key[:20])
return "⚡ Stopped. The agent hadn't started yet — you can continue this session."
if agent:
agent.interrupt("Stop requested")
# Force-clean the session lock so a truly hung agent doesn't
# keep it locked forever.
self._release_running_agent_state(session_key)
await self._interrupt_and_clear_session(
session_key,
source,
interrupt_reason=_INTERRUPT_REASON_STOP,
invalidation_reason="stop_command_handler",
)
return "⚡ Stopped. You can continue this session."
else:
return "No active task to stop."
@@ -8333,6 +8449,84 @@ class GatewayRunner:
if hasattr(self, "_busy_ack_ts"):
self._busy_ack_ts.pop(session_key, None)
def _begin_session_run_generation(self, session_key: str) -> int:
"""Claim a fresh run generation token for ``session_key``.
Every top-level gateway turn gets a monotonically increasing token.
If a later command like /stop or /new invalidates that token while the
old worker is still unwinding, the late result can be recognized and
dropped instead of bleeding into the fresh session.
"""
if not session_key:
return 0
generations = self.__dict__.get("_session_run_generation")
if generations is None:
generations = {}
self._session_run_generation = generations
next_generation = int(generations.get(session_key, 0)) + 1
generations[session_key] = next_generation
return next_generation
def _invalidate_session_run_generation(self, session_key: str, *, reason: str = "") -> int:
"""Invalidate any in-flight run token for ``session_key``."""
generation = self._begin_session_run_generation(session_key)
if reason:
logger.info(
"Invalidated run generation for %s%d (%s)",
session_key[:20],
generation,
reason,
)
return generation
def _is_session_run_current(self, session_key: str, generation: int) -> bool:
"""Return True when ``generation`` is still current for ``session_key``."""
if not session_key:
return True
generations = self.__dict__.get("_session_run_generation") or {}
return int(generations.get(session_key, 0)) == int(generation)
def _bind_adapter_run_generation(
self,
adapter: Any,
session_key: str,
generation: int | None,
) -> None:
"""Bind a gateway run generation to the adapter's active-session event."""
if not adapter or not session_key or generation is None:
return
try:
interrupt_event = getattr(adapter, "_active_sessions", {}).get(session_key)
if interrupt_event is not None:
setattr(interrupt_event, "_hermes_run_generation", int(generation))
except Exception:
pass
async def _interrupt_and_clear_session(
self,
session_key: str,
source: SessionSource,
*,
interrupt_reason: str,
invalidation_reason: str,
release_running_state: bool = True,
) -> None:
"""Interrupt the current run and clear queued session state consistently."""
if not session_key:
return
running_agent = self._running_agents.get(session_key)
if running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
running_agent.interrupt(interrupt_reason)
self._invalidate_session_run_generation(session_key, reason=invalidation_reason)
adapter = self.adapters.get(source.platform)
if adapter and hasattr(adapter, "interrupt_session_activity"):
await adapter.interrupt_session_activity(session_key, source.chat_id)
if adapter and hasattr(adapter, "get_pending_message"):
adapter.get_pending_message(session_key) # consume and discard
self._pending_messages.pop(session_key, None)
if release_running_state:
self._release_running_agent_state(session_key)
def _evict_cached_agent(self, session_key: str) -> None:
"""Remove a cached agent for a session (called on /new, /model, etc)."""
_lock = getattr(self, "_agent_cache_lock", None)
@@ -8514,6 +8708,7 @@ class GatewayRunner:
source: "SessionSource",
session_id: str,
session_key: str = None,
run_generation: Optional[int] = None,
event_message_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Forward the message to a remote Hermes API server instead of
@@ -8549,6 +8744,11 @@ class GatewayRunner:
proxy_key = os.getenv("GATEWAY_PROXY_KEY", "").strip()
def _run_still_current() -> bool:
if run_generation is None or not session_key:
return True
return self._is_session_run_current(session_key, run_generation)
# Build messages in OpenAI chat format --------------------------
#
# The remote api_server can maintain session continuity via
@@ -8678,6 +8878,21 @@ class GatewayRunner:
# Parse SSE stream
buffer = ""
async for chunk in resp.content.iter_any():
if not _run_still_current():
logger.info(
"Discarding stale proxy stream for %s — generation %d is no longer current",
session_key[:20] if session_key else "?",
run_generation or 0,
)
return {
"final_response": "",
"messages": [],
"api_calls": 0,
"tools": [],
"history_offset": len(history),
"session_id": session_id,
"response_previewed": False,
}
text = chunk.decode("utf-8", errors="replace")
buffer += text
@@ -8727,6 +8942,21 @@ class GatewayRunner:
stream_task.cancel()
_elapsed = time.time() - _start
if not _run_still_current():
logger.info(
"Discarding stale proxy result for %s — generation %d is no longer current",
session_key[:20] if session_key else "?",
run_generation or 0,
)
return {
"final_response": "",
"messages": [],
"api_calls": 0,
"tools": [],
"history_offset": len(history),
"session_id": session_id,
"response_previewed": False,
}
logger.info(
"proxy response: url=%s session=%s time=%.1fs response=%d chars",
proxy_url, (session_id or "")[:20], _elapsed, len(full_response),
@@ -8755,6 +8985,7 @@ class GatewayRunner:
source: SessionSource,
session_id: str,
session_key: str = None,
run_generation: Optional[int] = None,
_interrupt_depth: int = 0,
event_message_id: Optional[str] = None,
channel_prompt: Optional[str] = None,
@@ -8780,11 +9011,17 @@ class GatewayRunner:
source=source,
session_id=session_id,
session_key=session_key,
run_generation=run_generation,
event_message_id=event_message_id,
)
from run_agent import AIAgent
import queue
def _run_still_current() -> bool:
if run_generation is None or not session_key:
return True
return self._is_session_run_current(session_key, run_generation)
user_config = _load_gateway_config()
platform_key = _platform_config_key(source.platform)
@@ -8839,7 +9076,7 @@ class GatewayRunner:
def progress_callback(event_type: str, tool_name: str = None, preview: str = None, args: dict = None, **kwargs):
"""Callback invoked by agent on tool lifecycle events."""
if not progress_queue:
if not progress_queue or not _run_still_current():
return
# Only act on tool.started events (ignore tool.completed, reasoning.available, etc.)
@@ -8944,6 +9181,14 @@ class GatewayRunner:
while True:
try:
if not _run_still_current():
while not progress_queue.empty():
try:
progress_queue.get_nowait()
except Exception:
break
return
raw = progress_queue.get_nowait()
# Handle dedup messages: update last line with repeat counter
@@ -8969,6 +9214,9 @@ class GatewayRunner:
await asyncio.sleep(_remaining)
continue
if not _run_still_current():
return
if can_edit and progress_msg_id is not None:
# Try to edit the existing progress message
full_text = "\n".join(progress_lines)
@@ -9004,7 +9252,8 @@ class GatewayRunner:
# Restore typing indicator
await asyncio.sleep(0.3)
await adapter.send_typing(source.chat_id, metadata=_progress_metadata)
if _run_still_current():
await adapter.send_typing(source.chat_id, metadata=_progress_metadata)
except queue.Empty:
await asyncio.sleep(0.3)
@@ -9048,6 +9297,8 @@ class GatewayRunner:
_hooks_ref = self.hooks
def _step_callback_sync(iteration: int, prev_tools: list) -> None:
if not _run_still_current():
return
try:
# prev_tools may be list[str] or list[dict] with "name"/"result"
# keys. Normalise to keep "tool_names" backward-compatible for
@@ -9078,7 +9329,7 @@ class GatewayRunner:
_status_thread_metadata = {"thread_id": _progress_thread_id} if _progress_thread_id else None
def _status_callback_sync(event_type: str, message: str) -> None:
if not _status_adapter:
if not _status_adapter or not _run_still_current():
return
try:
asyncio.run_coroutine_threadsafe(
@@ -9209,12 +9460,16 @@ class GatewayRunner:
metadata={"thread_id": _progress_thread_id} if _progress_thread_id else None,
)
if _want_stream_deltas:
_stream_delta_cb = _stream_consumer.on_delta
def _stream_delta_cb(text: str) -> None:
if _run_still_current():
_stream_consumer.on_delta(text)
stream_consumer_holder[0] = _stream_consumer
except Exception as _sc_err:
logger.debug("Could not set up stream consumer: %s", _sc_err)
def _interim_assistant_cb(text: str, *, already_streamed: bool = False) -> None:
if not _run_still_current():
return
if _stream_consumer is not None:
if already_streamed:
_stream_consumer.on_segment_break()
@@ -9318,7 +9573,7 @@ class GatewayRunner:
_bg_review_pending_lock = threading.Lock()
def _deliver_bg_review_message(message: str) -> None:
if not _status_adapter:
if not _status_adapter or not _run_still_current():
return
try:
asyncio.run_coroutine_threadsafe(
@@ -9342,7 +9597,7 @@ class GatewayRunner:
# Background review delivery — send "💾 Memory updated" etc. to user
def _bg_review_send(message: str) -> None:
if not _status_adapter:
if not _status_adapter or not _run_still_current():
return
if not _bg_review_release.is_set():
with _bg_review_pending_lock:
@@ -9355,9 +9610,16 @@ class GatewayRunner:
# Register the release hook on the adapter so base.py's finally
# block can fire it after delivering the main response.
if _status_adapter and session_key:
_pdc = getattr(_status_adapter, "_post_delivery_callbacks", None)
if _pdc is not None:
_pdc[session_key] = _release_bg_review_messages
if getattr(type(_status_adapter), "register_post_delivery_callback", None) is not None:
_status_adapter.register_post_delivery_callback(
session_key,
_release_bg_review_messages,
generation=run_generation,
)
else:
_pdc = getattr(_status_adapter, "_post_delivery_callbacks", None)
if _pdc is not None:
_pdc[session_key] = _release_bg_review_messages
# Store agent reference for interrupt support
agent_holder[0] = agent
@@ -9959,7 +10221,7 @@ class GatewayRunner:
# Interrupt the agent if it's still running so the thread
# pool worker is freed.
if _timed_out_agent and hasattr(_timed_out_agent, "interrupt"):
_timed_out_agent.interrupt("Execution timed out (inactivity)")
_timed_out_agent.interrupt(_INTERRUPT_REASON_TIMEOUT)
_timeout_mins = int(_agent_timeout // 60) or 1
@@ -10024,7 +10286,15 @@ class GatewayRunner:
if result and adapter and session_key:
pending_event = _dequeue_pending_event(adapter, session_key)
if result.get("interrupted") and not pending_event and result.get("interrupt_message"):
pending = result.get("interrupt_message")
interrupt_message = result.get("interrupt_message")
if _is_control_interrupt_message(interrupt_message):
logger.info(
"Ignoring control interrupt message for session %s: %s",
session_key[:20] if session_key else "?",
interrupt_message,
)
else:
pending = interrupt_message
elif pending_event:
pending = pending_event.text or _build_media_placeholder(pending_event)
logger.debug("Processing queued message after agent completion: '%s...'", pending[:40])
@@ -10129,7 +10399,17 @@ class GatewayRunner:
# first response has been delivered. Pop from the
# adapter's callback dict (prevents double-fire in
# base.py's finally block) and call it.
if adapter and hasattr(adapter, "_post_delivery_callbacks"):
if getattr(type(adapter), "pop_post_delivery_callback", None) is not None:
_bg_cb = adapter.pop_post_delivery_callback(
session_key,
generation=run_generation,
)
if callable(_bg_cb):
try:
_bg_cb()
except Exception:
pass
elif adapter and hasattr(adapter, "_post_delivery_callbacks"):
_bg_cb = adapter._post_delivery_callbacks.pop(session_key, None)
if callable(_bg_cb):
try:
@@ -10177,6 +10457,7 @@ class GatewayRunner:
source=next_source,
session_id=session_id,
session_key=session_key,
run_generation=run_generation,
_interrupt_depth=_interrupt_depth + 1,
event_message_id=next_message_id,
channel_prompt=next_channel_prompt,
+24
View File
@@ -571,6 +571,30 @@ class GatewayStreamConsumer:
if final_text.strip() and final_text != self._visible_prefix():
continuation = final_text
else:
# Defence-in-depth for #7183: the last edit may still show the
# cursor character because fallback mode was entered after an
# edit failure left it stuck. Try one final edit to strip it
# so the message doesn't freeze with a visible ▉. Best-effort
# — if this edit also fails (flood control still active),
# _try_strip_cursor has already been called on fallback entry
# and the adaptive-backoff retries will have had their shot.
if (
self._message_id
and self._last_sent_text
and self.cfg.cursor
and self._last_sent_text.endswith(self.cfg.cursor)
):
clean_text = self._last_sent_text[:-len(self.cfg.cursor)]
try:
result = await self.adapter.edit_message(
chat_id=self.chat_id,
message_id=self._message_id,
content=clean_text,
)
if result.success:
self._last_sent_text = clean_text
except Exception:
pass
self._already_sent = True
self._final_response_sent = True
return
+1 -1
View File
@@ -151,7 +151,7 @@ PROVIDER_REGISTRY: Dict[str, ProviderConfig] = {
id="gemini",
name="Google AI Studio",
auth_type="api_key",
inference_base_url="https://generativelanguage.googleapis.com/v1beta/openai",
inference_base_url="https://generativelanguage.googleapis.com/v1beta",
api_key_env_vars=("GOOGLE_API_KEY", "GEMINI_API_KEY"),
base_url_env_var="GEMINI_BASE_URL",
),
+5 -1
View File
@@ -403,7 +403,11 @@ DEFAULT_CONFIG = {
"container_persistent": True, # Persist filesystem across sessions
# Docker volume mounts — share host directories with the container.
# Each entry is "host_path:container_path" (standard Docker -v syntax).
# Example: ["/home/user/projects:/workspace/projects", "/data:/data"]
# Example:
# ["/home/user/projects:/workspace/projects",
# "/home/user/.hermes/cache/documents:/output"]
# For gateway MEDIA delivery, write inside Docker to /output/... and emit
# the host-visible path in MEDIA:, not the container path.
"docker_volumes": [],
# Explicit opt-in: mount the host cwd into /workspace for Docker sessions.
# Default off because passing host directories into a sandbox weakens isolation.
+7
View File
@@ -7002,6 +7002,13 @@ For more help on a command:
wh_sub.add_argument(
"--secret", default="", help="HMAC secret (auto-generated if omitted)"
)
wh_sub.add_argument(
"--deliver-only",
action="store_true",
help="Skip the agent — deliver the rendered prompt directly as the "
"message. Zero LLM cost. Requires --deliver to be a real target "
"(not 'log').",
)
webhook_subparsers.add_parser(
"list", aliases=["ls"], help="List all dynamic subscriptions"
+1 -1
View File
@@ -552,7 +552,7 @@ CANONICAL_PROVIDERS: list[ProviderEntry] = [
ProviderEntry("copilot", "GitHub Copilot", "GitHub Copilot (uses GITHUB_TOKEN or gh auth token)"),
ProviderEntry("copilot-acp", "GitHub Copilot ACP", "GitHub Copilot ACP (spawns `copilot --acp --stdio`)"),
ProviderEntry("huggingface", "Hugging Face", "Hugging Face Inference Providers (20+ open models)"),
ProviderEntry("gemini", "Google AI Studio", "Google AI Studio (Gemini models — OpenAI-compatible endpoint)"),
ProviderEntry("gemini", "Google AI Studio", "Google AI Studio (Gemini models — native Gemini API)"),
ProviderEntry("google-gemini-cli", "Google Gemini (OAuth)", "Google Gemini via OAuth + Code Assist (free tier supported; no API key needed)"),
ProviderEntry("deepseek", "DeepSeek", "DeepSeek (DeepSeek-V3, R1, coder — direct API)"),
ProviderEntry("xai", "xAI", "xAI (Grok models — direct API)"),
+15 -1
View File
@@ -155,6 +155,15 @@ def _cmd_subscribe(args):
"created_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
}
if getattr(args, "deliver_only", False):
if route["deliver"] == "log":
print(
"Error: --deliver-only requires --deliver to be a real target "
"(telegram, discord, slack, github_comment, etc.) — not 'log'."
)
return
route["deliver_only"] = True
if args.deliver_chat_id:
route["deliver_extra"] = {"chat_id": args.deliver_chat_id}
@@ -172,9 +181,12 @@ def _cmd_subscribe(args):
else:
print(" Events: (all)")
print(f" Deliver: {route['deliver']}")
if route.get("deliver_only"):
print(" Mode: direct delivery (no agent, zero LLM cost)")
if route.get("prompt"):
prompt_preview = route["prompt"][:80] + ("..." if len(route["prompt"]) > 80 else "")
print(f" Prompt: {prompt_preview}")
label = "Message" if route.get("deliver_only") else "Prompt"
print(f" {label}: {prompt_preview}")
print(f"\n Configure your service to POST to the URL above.")
print(f" Use the secret for HMAC-SHA256 signature validation.")
print(f" The gateway must be running to receive events (hermes gateway run).\n")
@@ -192,6 +204,8 @@ def _cmd_list(args):
for name, route in subs.items():
events = ", ".join(route.get("events", [])) or "(all)"
deliver = route.get("deliver", "log")
if route.get("deliver_only"):
deliver = f"{deliver} (direct — no agent)"
desc = route.get("description", "")
print(f"{name}")
if desc:
@@ -7,7 +7,7 @@ license: MIT
metadata:
hermes:
tags: [telephony, phone, sms, mms, voice, twilio, bland.ai, vapi, calling, texting]
related_skills: [find-nearby, google-workspace, agentmail]
related_skills: [maps, google-workspace, agentmail]
category: productivity
---
+4 -4
View File
@@ -4,7 +4,7 @@
Add a first-class `gemini` provider that authenticates via Google OAuth, using the standard Gemini API (not Cloud Code Assist). Users who have a Google AI subscription or Gemini API access can authenticate through the browser without needing to manually copy API keys.
## Architecture Decision
- **Path A (chosen):** Standard Gemini API at `generativelanguage.googleapis.com/v1beta/openai/`
- **Path A (chosen):** Standard Gemini API at `generativelanguage.googleapis.com/v1beta`
- **NOT Path B:** Cloud Code Assist (`cloudcode-pa.googleapis.com`) — rate-limited free tier, internal API, account ban risk
- Standard `chat_completions` api_mode via OpenAI SDK — no new api_mode needed
- Our own OAuth credentials — NOT sharing tokens with Gemini CLI
@@ -32,9 +32,9 @@ Add a first-class `gemini` provider that authenticates via Google OAuth, using t
- File locking for concurrent access (multiple agent sessions)
## API Integration
- Base URL: `https://generativelanguage.googleapis.com/v1beta/openai/`
- Auth: `Authorization: Bearer <access_token>` (passed as `api_key` to OpenAI SDK)
- api_mode: `chat_completions` (standard)
- Base URL: `https://generativelanguage.googleapis.com/v1beta`
- Auth: native Gemini API authentication handled by the provider adapter
- api_mode: `chat_completions` (standard facade over native transport)
- Models: gemini-2.5-pro, gemini-2.5-flash, gemini-2.0-flash, etc.
## Files to Create/Modify
+66 -31
View File
@@ -1271,6 +1271,10 @@ class AIAgent:
_agent_cfg = _load_agent_config()
except Exception:
_agent_cfg = {}
# Cache only the derived auxiliary compression context override that is
# needed later by the startup feasibility check. Avoid exposing a
# broad pseudo-public config object on the agent instance.
self._aux_compression_context_length_config = None
# Persistent memory (MEMORY.md + USER.md) -- loaded from disk
self._memory_store = None
@@ -1401,6 +1405,24 @@ class AIAgent:
compression_target_ratio = float(_compression_cfg.get("target_ratio", 0.20))
compression_protect_last = int(_compression_cfg.get("protect_last_n", 20))
# Read optional explicit context_length override for the auxiliary
# compression model. Custom endpoints often cannot report this via
# /models, so the startup feasibility check needs the config hint.
try:
_aux_cfg = _agent_cfg.get("auxiliary", {}).get("compression", {})
except Exception:
_aux_cfg = {}
if isinstance(_aux_cfg, dict):
_aux_context_config = _aux_cfg.get("context_length")
else:
_aux_context_config = None
if _aux_context_config is not None:
try:
_aux_context_config = int(_aux_context_config)
except (TypeError, ValueError):
_aux_context_config = None
self._aux_compression_context_length_config = _aux_context_config
# Read explicit context_length override from model config
_model_cfg = _agent_cfg.get("model", {})
if isinstance(_model_cfg, dict):
@@ -1998,24 +2020,11 @@ class AIAgent:
aux_base_url = str(getattr(client, "base_url", ""))
aux_api_key = str(getattr(client, "api_key", ""))
# Read user-configured context_length for the compression model.
# Custom endpoints often don't support /models API queries so
# get_model_context_length() falls through to the 128K default,
# ignoring the explicit config value. Pass it as the highest-
# priority hint so the configured value is always respected.
_aux_cfg = (self.config or {}).get("auxiliary", {}).get("compression", {})
_aux_context_config = _aux_cfg.get("context_length") if isinstance(_aux_cfg, dict) else None
if _aux_context_config is not None:
try:
_aux_context_config = int(_aux_context_config)
except (TypeError, ValueError):
_aux_context_config = None
aux_context = get_model_context_length(
aux_model,
base_url=aux_base_url,
api_key=aux_api_key,
config_context_length=_aux_context_config,
config_context_length=getattr(self, "_aux_compression_context_length_config", None),
)
threshold = self.context_compressor.threshold_tokens
@@ -4644,6 +4653,25 @@ class AIAgent:
return bool(getattr(http_client, "is_closed", False))
return False
@staticmethod
def _build_keepalive_http_client() -> Any:
try:
import httpx as _httpx
import socket as _socket
_sock_opts = [(_socket.SOL_SOCKET, _socket.SO_KEEPALIVE, 1)]
if hasattr(_socket, "TCP_KEEPIDLE"):
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPIDLE, 30))
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPINTVL, 10))
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPCNT, 3))
elif hasattr(_socket, "TCP_KEEPALIVE"):
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPALIVE, 30))
return _httpx.Client(
transport=_httpx.HTTPTransport(socket_options=_sock_opts),
)
except Exception:
return None
def _create_openai_client(self, client_kwargs: dict, *, reason: str, shared: bool) -> Any:
from agent.auxiliary_client import _validate_base_url, _validate_proxy_env_urls
# Treat client_kwargs as read-only. Callers pass self._client_kwargs (or shallow
@@ -4684,6 +4712,27 @@ class AIAgent:
self._client_log_context(),
)
return client
if self.provider == "gemini":
from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url
base_url = str(client_kwargs.get("base_url", "") or "")
if is_native_gemini_base_url(base_url):
safe_kwargs = {
k: v for k, v in client_kwargs.items()
if k in {"api_key", "base_url", "default_headers", "timeout", "http_client"}
}
if "http_client" not in safe_kwargs:
keepalive_http = self._build_keepalive_http_client()
if keepalive_http is not None:
safe_kwargs["http_client"] = keepalive_http
client = GeminiNativeClient(**safe_kwargs)
logger.info(
"Gemini native client created (%s, shared=%s) %s",
reason,
shared,
self._client_log_context(),
)
return client
# Inject TCP keepalives so the kernel detects dead provider connections
# instead of letting them sit silently in CLOSE-WAIT (#10324). Without
# this, a peer that drops mid-stream leaves the socket in a state where
@@ -4702,23 +4751,9 @@ class AIAgent:
# Tests in ``tests/run_agent/test_create_openai_client_reuse.py`` and
# ``tests/run_agent/test_sequential_chats_live.py`` pin this invariant.
if "http_client" not in client_kwargs:
try:
import httpx as _httpx
import socket as _socket
_sock_opts = [(_socket.SOL_SOCKET, _socket.SO_KEEPALIVE, 1)]
if hasattr(_socket, "TCP_KEEPIDLE"):
# Linux
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPIDLE, 30))
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPINTVL, 10))
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPCNT, 3))
elif hasattr(_socket, "TCP_KEEPALIVE"):
# macOS (uses TCP_KEEPALIVE instead of TCP_KEEPIDLE)
_sock_opts.append((_socket.IPPROTO_TCP, _socket.TCP_KEEPALIVE, 30))
client_kwargs["http_client"] = _httpx.Client(
transport=_httpx.HTTPTransport(socket_options=_sock_opts),
)
except Exception:
pass # Fall through to default transport if socket opts fail
keepalive_http = self._build_keepalive_http_client()
if keepalive_http is not None:
client_kwargs["http_client"] = keepalive_http
client = OpenAI(**client_kwargs)
logger.info(
"OpenAI client created (%s, shared=%s) %s",
+2
View File
@@ -77,6 +77,8 @@ AUTHOR_MAP = {
"Asunfly@users.noreply.github.com": "Asunfly",
"2500400+honghua@users.noreply.github.com": "honghua",
"nish3451@users.noreply.github.com": "nish3451",
"Mibayy@users.noreply.github.com": "Mibayy",
"135070653+sgaofen@users.noreply.github.com": "sgaofen",
# contributors (manual mapping from git names)
"ahmedsherif95@gmail.com": "asheriif",
"liujinkun@bytedance.com": "liujinkun2025",
+26 -3
View File
@@ -1,10 +1,10 @@
---
name: webhook-subscriptions
description: Create and manage webhook subscriptions for event-driven agent activation. Use when the user wants external services to trigger agent runs automatically.
version: 1.0.0
description: Create and manage webhook subscriptions for event-driven agent activation, or for direct push notifications (zero LLM cost). Use when the user wants external services to trigger agent runs OR push notifications to chats.
version: 1.1.0
metadata:
hermes:
tags: [webhook, events, automation, integrations]
tags: [webhook, events, automation, integrations, notifications, push]
---
# Webhook Subscriptions
@@ -154,6 +154,29 @@ hermes webhook subscribe alerts \
--deliver origin
```
### Direct delivery (no agent, zero LLM cost)
For use cases where you just want to push a notification through to a user's chat — no reasoning, no agent loop — add `--deliver-only`. The rendered `--prompt` template becomes the literal message body and is dispatched directly to the target adapter.
Use this for:
- External service push notifications (Supabase/Firebase webhooks → Telegram)
- Monitoring alerts that should forward verbatim
- Inter-agent pings where one agent is telling another agent's user something
- Any webhook where an LLM round trip would be wasted effort
```bash
hermes webhook subscribe antenna-matches \
--deliver telegram \
--deliver-chat-id "123456789" \
--deliver-only \
--prompt "🎉 New match: {match.user_name} matched with you!" \
--description "Antenna match notifications"
```
The POST returns `200 OK` on successful delivery, `502` on target failure — so upstream services can retry intelligently. HMAC auth, rate limits, and idempotency still apply.
Requires `--deliver` to be a real target (telegram, discord, slack, github_comment, etc.) — `--deliver log` is rejected because log-only direct delivery is pointless.
## Security
- Each subscription gets an auto-generated HMAC-SHA256 secret (or provide your own with `--secret`)
-69
View File
@@ -1,69 +0,0 @@
---
name: find-nearby
description: Find nearby places (restaurants, cafes, bars, pharmacies, etc.) using OpenStreetMap. Works with coordinates, addresses, cities, zip codes, or Telegram location pins. No API keys needed.
version: 1.0.0
metadata:
hermes:
tags: [location, maps, nearby, places, restaurants, local]
related_skills: []
---
# Find Nearby — Local Place Discovery
Find restaurants, cafes, bars, pharmacies, and other places near any location. Uses OpenStreetMap (free, no API keys). Works with:
- **Coordinates** from Telegram location pins (latitude/longitude in conversation)
- **Addresses** ("near 123 Main St, Springfield")
- **Cities** ("restaurants in downtown Austin")
- **Zip codes** ("pharmacies near 90210")
- **Landmarks** ("cafes near Times Square")
## Quick Reference
```bash
# By coordinates (from Telegram location pin or user-provided)
python3 SKILL_DIR/scripts/find_nearby.py --lat <LAT> --lon <LON> --type restaurant --radius 1500
# By address, city, or landmark (auto-geocoded)
python3 SKILL_DIR/scripts/find_nearby.py --near "Times Square, New York" --type cafe
# Multiple place types
python3 SKILL_DIR/scripts/find_nearby.py --near "downtown austin" --type restaurant --type bar --limit 10
# JSON output
python3 SKILL_DIR/scripts/find_nearby.py --near "90210" --type pharmacy --json
```
### Parameters
| Flag | Description | Default |
|------|-------------|---------|
| `--lat`, `--lon` | Exact coordinates | — |
| `--near` | Address, city, zip, or landmark (geocoded) | — |
| `--type` | Place type (repeatable for multiple) | restaurant |
| `--radius` | Search radius in meters | 1500 |
| `--limit` | Max results | 15 |
| `--json` | Machine-readable JSON output | off |
### Common Place Types
`restaurant`, `cafe`, `bar`, `pub`, `fast_food`, `pharmacy`, `hospital`, `bank`, `atm`, `fuel`, `parking`, `supermarket`, `convenience`, `hotel`
## Workflow
1. **Get the location.** Look for coordinates (`latitude: ... / longitude: ...`) from a Telegram pin, or ask the user for an address/city/zip.
2. **Ask for preferences** (only if not already stated): place type, how far they're willing to go, any specifics (cuisine, "open now", etc.).
3. **Run the script** with appropriate flags. Use `--json` if you need to process results programmatically.
4. **Present results** with names, distances, and Google Maps links. If the user asked about hours or "open now," check the `hours` field in results — if missing or unclear, verify with `web_search`.
5. **For directions**, use the `directions_url` from results, or construct: `https://www.google.com/maps/dir/?api=1&origin=<LAT>,<LON>&destination=<LAT>,<LON>`
## Tips
- If results are sparse, widen the radius (1500 → 3000m)
- For "open now" requests: check the `hours` field in results, cross-reference with `web_search` for accuracy since OSM hours aren't always complete
- Zip codes alone can be ambiguous globally — prompt the user for country/state if results look wrong
- The script uses OpenStreetMap data which is community-maintained; coverage varies by region
@@ -1,184 +0,0 @@
#!/usr/bin/env python3
"""Find nearby places using OpenStreetMap (Overpass + Nominatim). No API keys needed.
Usage:
# By coordinates
python find_nearby.py --lat 36.17 --lon -115.14 --type restaurant --radius 1500
# By address/city/zip (auto-geocoded)
python find_nearby.py --near "Times Square, New York" --type cafe --radius 1000
python find_nearby.py --near "90210" --type pharmacy
# Multiple types
python find_nearby.py --lat 36.17 --lon -115.14 --type restaurant --type bar
# JSON output for programmatic use
python find_nearby.py --near "downtown las vegas" --type restaurant --json
"""
import argparse
import json
import math
import sys
import urllib.parse
import urllib.request
from typing import Any
OVERPASS_URLS = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
]
NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
USER_AGENT = "HermesAgent/1.0 (find-nearby skill)"
TIMEOUT = 15
def _http_get(url: str) -> Any:
req = urllib.request.Request(url, headers={"User-Agent": USER_AGENT})
with urllib.request.urlopen(req, timeout=TIMEOUT) as r:
return json.loads(r.read())
def _http_post(url: str, data: str) -> Any:
req = urllib.request.Request(
url, data=data.encode(), headers={"User-Agent": USER_AGENT}
)
with urllib.request.urlopen(req, timeout=TIMEOUT) as r:
return json.loads(r.read())
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Distance in meters between two coordinates."""
R = 6_371_000
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def geocode(query: str) -> tuple[float, float]:
"""Convert address/city/zip to coordinates via Nominatim."""
params = urllib.parse.urlencode({"q": query, "format": "json", "limit": 1})
results = _http_get(f"{NOMINATIM_URL}?{params}")
if not results:
print(f"Error: Could not geocode '{query}'. Try a more specific address.", file=sys.stderr)
sys.exit(1)
return float(results[0]["lat"]), float(results[0]["lon"])
def find_nearby(lat: float, lon: float, types: list[str], radius: int = 1500, limit: int = 15) -> list[dict]:
"""Query Overpass for nearby amenities."""
# Build Overpass QL query
type_filters = "".join(
f'nwr["amenity"="{t}"](around:{radius},{lat},{lon});' for t in types
)
query = f"[out:json][timeout:{TIMEOUT}];({type_filters});out center tags;"
# Try each Overpass server
data = None
for url in OVERPASS_URLS:
try:
data = _http_post(url, f"data={urllib.parse.quote(query)}")
break
except Exception:
continue
if not data:
return []
# Parse results
places = []
for el in data.get("elements", []):
tags = el.get("tags", {})
name = tags.get("name")
if not name:
continue
# Get coordinates (nodes have lat/lon directly, ways/relations use center)
plat = el.get("lat") or (el.get("center", {}) or {}).get("lat")
plon = el.get("lon") or (el.get("center", {}) or {}).get("lon")
if plat is None or plon is None:
continue
dist = haversine(lat, lon, plat, plon)
place = {
"name": name,
"type": tags.get("amenity", ""),
"distance_m": round(dist),
"lat": plat,
"lon": plon,
"maps_url": f"https://www.google.com/maps/search/?api=1&query={plat},{plon}",
"directions_url": f"https://www.google.com/maps/dir/?api=1&origin={lat},{lon}&destination={plat},{plon}",
}
# Add useful optional fields
if tags.get("cuisine"):
place["cuisine"] = tags["cuisine"]
if tags.get("opening_hours"):
place["hours"] = tags["opening_hours"]
if tags.get("phone"):
place["phone"] = tags["phone"]
if tags.get("website"):
place["website"] = tags["website"]
if tags.get("addr:street"):
addr_parts = [tags.get("addr:housenumber", ""), tags.get("addr:street", "")]
if tags.get("addr:city"):
addr_parts.append(tags["addr:city"])
place["address"] = " ".join(p for p in addr_parts if p)
places.append(place)
# Sort by distance, limit results
places.sort(key=lambda p: p["distance_m"])
return places[:limit]
def main():
parser = argparse.ArgumentParser(description="Find nearby places via OpenStreetMap")
parser.add_argument("--lat", type=float, help="Latitude")
parser.add_argument("--lon", type=float, help="Longitude")
parser.add_argument("--near", type=str, help="Address, city, or zip code (geocoded automatically)")
parser.add_argument("--type", action="append", dest="types", default=[], help="Place type (restaurant, cafe, bar, pharmacy, etc.)")
parser.add_argument("--radius", type=int, default=1500, help="Search radius in meters (default: 1500)")
parser.add_argument("--limit", type=int, default=15, help="Max results (default: 15)")
parser.add_argument("--json", action="store_true", dest="json_output", help="Output as JSON")
args = parser.parse_args()
# Resolve coordinates
if args.near:
lat, lon = geocode(args.near)
elif args.lat is not None and args.lon is not None:
lat, lon = args.lat, args.lon
else:
print("Error: Provide --lat/--lon or --near", file=sys.stderr)
sys.exit(1)
if not args.types:
args.types = ["restaurant"]
places = find_nearby(lat, lon, args.types, args.radius, args.limit)
if args.json_output:
print(json.dumps({"origin": {"lat": lat, "lon": lon}, "results": places, "count": len(places)}, indent=2))
else:
if not places:
print(f"No {'/'.join(args.types)} found within {args.radius}m")
return
print(f"Found {len(places)} places within {args.radius}m:\n")
for i, p in enumerate(places, 1):
dist_str = f"{p['distance_m']}m" if p["distance_m"] < 1000 else f"{p['distance_m']/1000:.1f}km"
print(f" {i}. {p['name']} ({p['type']}) — {dist_str}")
if p.get("cuisine"):
print(f" Cuisine: {p['cuisine']}")
if p.get("hours"):
print(f" Hours: {p['hours']}")
if p.get("address"):
print(f" Address: {p['address']}")
print(f" Map: {p['maps_url']}")
print()
if __name__ == "__main__":
main()
+1 -1
View File
@@ -1,3 +1,3 @@
---
description: Skills for working with MCP (Model Context Protocol) servers, tools, and integrations. Includes the built-in native MCP client (configure servers in config.yaml for automatic tool discovery) and the mcporter CLI bridge for ad-hoc server interaction.
description: Skills for working with MCP (Model Context Protocol) servers, tools, and integrations. Documents the built-in native MCP client configure servers in config.yaml for automatic tool discovery.
---
-3
View File
@@ -1,3 +0,0 @@
---
description: GPU cloud providers and serverless compute platforms for ML workloads.
---
+1 -1
View File
@@ -1,3 +1,3 @@
---
description: Specific model architectures and tools — computer vision (CLIP, SAM, Stable Diffusion), speech (Whisper), audio generation (AudioCraft), and multimodal models (LLaVA).
description: Specific model architectures and tools — image segmentation (Segment Anything / SAM) and audio generation (AudioCraft / MusicGen). Additional model skills (CLIP, Stable Diffusion, Whisper, LLaVA) are available as optional skills.
---
+198
View File
@@ -0,0 +1,198 @@
---
name: maps
description: >
Location intelligence — geocode a place, reverse-geocode coordinates,
find nearby places (44 POI categories), driving/walking/cycling
distance + time, turn-by-turn directions, timezone lookup, bounding
box + area for a named place, and POI search within a rectangle.
Uses OpenStreetMap + Overpass + OSRM. Free, no API key.
version: 1.2.0
author: Mibayy
license: MIT
metadata:
hermes:
tags: [maps, geocoding, places, routing, distance, directions, nearby, location, openstreetmap, nominatim, overpass, osrm]
category: productivity
requires_toolsets: [terminal]
supersedes: [find-nearby]
---
# Maps Skill
Location intelligence using free, open data sources. 8 commands, 44 POI
categories, zero dependencies (Python stdlib only), no API key required.
Data sources: OpenStreetMap/Nominatim, Overpass API, OSRM, TimeAPI.io.
This skill supersedes the old `find-nearby` skill — all of find-nearby's
functionality is covered by the `nearby` command below, with the same
`--near "<place>"` shortcut and multi-category support.
## When to Use
- User sends a Telegram location pin (latitude/longitude in the message) → `nearby`
- User wants coordinates for a place name → `search`
- User has coordinates and wants the address → `reverse`
- User asks for nearby restaurants, hospitals, pharmacies, hotels, etc. → `nearby`
- User wants driving/walking/cycling distance or travel time → `distance`
- User wants turn-by-turn directions between two places → `directions`
- User wants timezone information for a location → `timezone`
- User wants to search for POIs within a geographic area → `area` + `bbox`
## Prerequisites
Python 3.8+ (stdlib only — no pip installs needed).
Script path: `~/.hermes/skills/maps/scripts/maps_client.py`
## Commands
```bash
MAPS=~/.hermes/skills/maps/scripts/maps_client.py
```
### search — Geocode a place name
```bash
python3 $MAPS search "Eiffel Tower"
python3 $MAPS search "1600 Pennsylvania Ave, Washington DC"
```
Returns: lat, lon, display name, type, bounding box, importance score.
### reverse — Coordinates to address
```bash
python3 $MAPS reverse 48.8584 2.2945
```
Returns: full address breakdown (street, city, state, country, postcode).
### nearby — Find places by category
```bash
# By coordinates (from a Telegram location pin, for example)
python3 $MAPS nearby 48.8584 2.2945 restaurant --limit 10
python3 $MAPS nearby 40.7128 -74.0060 hospital --radius 2000
# By address / city / zip / landmark — --near auto-geocodes
python3 $MAPS nearby --near "Times Square, New York" --category cafe
python3 $MAPS nearby --near "90210" --category pharmacy
# Multiple categories merged into one query
python3 $MAPS nearby --near "downtown austin" --category restaurant --category bar --limit 10
```
44 categories: restaurant, cafe, bar, hospital, pharmacy, hotel, supermarket,
atm, gas_station, parking, museum, park, school, university, bank, police,
fire_station, library, airport, train_station, bus_stop, church, mosque,
synagogue, dentist, doctor, cinema, theatre, gym, swimming_pool, post_office,
convenience_store, bakery, bookshop, laundry, car_wash, car_rental,
bicycle_rental, taxi, veterinary, zoo, playground, stadium, nightclub.
Each result includes: `name`, `address`, `lat`/`lon`, `distance_m`,
`maps_url` (clickable Google Maps link), `directions_url` (Google Maps
directions from the search point), and promoted tags when available —
`cuisine`, `hours` (opening_hours), `phone`, `website`.
### distance — Travel distance and time
```bash
python3 $MAPS distance "Paris" --to "Lyon"
python3 $MAPS distance "New York" --to "Boston" --mode driving
python3 $MAPS distance "Big Ben" --to "Tower Bridge" --mode walking
```
Modes: driving (default), walking, cycling. Returns road distance, duration,
and straight-line distance for comparison.
### directions — Turn-by-turn navigation
```bash
python3 $MAPS directions "Eiffel Tower" --to "Louvre Museum" --mode walking
python3 $MAPS directions "JFK Airport" --to "Times Square" --mode driving
```
Returns numbered steps with instruction, distance, duration, road name, and
maneuver type (turn, depart, arrive, etc.).
### timezone — Timezone for coordinates
```bash
python3 $MAPS timezone 48.8584 2.2945
python3 $MAPS timezone 35.6762 139.6503
```
Returns timezone name, UTC offset, and current local time.
### area — Bounding box and area for a place
```bash
python3 $MAPS area "Manhattan, New York"
python3 $MAPS area "London"
```
Returns bounding box coordinates, width/height in km, and approximate area.
Useful as input for the bbox command.
### bbox — Search within a bounding box
```bash
python3 $MAPS bbox 40.75 -74.00 40.77 -73.98 restaurant --limit 20
```
Finds POIs within a geographic rectangle. Use `area` first to get the
bounding box coordinates for a named place.
## Working With Telegram Location Pins
When a user sends a location pin, the message contains `latitude:` and
`longitude:` fields. Extract those and pass them straight to `nearby`:
```bash
# User sent a pin at 36.17, -115.14 and asked "find cafes nearby"
python3 $MAPS nearby 36.17 -115.14 cafe --radius 1500
```
Present results as a numbered list with names, distances, and the
`maps_url` field so the user gets a tap-to-open link in chat. For "open
now?" questions, check the `hours` field; if missing or unclear, verify
with `web_search` since OSM hours are community-maintained and not always
current.
## Workflow Examples
**"Find Italian restaurants near the Colosseum":**
1. `nearby --near "Colosseum Rome" --category restaurant --radius 500`
— one command, auto-geocoded
**"What's near this location pin they sent?":**
1. Extract lat/lon from the Telegram message
2. `nearby LAT LON cafe --radius 1500`
**"How do I walk from hotel to conference center?":**
1. `directions "Hotel Name" --to "Conference Center" --mode walking`
**"What restaurants are in downtown Seattle?":**
1. `area "Downtown Seattle"` → get bounding box
2. `bbox S W N E restaurant --limit 30`
## Pitfalls
- Nominatim ToS: max 1 req/s (handled automatically by the script)
- `nearby` requires lat/lon OR `--near "<address>"` — one of the two is needed
- OSRM routing coverage is best for Europe and North America
- Overpass API can be slow during peak hours; the script automatically
falls back between mirrors (overpass-api.de → overpass.kumi.systems)
- `distance` and `directions` use `--to` flag for the destination (not positional)
- If a zip code alone gives ambiguous results globally, include country/state
## Verification
```bash
python3 ~/.hermes/skills/maps/scripts/maps_client.py search "Statue of Liberty"
# Should return lat ~40.689, lon ~-74.044
python3 ~/.hermes/skills/maps/scripts/maps_client.py nearby --near "Times Square" --category restaurant --limit 3
# Should return a list of restaurants within ~500m of Times Square
```
File diff suppressed because it is too large Load Diff
+18
View File
@@ -909,6 +909,24 @@ class TestStaleBaseUrlWarning:
assert not any("OPENAI_BASE_URL is set" in rec.message for rec in caplog.records), \
"Should NOT warn when OPENAI_BASE_URL is not set"
def test_gemini_main_runtime_preserves_explicit_base_url(self, monkeypatch):
monkeypatch.setenv("GOOGLE_API_KEY", "AIza_ENV")
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_native, \
patch("agent.auxiliary_client.OpenAI") as mock_openai:
mock_openai.return_value = MagicMock(base_url="https://generativelanguage.googleapis.com/v1beta/openai")
client, model = _resolve_auto(main_runtime={
"provider": "gemini",
"model": "gemini-2.5-flash",
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
"api_key": "AIza_RUNTIME",
"api_mode": "chat_completions",
})
mock_native.assert_not_called()
mock_openai.assert_called_once()
assert model == "gemini-2.5-flash"
assert str(getattr(client, "base_url", "")) == "https://generativelanguage.googleapis.com/v1beta/openai"
# ---------------------------------------------------------------------------
# Anthropic-compatible image block conversion
# ---------------------------------------------------------------------------
+289
View File
@@ -0,0 +1,289 @@
"""Tests for the native Google AI Studio Gemini adapter."""
from __future__ import annotations
import json
from types import SimpleNamespace
import pytest
class DummyResponse:
def __init__(self, status_code=200, payload=None, headers=None, text=None):
self.status_code = status_code
self._payload = payload or {}
self.headers = headers or {}
self.text = text if text is not None else json.dumps(self._payload)
def json(self):
return self._payload
def test_build_native_request_preserves_thought_signature_on_tool_replay():
from agent.gemini_native_adapter import build_gemini_request
request = build_gemini_request(
messages=[
{"role": "system", "content": "Be helpful."},
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "get_weather",
"arguments": '{"city": "Paris"}',
},
"extra_content": {
"google": {"thought_signature": "sig-123"}
},
}
],
},
],
tools=[],
tool_choice=None,
)
parts = request["contents"][0]["parts"]
assert parts[0]["functionCall"]["name"] == "get_weather"
assert parts[0]["thoughtSignature"] == "sig-123"
def test_build_native_request_uses_original_function_name_for_tool_result():
from agent.gemini_native_adapter import build_gemini_request
request = build_gemini_request(
messages=[
{
"role": "assistant",
"content": "",
"tool_calls": [
{
"id": "call_1",
"type": "function",
"function": {
"name": "get_weather",
"arguments": '{"city": "Paris"}',
},
}
],
},
{
"role": "tool",
"tool_call_id": "call_1",
"content": '{"forecast": "sunny"}',
},
],
tools=[],
tool_choice=None,
)
tool_response = request["contents"][1]["parts"][0]["functionResponse"]
assert tool_response["name"] == "get_weather"
def test_translate_native_response_surfaces_reasoning_and_tool_calls():
from agent.gemini_native_adapter import translate_gemini_response
payload = {
"candidates": [
{
"content": {
"parts": [
{"thought": True, "text": "thinking..."},
{"functionCall": {"name": "search", "args": {"q": "hermes"}}},
]
},
"finishReason": "STOP",
}
],
"usageMetadata": {
"promptTokenCount": 10,
"candidatesTokenCount": 5,
"totalTokenCount": 15,
},
}
response = translate_gemini_response(payload, model="gemini-2.5-flash")
choice = response.choices[0]
assert choice.finish_reason == "tool_calls"
assert choice.message.reasoning == "thinking..."
assert choice.message.tool_calls[0].function.name == "search"
assert json.loads(choice.message.tool_calls[0].function.arguments) == {"q": "hermes"}
def test_native_client_uses_x_goog_api_key_and_native_models_endpoint(monkeypatch):
from agent.gemini_native_adapter import GeminiNativeClient
recorded = {}
class DummyHTTP:
def post(self, url, json=None, headers=None, timeout=None):
recorded["url"] = url
recorded["json"] = json
recorded["headers"] = headers
return DummyResponse(
payload={
"candidates": [
{
"content": {"parts": [{"text": "hello"}]},
"finishReason": "STOP",
}
],
"usageMetadata": {
"promptTokenCount": 1,
"candidatesTokenCount": 1,
"totalTokenCount": 2,
},
}
)
def close(self):
return None
monkeypatch.setattr("agent.gemini_native_adapter.httpx.Client", lambda *a, **k: DummyHTTP())
client = GeminiNativeClient(api_key="AIza-test", base_url="https://generativelanguage.googleapis.com/v1beta")
response = client.chat.completions.create(
model="gemini-2.5-flash",
messages=[{"role": "user", "content": "Hello"}],
)
assert recorded["url"] == "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent"
assert recorded["headers"]["x-goog-api-key"] == "AIza-test"
assert "Authorization" not in recorded["headers"]
assert response.choices[0].message.content == "hello"
def test_native_http_error_keeps_status_and_retry_after():
from agent.gemini_native_adapter import gemini_http_error
response = DummyResponse(
status_code=429,
headers={"Retry-After": "17"},
payload={
"error": {
"code": 429,
"message": "quota exhausted",
"status": "RESOURCE_EXHAUSTED",
"details": [
{
"@type": "type.googleapis.com/google.rpc.ErrorInfo",
"reason": "RESOURCE_EXHAUSTED",
"metadata": {"service": "generativelanguage.googleapis.com"},
}
],
}
},
)
err = gemini_http_error(response)
assert getattr(err, "status_code", None) == 429
assert getattr(err, "retry_after", None) == 17.0
assert "quota exhausted" in str(err)
def test_iter_sse_events_supports_multiline_data_frames():
from agent.gemini_native_adapter import _iter_sse_events
class DummyStreamResponse:
def iter_text(self):
yield 'data: {"candidates":\n'
yield 'data: [{"content":{"parts":[{"text":"hello"}]},"finishReason":"STOP"}]}\n\n'
yield 'data: [DONE]\n\n'
events = list(_iter_sse_events(DummyStreamResponse()))
assert len(events) == 1
assert events[0]["candidates"][0]["content"]["parts"][0]["text"] == "hello"
def test_native_client_accepts_injected_http_client():
from agent.gemini_native_adapter import GeminiNativeClient
injected = SimpleNamespace(close=lambda: None)
client = GeminiNativeClient(api_key="AIza-test", http_client=injected)
assert client._http is injected
@pytest.mark.asyncio
async def test_async_native_client_streams_without_requiring_async_iterator_from_sync_client():
from agent.gemini_native_adapter import AsyncGeminiNativeClient
chunk = SimpleNamespace(choices=[SimpleNamespace(delta=SimpleNamespace(content="hi"), finish_reason=None)])
sync_stream = iter([chunk])
def _advance(iterator):
try:
return False, next(iterator)
except StopIteration:
return True, None
sync_client = SimpleNamespace(
api_key="AIza-test",
base_url="https://generativelanguage.googleapis.com/v1beta",
chat=SimpleNamespace(completions=SimpleNamespace(create=lambda **kwargs: sync_stream)),
_advance_stream_iterator=_advance,
close=lambda: None,
)
async_client = AsyncGeminiNativeClient(sync_client)
stream = await async_client.chat.completions.create(stream=True)
collected = []
async for item in stream:
collected.append(item)
assert collected == [chunk]
def test_stream_event_translation_emits_tool_call_delta_with_stable_index():
from agent.gemini_native_adapter import translate_stream_event
tool_call_indices = {}
event = {
"candidates": [
{
"content": {
"parts": [
{"functionCall": {"name": "search", "args": {"q": "abc"}}}
]
},
"finishReason": "STOP",
}
]
}
first = translate_stream_event(event, model="gemini-2.5-flash", tool_call_indices=tool_call_indices)
second = translate_stream_event(event, model="gemini-2.5-flash", tool_call_indices=tool_call_indices)
assert first[0].choices[0].delta.tool_calls[0].index == 0
assert second[0].choices[0].delta.tool_calls[0].index == 0
assert first[0].choices[0].delta.tool_calls[0].id == second[0].choices[0].delta.tool_calls[0].id
assert first[0].choices[0].delta.tool_calls[0].function.arguments == '{"q": "abc"}'
assert second[0].choices[0].delta.tool_calls[0].function.arguments == ""
assert first[-1].choices[0].finish_reason == "tool_calls"
def test_stream_event_translation_keeps_identical_calls_in_distinct_parts():
from agent.gemini_native_adapter import translate_stream_event
event = {
"candidates": [
{
"content": {
"parts": [
{"functionCall": {"name": "search", "args": {"q": "abc"}}},
{"functionCall": {"name": "search", "args": {"q": "abc"}}},
]
},
"finishReason": "STOP",
}
]
}
chunks = translate_stream_event(event, model="gemini-2.5-flash", tool_call_indices={})
tool_chunks = [chunk for chunk in chunks if chunk.choices[0].delta.tool_calls]
assert tool_chunks[0].choices[0].delta.tool_calls[0].index == 0
assert tool_chunks[1].choices[0].delta.tool_calls[0].index == 1
assert tool_chunks[0].choices[0].delta.tool_calls[0].id != tool_chunks[1].choices[0].delta.tool_calls[0].id
+4 -4
View File
@@ -1024,7 +1024,7 @@ class TestRunJobSkillBacked:
"id": "multi-skill-job",
"name": "multi skill test",
"prompt": "Combine the results.",
"skills": ["blogwatcher", "find-nearby"],
"skills": ["blogwatcher", "maps"],
}
fake_db = MagicMock()
@@ -1057,12 +1057,12 @@ class TestRunJobSkillBacked:
assert error is None
assert final_response == "ok"
assert skill_view_mock.call_count == 2
assert [call.args[0] for call in skill_view_mock.call_args_list] == ["blogwatcher", "find-nearby"]
assert [call.args[0] for call in skill_view_mock.call_args_list] == ["blogwatcher", "maps"]
prompt_arg = mock_agent.run_conversation.call_args.args[0]
assert prompt_arg.index("blogwatcher") < prompt_arg.index("find-nearby")
assert prompt_arg.index("blogwatcher") < prompt_arg.index("maps")
assert "Instructions for blogwatcher." in prompt_arg
assert "Instructions for find-nearby." in prompt_arg
assert "Instructions for maps." in prompt_arg
assert "Combine the results." in prompt_arg
@@ -0,0 +1,148 @@
"""Regression test: cancel_background_tasks must drain late-arrival tasks.
During gateway shutdown, a message arriving while
cancel_background_tasks is mid-await can spawn a fresh
_process_message_background task via handle_message, which is added
to self._background_tasks. Without the re-drain loop, the subsequent
_background_tasks.clear() drops the reference; the task runs
untracked against a disconnecting adapter.
"""
import asyncio
from unittest.mock import AsyncMock
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType
from gateway.session import SessionSource, build_session_key
class _StubAdapter(BasePlatformAdapter):
async def connect(self):
pass
async def disconnect(self):
pass
async def send(self, chat_id, text, **kwargs):
return None
async def get_chat_info(self, chat_id):
return {}
def _make_adapter():
adapter = _StubAdapter(PlatformConfig(enabled=True, token="t"), Platform.TELEGRAM)
adapter._send_with_retry = AsyncMock(return_value=None)
return adapter
def _event(text, cid="42"):
return MessageEvent(
text=text,
message_type=MessageType.TEXT,
source=SessionSource(platform=Platform.TELEGRAM, chat_id=cid, chat_type="dm"),
)
@pytest.mark.asyncio
async def test_cancel_background_tasks_drains_late_arrivals():
"""A message that arrives during the gather window must be picked
up by the re-drain loop, not leaked as an untracked task."""
adapter = _make_adapter()
sk = build_session_key(
SessionSource(platform=Platform.TELEGRAM, chat_id="42", chat_type="dm")
)
m1_started = asyncio.Event()
m1_cleanup_running = asyncio.Event()
m2_started = asyncio.Event()
m2_cancelled = asyncio.Event()
async def handler(event):
if event.text == "M1":
m1_started.set()
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
m1_cleanup_running.set()
# Widen the gather window with a shielded cleanup
# delay so M2 can get injected during it.
await asyncio.shield(asyncio.sleep(0.2))
raise
else: # M2 — the late arrival
m2_started.set()
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
m2_cancelled.set()
raise
adapter._message_handler = handler
# Spawn M1.
await adapter.handle_message(_event("M1"))
await asyncio.wait_for(m1_started.wait(), timeout=1.0)
# Kick off shutdown. This will cancel M1 and await its cleanup.
cancel_task = asyncio.create_task(adapter.cancel_background_tasks())
# Wait until M1's cleanup is running (inside the shielded sleep).
# This is the race window: cancel_task is awaiting gather, M1 is
# shielded in cleanup, the _active_sessions entry has been cleared
# by M1's own finally.
await asyncio.wait_for(m1_cleanup_running.wait(), timeout=1.0)
# Clear the active-session entry (M1's finally hasn't fully run yet,
# but in production the platform dispatcher would deliver a new
# message that takes the no-active-session spawn path). For this
# repro, make it deterministic.
adapter._active_sessions.pop(sk, None)
# Inject late arrival — spawns a fresh _process_message_background
# task and adds it to _background_tasks while cancel_task is still
# in gather.
await adapter.handle_message(_event("M2"))
await asyncio.wait_for(m2_started.wait(), timeout=1.0)
# Let cancel_task finish. Round 1's gather completes when M1's
# shielded cleanup finishes. Round 2 should pick up M2.
await asyncio.wait_for(cancel_task, timeout=5.0)
# Assert M2 was drained, not leaked.
assert m2_cancelled.is_set(), (
"Late-arrival M2 was NOT cancelled by cancel_background_tasks — "
"the re-drain loop is missing and the task leaked"
)
assert adapter._background_tasks == set()
@pytest.mark.asyncio
async def test_cancel_background_tasks_handles_no_tasks():
"""Regression guard: no tasks, no hang, no error."""
adapter = _make_adapter()
await adapter.cancel_background_tasks()
assert adapter._background_tasks == set()
@pytest.mark.asyncio
async def test_cancel_background_tasks_bounded_rounds():
"""Regression guard: the drain loop is bounded — it does not spin
forever even if late-arrival tasks keep getting spawned."""
adapter = _make_adapter()
# Single well-behaved task that cancels cleanly — baseline check
# that the loop terminates in one round.
async def quick():
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
raise
task = asyncio.create_task(quick())
adapter._background_tasks.add(task)
await adapter.cancel_background_tasks()
assert task.done()
assert adapter._background_tasks == set()
+122
View File
@@ -0,0 +1,122 @@
"""Regression tests for the Discord adapter race-polish fix.
Two races are addressed:
1. on_message allowlist check racing on_ready's _resolve_allowed_usernames
resolution window. Username-based entries in DISCORD_ALLOWED_USERS
appear in the set as raw strings for several seconds after
connect/reconnect; author.id is always numeric, so legitimate users
are silently rejected until resolution finishes.
2. join_voice_channel check-and-connect: concurrent /voice channel
invocations both see _voice_clients.get(guild_id) is None, both call
channel.connect(), second raises ClientException ('Already connected').
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import Platform, PlatformConfig
def _make_adapter():
"""Bare DiscordAdapter for testing — object.__new__ pattern per AGENTS.md."""
from gateway.platforms.discord import DiscordAdapter
adapter = object.__new__(DiscordAdapter)
adapter._platform = Platform.DISCORD
adapter.config = PlatformConfig(enabled=True, token="t")
adapter._ready_event = asyncio.Event()
adapter._allowed_user_ids = set()
adapter._allowed_role_ids = set()
adapter._voice_clients = {}
adapter._voice_locks = {}
adapter._voice_receivers = {}
adapter._voice_listen_tasks = {}
adapter._voice_timeout_tasks = {}
adapter._voice_text_channels = {}
adapter._voice_sources = {}
adapter._client = MagicMock()
return adapter
class TestJoinVoiceSerialization:
@pytest.mark.asyncio
async def test_concurrent_joins_do_not_double_connect(self):
"""Two concurrent join_voice_channel calls on the same guild
must serialize through the per-guild lock only ONE
channel.connect() actually fires; the second sees the
_voice_clients entry the first just installed."""
adapter = _make_adapter()
connect_count = [0]
connect_event = asyncio.Event()
class FakeVC:
def __init__(self, channel):
self.channel = channel
def is_connected(self):
return True
async def move_to(self, _channel):
return None
async def disconnect(self):
return None
async def slow_connect(self):
connect_count[0] += 1
# Widen the race window
await connect_event.wait()
return FakeVC(self)
channel = MagicMock()
channel.id = 111
channel.guild.id = 42
channel.connect = lambda: slow_connect(channel)
# Swap out VoiceReceiver so it doesn't try to set up real audio
from gateway.platforms import discord as discord_mod
with patch.object(discord_mod, "VoiceReceiver", MagicMock(return_value=MagicMock(start=lambda: None))):
with patch.object(discord_mod.asyncio, "ensure_future", lambda _c: asyncio.create_task(asyncio.sleep(0))):
# Fire two joins concurrently
t1 = asyncio.create_task(adapter.join_voice_channel(channel))
t2 = asyncio.create_task(adapter.join_voice_channel(channel))
# Let them run until they're blocked on our event
await asyncio.sleep(0.05)
# Release connect so both can finish
connect_event.set()
r1, r2 = await asyncio.gather(t1, t2)
assert connect_count[0] == 1, (
f"Expected exactly 1 channel.connect() call, got {connect_count[0]}"
"per-guild voice lock is not serializing join_voice_channel"
)
assert r1 is True and r2 is True
assert 42 in adapter._voice_clients
class TestOnMessageWaitsForReadyEvent:
@pytest.mark.asyncio
async def test_on_message_blocks_until_ready_event_set(self):
"""A message arriving before on_ready finishes
_resolve_allowed_usernames must wait, not proceed with a
half-resolved allowlist."""
# This is an integration-style check — we pull out the
# on_message handler by asserting the source contains the
# expected wait pattern. A full end-to-end test would require
# setting up the discord.py client machinery, which is not
# practical here.
import inspect
from gateway.platforms import discord as discord_mod
src = inspect.getsource(discord_mod.DiscordAdapter.connect)
assert "_ready_event.is_set()" in src, (
"on_message must gate on _ready_event so username-based "
"allowlist entries are resolved before the allowlist check"
)
assert "await asyncio.wait_for(" in src and "_ready_event.wait()" in src, (
"Expected asyncio.wait_for(_ready_event.wait(), timeout=...) "
"pattern in on_message"
)
+128
View File
@@ -2370,6 +2370,134 @@ class TestAdapterBehavior(unittest.TestCase):
elements = payload["zh_cn"]["content"][0]
self.assertEqual(elements, [{"tag": "md", "text": "可以用 **粗体** 和 *斜体*。"}])
@patch.dict(os.environ, {}, clear=True)
def test_send_splits_fenced_code_blocks_into_separate_post_rows(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
captured = {}
class _MessageAPI:
def create(self, request):
captured["request"] = request
return SimpleNamespace(
success=lambda: True,
data=SimpleNamespace(message_id="om_codeblock"),
)
adapter._client = SimpleNamespace(
im=SimpleNamespace(
v1=SimpleNamespace(
message=_MessageAPI(),
)
)
)
async def _direct(func, *args, **kwargs):
return func(*args, **kwargs)
content = (
"确认已入库 ✓\n"
"文件路径:`/root/.hermes/profiles/agent_cto/cron/jobs.json`\n"
"**解码后的内容:**\n"
"```json\n"
'{"cron": "list"}\n'
"```\n"
"后续说明仍应保留。"
)
with patch("gateway.platforms.feishu.asyncio.to_thread", side_effect=_direct):
result = asyncio.run(
adapter.send(
chat_id="oc_chat",
content=content,
)
)
self.assertTrue(result.success)
self.assertEqual(captured["request"].request_body.msg_type, "post")
payload = json.loads(captured["request"].request_body.content)
rows = payload["zh_cn"]["content"]
self.assertEqual(
rows,
[
[
{
"tag": "md",
"text": "确认已入库 ✓\n文件路径:`/root/.hermes/profiles/agent_cto/cron/jobs.json`\n**解码后的内容:**",
}
],
[{"tag": "md", "text": "```json\n{\"cron\": \"list\"}\n```"}],
[{"tag": "md", "text": "后续说明仍应保留。"}],
],
)
@patch.dict(os.environ, {}, clear=True)
def test_build_post_payload_keeps_fence_like_code_lines_inside_code_block(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
payload = json.loads(
adapter._build_post_payload(
"before\n```python\n```oops\n```\nafter"
)
)
self.assertEqual(
payload["zh_cn"]["content"],
[
[{"tag": "md", "text": "before"}],
[{"tag": "md", "text": "```python\n```oops\n```"}],
[{"tag": "md", "text": "after"}],
],
)
@patch.dict(os.environ, {}, clear=True)
def test_build_post_payload_preserves_trailing_spaces_in_code_block(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
payload = json.loads(
adapter._build_post_payload(
"before\n```python\nline with two spaces \n```\nafter"
)
)
self.assertEqual(
payload["zh_cn"]["content"],
[
[{"tag": "md", "text": "before"}],
[{"tag": "md", "text": "```python\nline with two spaces \n```"}],
[{"tag": "md", "text": "after"}],
],
)
@patch.dict(os.environ, {}, clear=True)
def test_build_post_payload_splits_multiple_fenced_code_blocks(self):
from gateway.config import PlatformConfig
from gateway.platforms.feishu import FeishuAdapter
adapter = FeishuAdapter(PlatformConfig())
payload = json.loads(
adapter._build_post_payload(
"before\n```python\nprint(1)\n```\nmiddle\n```json\n{}\n```\nafter"
)
)
self.assertEqual(
payload["zh_cn"]["content"],
[
[{"tag": "md", "text": "before"}],
[{"tag": "md", "text": "```python\nprint(1)\n```"}],
[{"tag": "md", "text": "middle"}],
[{"tag": "md", "text": "```json\n{}\n```"}],
[{"tag": "md", "text": "after"}],
],
)
@patch.dict(os.environ, {}, clear=True)
def test_send_falls_back_to_text_when_post_payload_is_rejected(self):
from gateway.config import PlatformConfig
+31 -1
View File
@@ -1,13 +1,18 @@
"""Tests for the pending_event None guard in recursive _run_agent calls.
"""Tests for pending follow-up extraction in recursive _run_agent calls.
When pending_event is None (Path B: pending comes from interrupt_message),
accessing pending_event.channel_prompt previously raised AttributeError.
This verifies the fix: channel_prompt is captured inside the
`if pending_event is not None:` block and falls back to None otherwise.
Also verifies that internal control interrupt reasons like "Stop requested"
do not get recycled into the pending-user-message follow-up path.
"""
from types import SimpleNamespace
from gateway.run import _is_control_interrupt_message
def _extract_channel_prompt(pending_event):
"""Reproduce the fixed logic from gateway/run.py.
@@ -21,6 +26,15 @@ def _extract_channel_prompt(pending_event):
return next_channel_prompt
def _extract_pending_text(interrupted, pending_event, interrupt_message):
"""Reproduce the fixed pending-text selection from gateway/run.py."""
if interrupted and pending_event is None and interrupt_message:
if _is_control_interrupt_message(interrupt_message):
return None
return interrupt_message
return None
class TestPendingEventNoneChannelPrompt:
"""Guard against AttributeError when pending_event is None."""
@@ -40,3 +54,19 @@ class TestPendingEventNoneChannelPrompt:
event = SimpleNamespace()
result = _extract_channel_prompt(event)
assert result is None
class TestControlInterruptMessages:
"""Control interrupt reasons must not become follow-up user input."""
def test_stop_requested_is_not_treated_as_pending_user_message(self):
result = _extract_pending_text(True, None, "Stop requested")
assert result is None
def test_session_reset_requested_is_not_treated_as_pending_user_message(self):
result = _extract_pending_text(True, None, "Session reset requested")
assert result is None
def test_real_user_interrupt_message_still_requeues(self):
result = _extract_pending_text(True, None, "actually use postgres instead")
assert result == "actually use postgres instead"
+37
View File
@@ -19,6 +19,7 @@ def _make_runner(proxy_url=None):
runner.config = MagicMock()
runner.config.streaming = StreamingConfig()
runner._running_agents = {}
runner._session_run_generation = {}
runner._session_model_overrides = {}
runner._agent_cache = {}
runner._agent_cache_lock = None
@@ -160,10 +161,12 @@ class TestRunAgentProxyDispatch:
source=source,
session_id="test-session-123",
session_key="test-key",
run_generation=7,
)
assert result["final_response"] == "Hello from remote!"
runner._run_agent_via_proxy.assert_called_once()
assert runner._run_agent_via_proxy.call_args.kwargs["run_generation"] == 7
@pytest.mark.asyncio
async def test_run_agent_skips_proxy_when_not_configured(self, monkeypatch):
@@ -370,6 +373,40 @@ class TestRunAgentViaProxy:
assert "session_id" in result
assert result["session_id"] == "sess-123"
@pytest.mark.asyncio
async def test_proxy_stale_generation_returns_empty_result(self, monkeypatch):
monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
monkeypatch.delenv("GATEWAY_PROXY_KEY", raising=False)
runner = _make_runner()
source = _make_source()
runner._session_run_generation["test-key"] = 2
resp = _FakeSSEResponse(
status=200,
sse_chunks=[
'data: {"choices":[{"delta":{"content":"stale"}}]}\n\n',
"data: [DONE]\n\n",
],
)
session = _FakeSession(resp)
with patch("gateway.run._load_gateway_config", return_value={}):
with _patch_aiohttp(session):
with patch("aiohttp.ClientTimeout"):
result = await runner._run_agent_via_proxy(
message="hi",
context_prompt="",
history=[],
source=source,
session_id="sess-123",
session_key="test-key",
run_generation=1,
)
assert result["final_response"] == ""
assert result["messages"] == []
assert result["api_calls"] == 0
@pytest.mark.asyncio
async def test_no_auth_header_without_key(self, monkeypatch):
monkeypatch.setenv("GATEWAY_PROXY_URL", "http://host:8642")
+186
View File
@@ -51,6 +51,9 @@ class ProgressCaptureAdapter(BasePlatformAdapter):
async def send_typing(self, chat_id, metadata=None) -> None:
self.typing.append({"chat_id": chat_id, "metadata": metadata})
async def stop_typing(self, chat_id) -> None:
self.typing.append({"chat_id": chat_id, "metadata": {"stopped": True}})
async def get_chat_info(self, chat_id: str):
return {"id": chat_id}
@@ -90,6 +93,40 @@ class LongPreviewAgent:
}
class DelayedProgressAgent:
def __init__(self, **kwargs):
self.tool_progress_callback = kwargs.get("tool_progress_callback")
self.tools = []
def run_conversation(self, message, conversation_history=None, task_id=None):
self.tool_progress_callback("tool.started", "terminal", "first command", {})
time.sleep(0.45)
self.tool_progress_callback("tool.started", "terminal", "second command", {})
time.sleep(0.1)
return {
"final_response": "done",
"messages": [],
"api_calls": 1,
}
class DelayedInterimAgent:
def __init__(self, **kwargs):
self.interim_assistant_callback = kwargs.get("interim_assistant_callback")
self.tools = []
def run_conversation(self, message, conversation_history=None, task_id=None):
self.interim_assistant_callback("first interim")
time.sleep(0.45)
self.interim_assistant_callback("second interim")
time.sleep(0.1)
return {
"final_response": "done",
"messages": [],
"api_calls": 1,
}
def _make_runner(adapter):
gateway_run = importlib.import_module("gateway.run")
GatewayRunner = gateway_run.GatewayRunner
@@ -104,6 +141,7 @@ def _make_runner(adapter):
runner._fallback_model = None
runner._session_db = None
runner._running_agents = {}
runner._session_run_generation = {}
runner.hooks = SimpleNamespace(loaded_hooks=False)
runner.config = SimpleNamespace(
thread_sessions_per_user=False,
@@ -744,6 +782,154 @@ async def test_base_processing_releases_post_delivery_callback_after_main_send()
assert released == [True]
@pytest.mark.asyncio
async def test_run_agent_drops_tool_progress_after_generation_invalidation(monkeypatch, tmp_path):
import yaml
(tmp_path / "config.yaml").write_text(
yaml.dump({"display": {"tool_progress": "all"}}),
encoding="utf-8",
)
fake_dotenv = types.ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = DelayedProgressAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
import tools.terminal_tool # noqa: F401 - register terminal tool metadata
adapter = ProgressCaptureAdapter(platform=Platform.DISCORD)
runner = _make_runner(adapter)
gateway_run = importlib.import_module("gateway.run")
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
source = SessionSource(
platform=Platform.DISCORD,
chat_id="dm-1",
chat_type="dm",
thread_id=None,
)
session_key = "agent:main:discord:dm:dm-1"
runner._session_run_generation[session_key] = 1
original_send = adapter.send
invalidated = {"done": False}
async def send_and_invalidate(chat_id, content, reply_to=None, metadata=None):
result = await original_send(chat_id, content, reply_to=reply_to, metadata=metadata)
if "first command" in content and not invalidated["done"]:
invalidated["done"] = True
runner._invalidate_session_run_generation(session_key, reason="test_stop")
return result
adapter.send = send_and_invalidate
result = await runner._run_agent(
message="hello",
context_prompt="",
history=[],
source=source,
session_id="sess-progress-stop",
session_key=session_key,
run_generation=1,
)
all_progress_text = " ".join(call["content"] for call in adapter.sent)
all_progress_text += " ".join(call["content"] for call in adapter.edits)
assert result["final_response"] == "done"
assert 'first command' in all_progress_text
assert 'second command' not in all_progress_text
@pytest.mark.asyncio
async def test_run_agent_drops_interim_commentary_after_generation_invalidation(monkeypatch, tmp_path):
import yaml
(tmp_path / "config.yaml").write_text(
yaml.dump({"display": {"tool_progress": "off", "interim_assistant_messages": True}}),
encoding="utf-8",
)
fake_dotenv = types.ModuleType("dotenv")
fake_dotenv.load_dotenv = lambda *args, **kwargs: None
monkeypatch.setitem(sys.modules, "dotenv", fake_dotenv)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = DelayedInterimAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
adapter = ProgressCaptureAdapter(platform=Platform.DISCORD)
runner = _make_runner(adapter)
gateway_run = importlib.import_module("gateway.run")
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
source = SessionSource(
platform=Platform.DISCORD,
chat_id="dm-2",
chat_type="dm",
thread_id=None,
)
session_key = "agent:main:discord:dm:dm-2"
runner._session_run_generation[session_key] = 1
original_send = adapter.send
invalidated = {"done": False}
async def send_and_invalidate(chat_id, content, reply_to=None, metadata=None):
result = await original_send(chat_id, content, reply_to=reply_to, metadata=metadata)
if content == "first interim" and not invalidated["done"]:
invalidated["done"] = True
runner._invalidate_session_run_generation(session_key, reason="test_stop")
return result
adapter.send = send_and_invalidate
result = await runner._run_agent(
message="hello",
context_prompt="",
history=[],
source=source,
session_id="sess-commentary-stop",
session_key=session_key,
run_generation=1,
)
sent_texts = [call["content"] for call in adapter.sent]
assert result["final_response"] == "done"
assert "first interim" in sent_texts
assert "second interim" not in sent_texts
@pytest.mark.asyncio
async def test_keep_typing_stops_immediately_when_interrupt_event_is_set():
adapter = ProgressCaptureAdapter(platform=Platform.DISCORD)
stop_event = asyncio.Event()
task = asyncio.create_task(
adapter._keep_typing(
"dm-typing-stop",
interval=30.0,
stop_event=stop_event,
)
)
await asyncio.sleep(0.05)
stop_event.set()
await asyncio.wait_for(task, timeout=0.5)
normal_typing_calls = [
call for call in adapter.typing if call.get("metadata") != {"stopped": True}
]
stopped_calls = [
call for call in adapter.typing if call.get("metadata") == {"stopped": True}
]
assert len(normal_typing_calls) == 1
assert len(stopped_calls) == 1
@pytest.mark.asyncio
async def test_verbose_mode_does_not_truncate_args_by_default(monkeypatch, tmp_path):
"""Verbose mode with default tool_preview_length (0) should NOT truncate args.
@@ -319,3 +319,23 @@ async def test_start_gateway_replace_clears_marker_on_permission_denied(
assert ok is False
# Marker must NOT be left behind
assert not (tmp_path / ".gateway-takeover.json").exists()
def test_runner_warns_when_docker_gateway_lacks_explicit_output_mount(monkeypatch, tmp_path, caplog):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("TERMINAL_ENV", "docker")
monkeypatch.setenv("TERMINAL_DOCKER_VOLUMES", '["/etc/localtime:/etc/localtime:ro"]')
config = GatewayConfig(
platforms={
Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")
},
sessions_dir=tmp_path / "sessions",
)
with caplog.at_level("WARNING"):
GatewayRunner(config)
assert any(
"host-visible output mount" in record.message
for record in caplog.records
)
+19 -5
View File
@@ -24,10 +24,18 @@ class _FakeAdapter:
def __init__(self):
self._pending_messages = {}
self._active_sessions = {}
self.interrupted_sessions = []
async def send(self, chat_id, text, **kwargs):
pass
async def interrupt_session_activity(self, session_key, chat_id):
self.interrupted_sessions.append((session_key, chat_id))
event = self._active_sessions.get(session_key)
if event is not None:
event.set()
def _make_runner():
runner = object.__new__(GatewayRunner)
@@ -37,6 +45,7 @@ def _make_runner():
runner.adapters = {Platform.TELEGRAM: _FakeAdapter()}
runner._running_agents = {}
runner._running_agents_ts = {}
runner._session_run_generation = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._voice_mode = {}
@@ -81,7 +90,7 @@ async def test_sentinel_placed_before_agent_setup():
# Patch _handle_message_with_agent to capture state at entry
sentinel_was_set = False
async def mock_inner(self_inner, ev, src, qk):
async def mock_inner(self_inner, ev, src, qk, generation):
nonlocal sentinel_was_set
sentinel_was_set = runner._running_agents.get(qk) is _AGENT_PENDING_SENTINEL
return "ok"
@@ -105,7 +114,7 @@ async def test_sentinel_cleaned_up_after_handler_returns():
event = _make_event()
session_key = build_session_key(event.source)
async def mock_inner(self_inner, ev, src, qk):
async def mock_inner(self_inner, ev, src, qk, generation):
return "ok"
with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner):
@@ -127,7 +136,7 @@ async def test_sentinel_cleaned_up_on_exception():
event = _make_event()
session_key = build_session_key(event.source)
async def mock_inner(self_inner, ev, src, qk):
async def mock_inner(self_inner, ev, src, qk, generation):
raise RuntimeError("boom")
with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner):
@@ -154,7 +163,7 @@ async def test_second_message_during_sentinel_queued_not_duplicate():
barrier = asyncio.Event()
async def slow_inner(self_inner, ev, src, qk):
async def slow_inner(self_inner, ev, src, qk, generation):
# Simulate slow setup — wait until test tells us to proceed
await barrier.wait()
return "ok"
@@ -333,7 +342,7 @@ async def test_stop_during_sentinel_force_cleans_session():
barrier = asyncio.Event()
async def slow_inner(self_inner, ev, src, qk):
async def slow_inner(self_inner, ev, src, qk, generation):
await barrier.wait()
return "ok"
@@ -381,6 +390,7 @@ async def test_stop_hard_kills_running_agent():
fake_agent = MagicMock()
fake_agent.get_activity_summary.return_value = {"seconds_since_activity": 0}
runner._running_agents[session_key] = fake_agent
runner.adapters[Platform.TELEGRAM]._active_sessions[session_key] = asyncio.Event()
# Send /stop
stop_event = _make_event(text="/stop")
@@ -393,6 +403,10 @@ async def test_stop_hard_kills_running_agent():
assert session_key not in runner._running_agents, (
"/stop must remove the agent from _running_agents so the session is unlocked"
)
assert runner.adapters[Platform.TELEGRAM].interrupted_sessions == [
(session_key, "12345")
]
assert runner.adapters[Platform.TELEGRAM]._active_sessions[session_key].is_set()
# Must return a confirmation
assert result is not None
+116
View File
@@ -50,6 +50,7 @@ def _make_runner(session_entry: SessionEntry):
runner.session_store.rewrite_transcript = MagicMock()
runner.session_store.update_session = MagicMock()
runner._running_agents = {}
runner._session_run_generation = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._session_db = MagicMock()
@@ -223,6 +224,121 @@ async def test_handle_message_persists_agent_token_counts(monkeypatch):
)
@pytest.mark.asyncio
async def test_handle_message_discards_stale_result_after_session_invalidation(monkeypatch):
import gateway.run as gateway_run
session_entry = SessionEntry(
session_key=build_session_key(_make_source()),
session_id="sess-1",
created_at=datetime.now(),
updated_at=datetime.now(),
platform=Platform.TELEGRAM,
chat_type="dm",
)
runner = _make_runner(session_entry)
runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}]
session_key = session_entry.session_key
runner.adapters[Platform.TELEGRAM]._post_delivery_callbacks = {session_key: object()}
async def _stale_result(**kwargs):
runner._invalidate_session_run_generation(kwargs["session_key"], reason="test_stale_result")
return {
"final_response": "late reply",
"messages": [],
"tools": [],
"history_offset": 0,
"last_prompt_tokens": 80,
"input_tokens": 120,
"output_tokens": 45,
"model": "openai/test-model",
}
runner._run_agent = AsyncMock(side_effect=_stale_result)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
monkeypatch.setattr(
"agent.model_metadata.get_model_context_length",
lambda *_args, **_kwargs: 100000,
)
result = await runner._handle_message(_make_event("hello"))
assert result is None
runner.session_store.append_to_transcript.assert_not_called()
runner.session_store.update_session.assert_not_called()
assert session_key not in runner.adapters[Platform.TELEGRAM]._post_delivery_callbacks
@pytest.mark.asyncio
async def test_handle_message_stale_result_keeps_newer_generation_callback(monkeypatch):
import gateway.run as gateway_run
class _Adapter:
def __init__(self):
self._post_delivery_callbacks = {}
async def send(self, *args, **kwargs):
return None
def pop_post_delivery_callback(self, session_key, *, generation=None):
entry = self._post_delivery_callbacks.get(session_key)
if entry is None:
return None
if isinstance(entry, tuple):
entry_generation, callback = entry
if generation is not None and entry_generation != generation:
return None
self._post_delivery_callbacks.pop(session_key, None)
return callback
if generation is not None:
return None
return self._post_delivery_callbacks.pop(session_key, None)
session_entry = SessionEntry(
session_key=build_session_key(_make_source()),
session_id="sess-1",
created_at=datetime.now(),
updated_at=datetime.now(),
platform=Platform.TELEGRAM,
chat_type="dm",
)
runner = _make_runner(session_entry)
runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}]
session_key = session_entry.session_key
adapter = _Adapter()
runner.adapters[Platform.TELEGRAM] = adapter
async def _stale_result(**kwargs):
# Simulate a newer run claiming the callback slot before the stale run unwinds.
runner._session_run_generation[session_key] = 2
adapter._post_delivery_callbacks[session_key] = (2, lambda: None)
return {
"final_response": "late reply",
"messages": [],
"tools": [],
"history_offset": 0,
"last_prompt_tokens": 80,
"input_tokens": 120,
"output_tokens": 45,
"model": "openai/test-model",
}
runner._run_agent = AsyncMock(side_effect=_stale_result)
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
monkeypatch.setattr(
"agent.model_metadata.get_model_context_length",
lambda *_args, **_kwargs: 100000,
)
result = await runner._handle_message(_make_event("hello"))
assert result is None
assert session_key in adapter._post_delivery_callbacks
assert adapter._post_delivery_callbacks[session_key][0] == 2
@pytest.mark.asyncio
async def test_status_command_bypasses_active_session_guard():
+84
View File
@@ -1216,3 +1216,87 @@ class TestBufferOnlyMode:
# text, the consumer may send then edit, or just send once at got_done.
# The key assertion: this doesn't break.
assert adapter.send.call_count >= 1
# ── Cursor stripping on fallback (#7183) ────────────────────────────────────
class TestCursorStrippingOnFallback:
"""Regression: cursor must be stripped when fallback continuation is empty (#7183).
When _send_fallback_final is called with nothing new to deliver (the visible
partial already matches final_text), the last edit may still show the cursor
character because fallback mode was entered after a failed edit. Before the
fix this would leave the message permanently frozen with a visible .
"""
@pytest.mark.asyncio
async def test_cursor_stripped_when_continuation_empty(self):
"""_send_fallback_final must attempt a final edit to strip the cursor."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.edit_message = AsyncMock(
return_value=SimpleNamespace(success=True, message_id="msg-1")
)
consumer = GatewayStreamConsumer(
adapter, "chat-1",
config=StreamConsumerConfig(cursor=""),
)
consumer._message_id = "msg-1"
consumer._last_sent_text = "Hello world ▉"
consumer._fallback_final_send = False
await consumer._send_fallback_final("Hello world")
adapter.edit_message.assert_called_once()
call_args = adapter.edit_message.call_args
assert call_args.kwargs["content"] == "Hello world"
assert consumer._already_sent is True
# _last_sent_text should reflect the cleaned text after a successful strip
assert consumer._last_sent_text == "Hello world"
@pytest.mark.asyncio
async def test_cursor_not_stripped_when_no_cursor_configured(self):
"""No edit attempted when cursor is not configured."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.edit_message = AsyncMock()
consumer = GatewayStreamConsumer(
adapter, "chat-1",
config=StreamConsumerConfig(cursor=""),
)
consumer._message_id = "msg-1"
consumer._last_sent_text = "Hello world"
consumer._fallback_final_send = False
await consumer._send_fallback_final("Hello world")
adapter.edit_message.assert_not_called()
assert consumer._already_sent is True
@pytest.mark.asyncio
async def test_cursor_strip_edit_failure_handled(self):
"""If the cursor-stripping edit itself fails, it must not crash and
must not corrupt _last_sent_text."""
adapter = MagicMock()
adapter.MAX_MESSAGE_LENGTH = 4096
adapter.edit_message = AsyncMock(
return_value=SimpleNamespace(success=False, error="flood_control")
)
consumer = GatewayStreamConsumer(
adapter, "chat-1",
config=StreamConsumerConfig(cursor=""),
)
consumer._message_id = "msg-1"
consumer._last_sent_text = "Hello ▉"
consumer._fallback_final_send = False
await consumer._send_fallback_final("Hello")
# Should still set already_sent despite the cursor-strip edit failure
assert consumer._already_sent is True
# _last_sent_text must NOT be updated when the edit failed
assert consumer._last_sent_text == "Hello ▉"
+37
View File
@@ -483,6 +483,32 @@ class TestSendDocument:
assert "not found" in result.error.lower()
connected_adapter._bot.send_document.assert_not_called()
@pytest.mark.asyncio
async def test_send_document_workspace_path_has_docker_hint(self, connected_adapter):
"""Container-local-looking paths get a more actionable Docker hint."""
result = await connected_adapter.send_document(
chat_id="12345",
file_path="/workspace/report.txt",
)
assert result.success is False
assert "docker sandbox" in result.error.lower()
assert "host-visible path" in result.error.lower()
connected_adapter._bot.send_document.assert_not_called()
@pytest.mark.asyncio
async def test_send_document_outputs_path_has_docker_hint(self, connected_adapter):
"""Legacy /outputs paths also get the Docker hint."""
result = await connected_adapter.send_document(
chat_id="12345",
file_path="/outputs/report.txt",
)
assert result.success is False
assert "docker sandbox" in result.error.lower()
assert "host-visible path" in result.error.lower()
connected_adapter._bot.send_document.assert_not_called()
@pytest.mark.asyncio
async def test_send_document_not_connected(self, adapter):
"""If bot is None, returns not connected error."""
@@ -665,6 +691,17 @@ class TestSendVideo:
assert result.success is False
assert "not found" in result.error.lower()
@pytest.mark.asyncio
async def test_send_video_workspace_path_has_docker_hint(self, connected_adapter):
result = await connected_adapter.send_video(
chat_id="12345",
video_path="/workspace/video.mp4",
)
assert result.success is False
assert "docker sandbox" in result.error.lower()
assert "host-visible path" in result.error.lower()
@pytest.mark.asyncio
async def test_send_video_not_connected(self, adapter):
result = await adapter.send_video(
+473
View File
@@ -0,0 +1,473 @@
"""Tests for the webhook adapter's ``deliver_only`` route mode.
``deliver_only`` lets external services (Supabase webhooks, monitoring
alerts, background jobs, other agents) push plain-text notifications to
a user's chat via the webhook adapter WITHOUT invoking the agent. The
rendered prompt template becomes the literal message body.
Covers:
- Agent is NOT invoked (``handle_message`` never called)
- Rendered content is delivered to the target platform adapter
- HTTP returns 200 OK on success, 502 on delivery failure
- Startup validation rejects ``deliver_only`` without a real delivery target
- HMAC auth, rate limiting, and idempotency still apply
"""
import asyncio
import hashlib
import hmac
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, SendResult
from gateway.platforms.webhook import WebhookAdapter, _INSECURE_NO_AUTH
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_adapter(routes, **extra_kw) -> WebhookAdapter:
extra = {"host": "0.0.0.0", "port": 0, "routes": routes}
extra.update(extra_kw)
config = PlatformConfig(enabled=True, extra=extra)
return WebhookAdapter(config)
def _create_app(adapter: WebhookAdapter) -> web.Application:
app = web.Application()
app.router.add_get("/health", adapter._handle_health)
app.router.add_post("/webhooks/{route_name}", adapter._handle_webhook)
return app
def _wire_mock_target(adapter: WebhookAdapter, platform_name: str = "telegram"):
"""Attach a gateway_runner with a mocked target adapter."""
mock_target = AsyncMock()
mock_target.send = AsyncMock(return_value=SendResult(success=True))
mock_runner = MagicMock()
mock_runner.adapters = {Platform(platform_name): mock_target}
mock_runner.config.get_home_channel.return_value = None
adapter.gateway_runner = mock_runner
return mock_target
# ===================================================================
# Core behaviour: agent bypass
# ===================================================================
class TestDeliverOnlyBypassesAgent:
"""The whole point of the feature — handle_message must not be called."""
@pytest.mark.asyncio
async def test_post_delivers_directly_without_agent(self):
routes = {
"match-alert": {
"secret": _INSECURE_NO_AUTH,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "12345"},
"prompt": "{payload.user} matched with {payload.other}!",
}
}
adapter = _make_adapter(routes)
mock_target = _wire_mock_target(adapter)
# Guard: handle_message must NOT be called in deliver_only mode
handle_message_calls: list[MessageEvent] = []
async def _capture(event):
handle_message_calls.append(event)
adapter.handle_message = _capture
app = _create_app(adapter)
body = json.dumps(
{"payload": {"user": "alice", "other": "bob"}}
).encode()
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/match-alert",
data=body,
headers={
"Content-Type": "application/json",
"X-GitHub-Delivery": "delivery-1",
},
)
assert resp.status == 200
data = await resp.json()
assert data["status"] == "delivered"
assert data["route"] == "match-alert"
assert data["target"] == "telegram"
# Let any background tasks settle before asserting no agent call
await asyncio.sleep(0.05)
# Agent was NOT invoked
assert handle_message_calls == []
# Target adapter.send() WAS called with the rendered template
mock_target.send.assert_awaited_once()
call_args = mock_target.send.await_args
chat_id_arg, content_arg = call_args.args[0], call_args.args[1]
assert chat_id_arg == "12345"
assert content_arg == "alice matched with bob!"
@pytest.mark.asyncio
async def test_template_rendering_works(self):
"""Dot-notation template variables resolve in deliver_only mode."""
routes = {
"alert": {
"secret": _INSECURE_NO_AUTH,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "chat-1"},
"prompt": "Build {build.number} status: {build.status}",
}
}
adapter = _make_adapter(routes)
mock_target = _wire_mock_target(adapter)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/alert",
json={"build": {"number": 77, "status": "FAILED"}},
headers={"X-GitHub-Delivery": "d-render-1"},
)
assert resp.status == 200
mock_target.send.assert_awaited_once()
content_arg = mock_target.send.await_args.args[1]
assert content_arg == "Build 77 status: FAILED"
@pytest.mark.asyncio
async def test_thread_id_passed_through(self):
"""deliver_extra.thread_id flows through to the target adapter."""
routes = {
"r": {
"secret": _INSECURE_NO_AUTH,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "c-1", "thread_id": "topic-42"},
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
mock_target = _wire_mock_target(adapter)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": "d-thread-1"},
)
assert resp.status == 200
assert mock_target.send.await_args.kwargs["metadata"] == {
"thread_id": "topic-42"
}
# ===================================================================
# HTTP status codes
# ===================================================================
class TestDeliverOnlyStatusCodes:
@pytest.mark.asyncio
async def test_delivery_failure_returns_502(self):
"""If the target adapter returns SendResult(success=False), 502."""
routes = {
"r": {
"secret": _INSECURE_NO_AUTH,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "c-1"},
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
mock_target = _wire_mock_target(adapter)
mock_target.send = AsyncMock(
return_value=SendResult(success=False, error="rate limited by tg")
)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": "d-fail-1"},
)
assert resp.status == 502
data = await resp.json()
# Generic error — no adapter-level detail leaks
assert data["error"] == "Delivery failed"
assert "rate limited" not in json.dumps(data)
@pytest.mark.asyncio
async def test_delivery_exception_returns_502(self):
"""If adapter.send() raises, we return 502 (not 500)."""
routes = {
"r": {
"secret": _INSECURE_NO_AUTH,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "c-1"},
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
mock_target = _wire_mock_target(adapter)
mock_target.send = AsyncMock(side_effect=RuntimeError("tg exploded"))
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": "d-exc-1"},
)
assert resp.status == 502
data = await resp.json()
assert data["error"] == "Delivery failed"
# Exception message must not leak
assert "exploded" not in json.dumps(data)
@pytest.mark.asyncio
async def test_target_platform_not_connected_returns_502(self):
"""deliver_only to a platform the gateway doesn't have → 502."""
routes = {
"r": {
"secret": _INSECURE_NO_AUTH,
"deliver": "discord", # not configured in mock runner
"deliver_only": True,
"deliver_extra": {"chat_id": "c-1"},
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
_wire_mock_target(adapter, platform_name="telegram") # only TG wired
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
resp = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": "d-no-platform-1"},
)
assert resp.status == 502
# ===================================================================
# Startup validation
# ===================================================================
class TestDeliverOnlyStartupValidation:
@pytest.mark.asyncio
async def test_deliver_only_with_log_deliver_rejected(self):
"""deliver_only=true + deliver=log is nonsense — reject at connect()."""
routes = {
"bad": {
"secret": _INSECURE_NO_AUTH,
"deliver": "log",
"deliver_only": True,
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
with pytest.raises(ValueError, match="deliver_only=true but deliver is 'log'"):
await adapter.connect()
@pytest.mark.asyncio
async def test_deliver_only_with_missing_deliver_rejected(self):
"""deliver_only=true with no deliver field defaults to 'log' → reject."""
routes = {
"bad": {
"secret": _INSECURE_NO_AUTH,
# no deliver field
"deliver_only": True,
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
with pytest.raises(ValueError, match="deliver_only=true"):
await adapter.connect()
@pytest.mark.asyncio
async def test_deliver_only_with_real_target_accepted(self):
"""Sanity check — a valid deliver_only config passes validation."""
routes = {
"good": {
"secret": _INSECURE_NO_AUTH,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "c-1"},
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
# connect() does more than validation (binds a socket) — we just
# want to verify the validation doesn't raise. Call it and tear
# down immediately.
try:
started = await adapter.connect()
if started:
await adapter.disconnect()
except ValueError:
pytest.fail("valid deliver_only config should not raise ValueError")
# ===================================================================
# Security + reliability invariants still hold
# ===================================================================
class TestDeliverOnlySecurityInvariants:
@pytest.mark.asyncio
async def test_hmac_still_enforced(self):
"""deliver_only does NOT bypass HMAC validation."""
secret = "real-secret-123"
routes = {
"r": {
"secret": secret,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "c-1"},
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
mock_target = _wire_mock_target(adapter)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
# No signature header → reject
resp = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": "d-noauth-1"},
)
assert resp.status == 401
# Target never called
mock_target.send.assert_not_awaited()
@pytest.mark.asyncio
async def test_idempotency_still_applies(self):
"""Same delivery_id posted twice → second is suppressed."""
routes = {
"r": {
"secret": _INSECURE_NO_AUTH,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "c-1"},
"prompt": "hi",
}
}
adapter = _make_adapter(routes)
mock_target = _wire_mock_target(adapter)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
r1 = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": "dup-1"},
)
assert r1.status == 200
r2 = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": "dup-1"},
)
# Existing webhook adapter treats duplicates as 200 + status=duplicate
assert r2.status == 200
data = await r2.json()
assert data["status"] == "duplicate"
# Target was called exactly once
assert mock_target.send.await_count == 1
@pytest.mark.asyncio
async def test_rate_limit_still_applies(self):
"""Route-level rate limit caps deliver_only POSTs too."""
routes = {
"r": {
"secret": _INSECURE_NO_AUTH,
"deliver": "telegram",
"deliver_only": True,
"deliver_extra": {"chat_id": "c-1"},
"prompt": "hi",
}
}
adapter = _make_adapter(routes, rate_limit=2)
_wire_mock_target(adapter)
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
for i in range(2):
r = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": f"rl-{i}"},
)
assert r.status == 200
# Third within the window → 429
r3 = await cli.post(
"/webhooks/r",
json={},
headers={"X-GitHub-Delivery": "rl-3"},
)
assert r3.status == 429
# ===================================================================
# Unit: _direct_deliver dispatch
# ===================================================================
class TestDirectDeliverUnit:
@pytest.mark.asyncio
async def test_dispatches_to_cross_platform_for_messaging_targets(self):
adapter = _make_adapter({})
mock_target = _wire_mock_target(adapter, "telegram")
result = await adapter._direct_deliver(
"hello",
{"deliver": "telegram", "deliver_extra": {"chat_id": "c-1"}},
)
assert result.success is True
mock_target.send.assert_awaited_once_with(
"c-1", "hello", metadata=None
)
@pytest.mark.asyncio
async def test_dispatches_to_github_comment(self):
adapter = _make_adapter({})
with patch.object(
adapter, "_deliver_github_comment",
new=AsyncMock(return_value=SendResult(success=True)),
) as mock_gh:
result = await adapter._direct_deliver(
"review body",
{
"deliver": "github_comment",
"deliver_extra": {"repo": "org/r", "pr_number": "1"},
},
)
assert result.success is True
mock_gh.assert_awaited_once()
+1 -1
View File
@@ -13,7 +13,7 @@ class TestCustomProvidersValidation:
issues = validate_config_structure({
"custom_providers": {
"name": "Generativelanguage.googleapis.com",
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai",
"base_url": "https://generativelanguage.googleapis.com/v1beta",
"api_key": "xxx",
"model": "models/gemini-2.5-flash",
"rate_limit_delay": 2.0,
+4 -4
View File
@@ -54,12 +54,12 @@ class TestCronCommandLifecycle:
deliver=None,
repeat=None,
skill=None,
skills=["find-nearby", "blogwatcher"],
skills=["maps", "blogwatcher"],
clear_skills=False,
)
)
updated = get_job(job["id"])
assert updated["skills"] == ["find-nearby", "blogwatcher"]
assert updated["skills"] == ["maps", "blogwatcher"]
assert updated["name"] == "Edited Job"
assert updated["prompt"] == "Revised prompt"
assert updated["schedule_display"] == "every 120m"
@@ -95,7 +95,7 @@ class TestCronCommandLifecycle:
deliver=None,
repeat=None,
skill=None,
skills=["blogwatcher", "find-nearby"],
skills=["blogwatcher", "maps"],
)
)
out = capsys.readouterr().out
@@ -103,5 +103,5 @@ class TestCronCommandLifecycle:
jobs = list_jobs()
assert len(jobs) == 1
assert jobs[0]["skills"] == ["blogwatcher", "find-nearby"]
assert jobs[0]["skills"] == ["blogwatcher", "maps"]
assert jobs[0]["name"] == "Skill combo"
+64 -25
View File
@@ -22,7 +22,7 @@ class TestGeminiProviderRegistry:
assert pconfig.id == "gemini"
assert pconfig.name == "Google AI Studio"
assert pconfig.auth_type == "api_key"
assert pconfig.inference_base_url == "https://generativelanguage.googleapis.com/v1beta/openai"
assert pconfig.inference_base_url == "https://generativelanguage.googleapis.com/v1beta"
def test_gemini_env_vars(self):
pconfig = PROVIDER_REGISTRY["gemini"]
@@ -99,7 +99,7 @@ class TestGeminiCredentials:
creds = resolve_api_key_provider_credentials("gemini")
assert creds["provider"] == "gemini"
assert creds["api_key"] == "google-secret"
assert creds["base_url"] == "https://generativelanguage.googleapis.com/v1beta/openai"
assert creds["base_url"] == "https://generativelanguage.googleapis.com/v1beta"
def test_resolve_with_gemini_api_key(self, monkeypatch):
monkeypatch.setenv("GEMINI_API_KEY", "gemini-secret")
@@ -119,7 +119,7 @@ class TestGeminiCredentials:
assert result["provider"] == "gemini"
assert result["api_mode"] == "chat_completions"
assert result["api_key"] == "google-key"
assert result["base_url"] == "https://generativelanguage.googleapis.com/v1beta/openai"
assert result["base_url"] == "https://generativelanguage.googleapis.com/v1beta"
# ── Model Catalog ──
@@ -193,50 +193,89 @@ class TestGeminiAgentInit:
importlib.reload(run_agent)
def test_gemini_agent_uses_chat_completions(self, monkeypatch):
"""Gemini falls through to chat_completions — no special elif needed."""
"""Gemini still reports chat_completions even though the transport is native."""
monkeypatch.setenv("GOOGLE_API_KEY", "test-key")
with patch("run_agent.OpenAI") as mock_openai:
mock_openai.return_value = MagicMock()
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client:
mock_client.return_value = MagicMock()
from run_agent import AIAgent
agent = AIAgent(
model="gemini-2.5-flash",
provider="gemini",
api_key="test-key",
base_url="https://generativelanguage.googleapis.com/v1beta/openai",
base_url="https://generativelanguage.googleapis.com/v1beta",
)
assert agent.api_mode == "chat_completions"
assert agent.provider == "gemini"
def test_gemini_uses_bearer_auth(self, monkeypatch):
"""Gemini OpenAI-compatible endpoint should receive the real API key."""
def test_gemini_agent_uses_native_client(self, monkeypatch):
monkeypatch.setenv("GOOGLE_API_KEY", "AIzaSy_REAL_KEY")
real_key = "AIzaSy_REAL_KEY"
with patch("run_agent.OpenAI") as mock_openai:
mock_openai.return_value = MagicMock()
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
patch("run_agent.OpenAI") as mock_openai, \
patch("run_agent.ContextCompressor") as mock_compressor:
mock_client.return_value = MagicMock()
mock_compressor.return_value = MagicMock(context_length=1048576, threshold_tokens=524288)
from run_agent import AIAgent
AIAgent(
model="gemini-2.5-flash",
provider="gemini",
api_key=real_key,
api_key="AIzaSy_REAL_KEY",
base_url="https://generativelanguage.googleapis.com/v1beta",
)
assert mock_client.called
mock_openai.assert_not_called()
def test_gemini_custom_base_url_keeps_openai_client(self, monkeypatch):
monkeypatch.setenv("GOOGLE_API_KEY", "AIzaSy_REAL_KEY")
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
patch("run_agent.OpenAI") as mock_openai, \
patch("run_agent.ContextCompressor") as mock_compressor:
mock_openai.return_value = MagicMock()
mock_compressor.return_value = MagicMock(context_length=128000, threshold_tokens=64000)
from run_agent import AIAgent
AIAgent(
model="gemini-2.5-flash",
provider="gemini",
api_key="AIzaSy_REAL_KEY",
base_url="https://proxy.example.com/v1",
)
mock_openai.assert_called_once()
def test_gemini_openai_compat_base_url_keeps_openai_client(self, monkeypatch):
monkeypatch.setenv("GOOGLE_API_KEY", "AIzaSy_REAL_KEY")
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
patch("run_agent.OpenAI") as mock_openai, \
patch("run_agent.ContextCompressor") as mock_compressor:
mock_openai.return_value = MagicMock()
mock_compressor.return_value = MagicMock(context_length=1048576, threshold_tokens=524288)
from run_agent import AIAgent
AIAgent(
model="gemini-2.5-flash",
provider="gemini",
api_key="AIzaSy_REAL_KEY",
base_url="https://generativelanguage.googleapis.com/v1beta/openai",
)
call_kwargs = mock_openai.call_args[1]
assert call_kwargs.get("api_key") == real_key
headers = call_kwargs.get("default_headers", {})
assert "x-goog-api-key" not in headers
mock_openai.assert_called_once()
def test_gemini_resolve_provider_client_auth(self, monkeypatch):
"""resolve_provider_client('gemini') should pass the real API key through."""
def test_gemini_resolve_provider_client_uses_native_client(self, monkeypatch):
"""resolve_provider_client('gemini') should build GeminiNativeClient."""
monkeypatch.setenv("GEMINI_API_KEY", "AIzaSy_TEST_KEY")
real_key = "AIzaSy_TEST_KEY"
with patch("agent.auxiliary_client.OpenAI") as mock_openai:
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
patch("agent.auxiliary_client.OpenAI") as mock_openai:
mock_client.return_value = MagicMock()
from agent.auxiliary_client import resolve_provider_client
resolve_provider_client("gemini")
assert mock_client.called
mock_openai.assert_not_called()
def test_gemini_resolve_provider_client_keeps_openai_for_non_native_base_url(self, monkeypatch):
monkeypatch.setenv("GOOGLE_API_KEY", "AIzaSy_TEST_KEY")
monkeypatch.setenv("GEMINI_BASE_URL", "https://proxy.example.com/v1")
with patch("agent.gemini_native_adapter.GeminiNativeClient") as mock_client, \
patch("agent.auxiliary_client.OpenAI") as mock_openai:
mock_openai.return_value = MagicMock()
from agent.auxiliary_client import resolve_provider_client
resolve_provider_client("gemini")
call_kwargs = mock_openai.call_args[1]
assert call_kwargs.get("api_key") == real_key
headers = call_kwargs.get("default_headers", {})
assert "x-goog-api-key" not in headers
mock_openai.assert_called_once()
# ── models.dev Integration ──
+55 -15
View File
@@ -38,7 +38,7 @@ def _make_agent(
agent.status_callback = None
agent.tool_progress_callback = None
agent._compression_warning = None
agent.config = None
agent._aux_compression_context_length_config = None
compressor = MagicMock(spec=ContextCompressor)
compressor.context_length = main_context
@@ -138,13 +138,7 @@ def test_feasibility_check_passes_config_context_length(mock_get_client, mock_ct
get_model_context_length so custom endpoints that lack /models still
report the correct context window (fixes #8499)."""
agent = _make_agent(main_context=200_000, threshold_percent=0.85)
agent.config = {
"auxiliary": {
"compression": {
"context_length": 1_000_000,
},
},
}
agent._aux_compression_context_length_config = 1_000_000
mock_client = MagicMock()
mock_client.base_url = "http://custom-endpoint:8080/v1"
mock_client.api_key = "sk-custom"
@@ -166,13 +160,7 @@ def test_feasibility_check_passes_config_context_length(mock_get_client, mock_ct
def test_feasibility_check_ignores_invalid_context_length(mock_get_client, mock_ctx_len):
"""Non-integer context_length in config is silently ignored."""
agent = _make_agent(main_context=200_000, threshold_percent=0.50)
agent.config = {
"auxiliary": {
"compression": {
"context_length": "not-a-number",
},
},
}
agent._aux_compression_context_length_config = None
mock_client = MagicMock()
mock_client.base_url = "http://custom:8080/v1"
mock_client.api_key = "sk-test"
@@ -189,6 +177,58 @@ def test_feasibility_check_ignores_invalid_context_length(mock_get_client, mock_
)
def test_init_feasibility_check_uses_aux_context_override_from_config():
"""Real AIAgent init should cache and forward auxiliary.compression.context_length."""
class _StubCompressor:
def __init__(self, *args, **kwargs):
self.context_length = 200_000
self.threshold_tokens = 100_000
self.threshold_percent = 0.50
def get_tool_schemas(self):
return []
def on_session_start(self, *args, **kwargs):
return None
cfg = {
"auxiliary": {
"compression": {
"context_length": 1_000_000,
},
},
}
mock_client = MagicMock()
mock_client.base_url = "http://custom-endpoint:8080/v1"
mock_client.api_key = "sk-custom"
with (
patch("hermes_cli.config.load_config", return_value=cfg),
patch("run_agent.get_tool_definitions", return_value=[]),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("run_agent.ContextCompressor", new=_StubCompressor),
patch("agent.auxiliary_client.get_text_auxiliary_client", return_value=(mock_client, "custom/big-model")),
patch("agent.model_metadata.get_model_context_length", return_value=1_000_000) as mock_ctx_len,
):
agent = AIAgent(
api_key="test-key-1234567890",
base_url="https://openrouter.ai/api/v1",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert agent._aux_compression_context_length_config == 1_000_000
mock_ctx_len.assert_called_once_with(
"custom/big-model",
base_url="http://custom-endpoint:8080/v1",
api_key="sk-custom",
config_context_length=1_000_000,
)
@patch("agent.auxiliary_client.get_text_auxiliary_client")
def test_warns_when_no_auxiliary_provider(mock_get_client):
"""Warning emitted when no auxiliary provider is configured."""
+280
View File
@@ -828,3 +828,283 @@ def test_respond_unpacks_sid_tuple_correctly():
server._pending.pop("rid-x", None)
server._answers.pop("rid-x", None)
# ---------------------------------------------------------------------------
# /model switch and other agent-mutating commands must reject while the
# session is running. agent.switch_model() mutates self.model, self.provider,
# self.base_url, self.client etc. in place — the worker thread running
# agent.run_conversation is reading those on every iteration. Same class of
# bug as the session.undo / session.compress mid-run silent-drop; same fix
# pattern: reject with 4009 while running.
# ---------------------------------------------------------------------------
def test_config_set_model_rejects_while_running(monkeypatch):
"""/model via config.set must reject during an in-flight turn."""
seen = {"called": False}
def _fake_apply(sid, session, raw):
seen["called"] = True
return {"value": raw, "warning": ""}
monkeypatch.setattr(server, "_apply_model_switch", _fake_apply)
server._sessions["sid"] = _session(running=True)
try:
resp = server.handle_request({
"id": "1", "method": "config.set",
"params": {"session_id": "sid", "key": "model", "value": "anthropic/claude-sonnet-4.6"},
})
assert resp.get("error")
assert resp["error"]["code"] == 4009
assert "session busy" in resp["error"]["message"]
assert not seen["called"], (
"_apply_model_switch was called mid-turn — would race with "
"the worker thread reading agent.model / agent.client"
)
finally:
server._sessions.pop("sid", None)
def test_config_set_model_allowed_when_idle(monkeypatch):
"""Regression guard: idle sessions can still switch models."""
seen = {"called": False}
def _fake_apply(sid, session, raw):
seen["called"] = True
return {"value": "newmodel", "warning": ""}
monkeypatch.setattr(server, "_apply_model_switch", _fake_apply)
server._sessions["sid"] = _session(running=False)
try:
resp = server.handle_request({
"id": "1", "method": "config.set",
"params": {"session_id": "sid", "key": "model", "value": "newmodel"},
})
assert resp.get("result")
assert resp["result"]["value"] == "newmodel"
assert seen["called"]
finally:
server._sessions.pop("sid", None)
def test_mirror_slash_side_effects_rejects_mutating_commands_while_running(monkeypatch):
"""Slash worker passthrough (e.g. /model, /personality, /prompt,
/compress) must reject during an in-flight turn. Same race as
config.set mutates live agent state while run_conversation is
reading it."""
import types
applied = {"model": False, "compress": False}
def _fake_apply_model(sid, session, arg):
applied["model"] = True
return {"value": arg, "warning": ""}
def _fake_compress(session, focus):
applied["compress"] = True
return (0, {})
monkeypatch.setattr(server, "_apply_model_switch", _fake_apply_model)
monkeypatch.setattr(server, "_compress_session_history", _fake_compress)
session = _session(running=True)
session["agent"] = types.SimpleNamespace(model="x")
for cmd, expected_name in [
("/model new/model", "model"),
("/personality default", "personality"),
("/prompt", "prompt"),
("/compress", "compress"),
]:
warning = server._mirror_slash_side_effects("sid", session, cmd)
assert "session busy" in warning, (
f"{cmd} should have returned busy warning, got: {warning!r}"
)
assert f"/{expected_name}" in warning
# None of the mutating side-effect helpers should have fired.
assert not applied["model"], "model switch fired despite running session"
assert not applied["compress"], "compress fired despite running session"
def test_mirror_slash_side_effects_allowed_when_idle(monkeypatch):
"""Regression guard: idle session still runs the side effects."""
import types
applied = {"model": False}
def _fake_apply_model(sid, session, arg):
applied["model"] = True
return {"value": arg, "warning": ""}
monkeypatch.setattr(server, "_apply_model_switch", _fake_apply_model)
session = _session(running=False)
session["agent"] = types.SimpleNamespace(model="x")
warning = server._mirror_slash_side_effects("sid", session, "/model foo")
# Should NOT contain "session busy" — the switch went through.
assert "session busy" not in warning
assert applied["model"]
# ---------------------------------------------------------------------------
# session.create / session.close race: fast /new churn must not orphan the
# slash_worker subprocess or the global approval-notify registration.
# ---------------------------------------------------------------------------
def test_session_create_close_race_does_not_orphan_worker(monkeypatch):
"""Regression guard: if session.close runs while session.create's
_build thread is still constructing the agent, the build thread
must detect the orphan and clean up the slash_worker + notify
registration it's about to install. Without the cleanup those
resources leak the subprocess stays alive until atexit and the
notify callback lingers in the global registry."""
import threading
closed_workers: list[str] = []
unregistered_keys: list[str] = []
class _FakeWorker:
def __init__(self, key, model):
self.key = key
self._closed = False
def close(self):
self._closed = True
closed_workers.append(self.key)
class _FakeAgent:
def __init__(self):
self.model = "x"
self.provider = "openrouter"
self.base_url = ""
self.api_key = ""
# Make _build block until we release it — simulates slow agent init
release_build = threading.Event()
def _slow_make_agent(sid, key):
release_build.wait(timeout=3.0)
return _FakeAgent()
# Stub everything _build touches
monkeypatch.setattr(server, "_make_agent", _slow_make_agent)
monkeypatch.setattr(server, "_SlashWorker", _FakeWorker)
monkeypatch.setattr(server, "_get_db", lambda: types.SimpleNamespace(create_session=lambda *a, **kw: None))
monkeypatch.setattr(server, "_session_info", lambda _a: {"model": "x"})
monkeypatch.setattr(server, "_probe_credentials", lambda _a: None)
monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
# Shim register/unregister to observe leaks
import tools.approval as _approval
monkeypatch.setattr(_approval, "register_gateway_notify",
lambda key, cb: None)
monkeypatch.setattr(_approval, "unregister_gateway_notify",
lambda key: unregistered_keys.append(key))
monkeypatch.setattr(_approval, "load_permanent_allowlist", lambda: None)
# Start: session.create spawns _build thread, returns synchronously
resp = server.handle_request({
"id": "1", "method": "session.create", "params": {"cols": 80},
})
assert resp.get("result"), f"got error: {resp.get('error')}"
sid = resp["result"]["session_id"]
# Build thread is blocked in _slow_make_agent. Close the session
# NOW — this pops _sessions[sid] before _build can install the
# worker/notify.
close_resp = server.handle_request({
"id": "2", "method": "session.close", "params": {"session_id": sid},
})
assert close_resp.get("result", {}).get("closed") is True
# At this point session.close saw slash_worker=None (not yet
# installed) so it didn't close anything. Release the build thread
# and let it finish — it should detect the orphan and clean up the
# worker it just allocated + unregister the notify.
release_build.set()
# Give the build thread a moment to run through its finally.
for _ in range(100):
if closed_workers:
break
import time
time.sleep(0.02)
assert len(closed_workers) == 1, (
f"orphan worker was not cleaned up — closed_workers={closed_workers}"
)
# Notify may be unregistered by both session.close (unconditional)
# and the orphan-cleanup path; the key guarantee is that the build
# thread does at least one unregister call (any prior close
# already popped the callback; the duplicate is a no-op).
assert len(unregistered_keys) >= 1, (
f"orphan notify registration was not unregistered — "
f"unregistered_keys={unregistered_keys}"
)
def test_session_create_no_race_keeps_worker_alive(monkeypatch):
"""Regression guard: when session.close does NOT race, the build
thread must install the worker + notify normally and leave them
alone (no over-eager cleanup)."""
closed_workers: list[str] = []
unregistered_keys: list[str] = []
class _FakeWorker:
def __init__(self, key, model):
self.key = key
def close(self):
closed_workers.append(self.key)
class _FakeAgent:
def __init__(self):
self.model = "x"
self.provider = "openrouter"
self.base_url = ""
self.api_key = ""
monkeypatch.setattr(server, "_make_agent", lambda sid, key: _FakeAgent())
monkeypatch.setattr(server, "_SlashWorker", _FakeWorker)
monkeypatch.setattr(server, "_get_db", lambda: types.SimpleNamespace(create_session=lambda *a, **kw: None))
monkeypatch.setattr(server, "_session_info", lambda _a: {"model": "x"})
monkeypatch.setattr(server, "_probe_credentials", lambda _a: None)
monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
import tools.approval as _approval
monkeypatch.setattr(_approval, "register_gateway_notify", lambda key, cb: None)
monkeypatch.setattr(_approval, "unregister_gateway_notify",
lambda key: unregistered_keys.append(key))
monkeypatch.setattr(_approval, "load_permanent_allowlist", lambda: None)
resp = server.handle_request({
"id": "1", "method": "session.create", "params": {"cols": 80},
})
sid = resp["result"]["session_id"]
# Wait for the build to finish (ready event inside session dict).
session = server._sessions[sid]
session["agent_ready"].wait(timeout=2.0)
# Build finished without a close race — nothing should have been
# cleaned up by the orphan check.
assert closed_workers == [], (
f"build thread closed its own worker despite no race: {closed_workers}"
)
assert unregistered_keys == [], (
f"build thread unregistered its own notify despite no race: {unregistered_keys}"
)
# Session should have the live worker installed.
assert session.get("slash_worker") is not None
# Cleanup
server._sessions.pop(sid, None)
+5 -5
View File
@@ -192,23 +192,23 @@ class TestUnifiedCronjobTool:
result = json.loads(
cronjob(
action="create",
skills=["blogwatcher", "find-nearby"],
skills=["blogwatcher", "maps"],
prompt="Use both skills and combine the result.",
schedule="every 1h",
name="Combo job",
)
)
assert result["success"] is True
assert result["skills"] == ["blogwatcher", "find-nearby"]
assert result["skills"] == ["blogwatcher", "maps"]
listing = json.loads(cronjob(action="list"))
assert listing["jobs"][0]["skills"] == ["blogwatcher", "find-nearby"]
assert listing["jobs"][0]["skills"] == ["blogwatcher", "maps"]
def test_multi_skill_default_name_prefers_prompt_when_present(self):
result = json.loads(
cronjob(
action="create",
skills=["blogwatcher", "find-nearby"],
skills=["blogwatcher", "maps"],
prompt="Use both skills and combine the result.",
schedule="every 1h",
)
@@ -220,7 +220,7 @@ class TestUnifiedCronjobTool:
created = json.loads(
cronjob(
action="create",
skills=["blogwatcher", "find-nearby"],
skills=["blogwatcher", "maps"],
prompt="Use both skills and combine the result.",
schedule="every 1h",
)
+79
View File
@@ -4,6 +4,7 @@ import io
import json
import sys
import threading
import time
from unittest.mock import MagicMock, patch
import pytest
@@ -432,3 +433,81 @@ def test_command_dispatch_returns_skill_payload(server):
assert result["type"] == "skill"
assert result["message"] == fake_msg
assert result["name"] == "hermes-agent-dev"
# ── dispatch(): pool routing for long handlers (#12546) ──────────────
def test_dispatch_runs_short_handlers_inline(server):
"""Non-long handlers return their response synchronously from dispatch()."""
server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
resp = server.dispatch({"id": "r1", "method": "fast.ping", "params": {}})
assert resp == {"jsonrpc": "2.0", "id": "r1", "result": {"pong": True}}
def test_dispatch_offloads_long_handlers_and_emits_via_stdout(capture):
"""Long handlers run on the pool and write their response via write_json."""
server, buf = capture
server._methods["slash.exec"] = lambda rid, params: server._ok(rid, {"output": "hi"})
resp = server.dispatch({"id": "r2", "method": "slash.exec", "params": {}})
assert resp is None
for _ in range(50):
if buf.getvalue():
break
time.sleep(0.01)
written = json.loads(buf.getvalue())
assert written == {"jsonrpc": "2.0", "id": "r2", "result": {"output": "hi"}}
def test_dispatch_long_handler_does_not_block_fast_handler(server):
"""A slow long handler must not prevent a concurrent fast handler from completing."""
released = threading.Event()
server._methods["slash.exec"] = lambda rid, params: (released.wait(timeout=5), server._ok(rid, {"done": True}))[1]
server._methods["fast.ping"] = lambda rid, params: server._ok(rid, {"pong": True})
t0 = time.monotonic()
assert server.dispatch({"id": "slow", "method": "slash.exec", "params": {}}) is None
fast_resp = server.dispatch({"id": "fast", "method": "fast.ping", "params": {}})
fast_elapsed = time.monotonic() - t0
assert fast_resp["result"] == {"pong": True}
assert fast_elapsed < 0.5, f"fast handler blocked for {fast_elapsed:.2f}s behind slow handler"
released.set()
def test_dispatch_long_handler_exception_produces_error_response(capture):
"""An exception inside a pool-dispatched handler still yields a JSON-RPC error."""
server, buf = capture
def boom(rid, params):
raise RuntimeError("kaboom")
server._methods["slash.exec"] = boom
server.dispatch({"id": "r3", "method": "slash.exec", "params": {}})
for _ in range(50):
if buf.getvalue():
break
time.sleep(0.01)
written = json.loads(buf.getvalue())
assert written["id"] == "r3"
assert written["error"]["code"] == -32000
assert "kaboom" in written["error"]["message"]
def test_dispatch_unknown_long_method_still_goes_inline(server):
"""Method name not in _LONG_HANDLERS takes the sync path even if handler is slow."""
server._methods["some.method"] = lambda rid, params: server._ok(rid, {"ok": True})
resp = server.dispatch({"id": "r4", "method": "some.method", "params": {}})
assert resp["result"] == {"ok": True}
+2 -2
View File
@@ -2,7 +2,7 @@ import json
import signal
import sys
from tui_gateway.server import handle_request, resolve_skin, write_json
from tui_gateway.server import dispatch, resolve_skin, write_json
signal.signal(signal.SIGPIPE, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -28,7 +28,7 @@ def main():
sys.exit(0)
continue
resp = handle_request(req)
resp = dispatch(req)
if resp is not None:
if not write_json(resp):
sys.exit(0)
+102 -2
View File
@@ -1,4 +1,5 @@
import atexit
import concurrent.futures
import copy
import json
import os
@@ -36,6 +37,23 @@ _cfg_cache: dict | None = None
_cfg_mtime: float | None = None
_SLASH_WORKER_TIMEOUT_S = max(5.0, float(os.environ.get("HERMES_TUI_SLASH_TIMEOUT_S", "45") or 45))
# ── Async RPC dispatch (#12546) ──────────────────────────────────────
# A handful of handlers block the dispatcher loop in entry.py for seconds
# to minutes (slash.exec, cli.exec, shell.exec, session.resume,
# session.branch). While they're running, inbound RPCs — notably
# approval.respond and session.interrupt — sit unread in the stdin pipe.
# We route only those slow handlers onto a small thread pool; everything
# else stays on the main thread so ordering stays sane for the fast path.
# write_json is already _stdout_lock-guarded, so concurrent response
# writes are safe.
_LONG_HANDLERS = frozenset({"cli.exec", "session.branch", "session.resume", "shell.exec", "slash.exec"})
_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=max(2, int(os.environ.get("HERMES_TUI_RPC_POOL_WORKERS", "4") or 4)),
thread_name_prefix="tui-rpc",
)
atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True))
# Reserve real stdout for JSON-RPC only; redirect Python's stdout to stderr
# so stray print() from libraries/tools becomes harmless gateway.stderr instead
# of corrupting the JSON protocol.
@@ -200,6 +218,29 @@ def handle_request(req: dict) -> dict | None:
return fn(req.get("id"), req.get("params", {}))
def dispatch(req: dict) -> dict | None:
"""Route inbound RPCs — long handlers to the pool, everything else inline.
Returns a response dict when handled inline. Returns None when the
handler was scheduled on the pool; the worker writes its own
response via write_json when done.
"""
if req.get("method") not in _LONG_HANDLERS:
return handle_request(req)
def run():
try:
resp = handle_request(req)
except Exception as exc:
resp = _err(req.get("id"), -32000, f"handler error: {exc}")
if resp is not None:
write_json(resp)
_pool.submit(run)
return None
def _wait_agent(session: dict, rid: str, timeout: float = 30.0) -> dict | None:
ready = session.get("agent_ready")
if ready is not None and not ready.wait(timeout=timeout):
@@ -1088,7 +1129,23 @@ def _(rid, params: dict) -> dict:
}
def _build() -> None:
session = _sessions[sid]
session = _sessions.get(sid)
if session is None:
# session.close ran before the build thread got scheduled.
ready.set()
return
# Track what we allocate so we can clean up if session.close
# races us to the finish line. session.close pops _sessions[sid]
# unconditionally and tries to close the slash_worker it finds;
# if _build is still mid-construction when close runs, close
# finds slash_worker=None / notify unregistered and returns
# cleanly — leaving us, the build thread, to later install the
# worker + notify on an orphaned session dict. The finally
# block below detects the orphan and cleans up instead of
# leaking a subprocess and a global notify registration.
worker = None
notify_registered = False
try:
tokens = _set_session_context(key)
try:
@@ -1100,13 +1157,15 @@ def _(rid, params: dict) -> dict:
session["agent"] = agent
try:
session["slash_worker"] = _SlashWorker(key, getattr(agent, "model", _resolve_model()))
worker = _SlashWorker(key, getattr(agent, "model", _resolve_model()))
session["slash_worker"] = worker
except Exception:
pass
try:
from tools.approval import register_gateway_notify, load_permanent_allowlist
register_gateway_notify(key, lambda data: _emit("approval.request", sid, data))
notify_registered = True
load_permanent_allowlist()
except Exception:
pass
@@ -1122,6 +1181,23 @@ def _(rid, params: dict) -> dict:
session["agent_error"] = str(e)
_emit("error", sid, {"message": f"agent init failed: {e}"})
finally:
# Orphan check: if session.close raced us and popped
# _sessions[sid] while we were building, the dict we just
# populated is unreachable. Clean up the subprocess and
# the global notify registration ourselves — session.close
# couldn't see them at the time it ran.
if _sessions.get(sid) is not session:
if worker is not None:
try:
worker.close()
except Exception:
pass
if notify_registered:
try:
from tools.approval import unregister_gateway_notify
unregister_gateway_notify(key)
except Exception:
pass
ready.set()
threading.Thread(target=_build, daemon=True).start()
@@ -1743,6 +1819,19 @@ def _(rid, params: dict) -> dict:
if not value:
return _err(rid, 4002, "model value required")
if session:
# Reject during an in-flight turn. agent.switch_model()
# mutates self.model / self.provider / self.base_url /
# self.client in place; the worker thread running
# agent.run_conversation is reading those on every
# iteration. A mid-turn swap can send an HTTP request
# with the new base_url but old model (or vice versa),
# producing 400/404s the user never asked for. Parity
# with the gateway's running-agent /model guard.
if session.get("running"):
return _err(
rid, 4009,
"session busy — /interrupt the current turn before switching models",
)
result = _apply_model_switch(params.get("session_id", ""), session, value)
else:
result = _apply_model_switch("", {"agent": None}, value)
@@ -2446,6 +2535,17 @@ def _mirror_slash_side_effects(sid: str, session: dict, command: str) -> str:
return ""
name, arg, agent = parts[0], (parts[1].strip() if len(parts) > 1 else ""), session.get("agent")
# Reject agent-mutating commands during an in-flight turn. These
# all do read-then-mutate on live agent/session state that the
# worker thread running agent.run_conversation is using. Parity
# with the session.compress / session.undo guards and the gateway
# runner's running-agent /model guard.
_MUTATES_WHILE_RUNNING = {"model", "personality", "prompt", "compress"}
if name in _MUTATES_WHILE_RUNNING and session.get("running"):
return (
f"session busy — /interrupt the current turn before running /{name}"
)
try:
if name == "model" and arg and agent:
result = _apply_model_switch(sid, session, arg)
+6 -3
View File
@@ -4,9 +4,12 @@ import { providerDisplayNames } from '../domain/providers.js'
describe('providerDisplayNames', () => {
it('returns bare names when all are unique', () => {
expect(providerDisplayNames([{ name: 'Anthropic', slug: 'anthropic' }, { name: 'OpenAI', slug: 'openai' }])).toEqual(
['Anthropic', 'OpenAI']
)
expect(
providerDisplayNames([
{ name: 'Anthropic', slug: 'anthropic' },
{ name: 'OpenAI', slug: 'openai' }
])
).toEqual(['Anthropic', 'OpenAI'])
})
it('appends slug to every collision so the disambiguation is symmetric', () => {
@@ -46,7 +46,6 @@ const pushNote = pushUnique(6)
const pushTool = pushUnique(8)
export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev: GatewayEvent) => void {
const { dequeue, queueEditRef, sendQueued } = ctx.composer
const { rpc } = ctx.gateway
const { STARTUP_RESUME_ID, newSession, resumeById, setCatalog } = ctx.session
const { bellOnComplete, stdout, sys } = ctx.system
@@ -394,16 +393,6 @@ export function createGatewayEventHandler(ctx: GatewayEventHandlerContext): (ev:
patchUiState(state => ({ ...state, usage: { ...state.usage, ...ev.payload!.usage } }))
}
if (queueEditRef.current !== null) {
return
}
const next = dequeue()
if (next) {
sendQueued(next)
}
return
}
-5
View File
@@ -193,11 +193,6 @@ export interface InputHandlerResult {
}
export interface GatewayEventHandlerContext {
composer: {
dequeue: () => string | undefined
queueEditRef: MutableRefObject<null | number>
sendQueued: (text: string) => void
}
gateway: GatewayServices
session: {
STARTUP_RESUME_ID: string
-1
View File
@@ -7,7 +7,6 @@ import type {
SudoRespondResponse,
VoiceRecordResponse
} from '../gatewayTypes.js'
import { writeOsc52Clipboard } from '../lib/osc52.js'
import { getInputSelection } from './inputSelectionStore.js'
+6 -9
View File
@@ -380,12 +380,13 @@ export function useMainApp(gw: GatewayClient) {
sys
})
const prevSidRef = useRef<null | string>(null)
// Drain one queued message whenever the session settles (busy → false):
// agent turn ends, interrupt, shell.exec finishes, error recovered, or the
// session first comes up with pre-queued messages. Without this, shell.exec
// and error paths never emit message.complete, so anything enqueued while
// `!sleep` / a failed turn was running would stay stuck forever.
useEffect(() => {
const prev = prevSidRef.current
prevSidRef.current = ui.sid
if (prev !== null || !ui.sid || ui.busy || composerRefs.queueEditRef.current !== null) {
if (!ui.sid || ui.busy || composerRefs.queueEditRef.current !== null) {
return
}
@@ -416,7 +417,6 @@ export function useMainApp(gw: GatewayClient) {
const onEvent = useMemo(
() =>
createGatewayEventHandler({
composer: { dequeue: composerActions.dequeue, queueEditRef: composerRefs.queueEditRef, sendQueued },
gateway,
session: {
STARTUP_RESUME_ID,
@@ -432,11 +432,8 @@ export function useMainApp(gw: GatewayClient) {
[
appendMessage,
bellOnComplete,
composerActions,
composerRefs,
gateway,
panel,
sendQueued,
session.newSession,
session.resetSession,
session.resumeById,
+8 -2
View File
@@ -181,7 +181,10 @@ export function ModelPicker({ gw, onCancel, onSelect, sessionId, t }: ModelPicke
const idx = off + i
return (
<Text color={providerIdx === idx ? t.color.cornsilk : t.color.dim} key={providers[idx]?.slug ?? `row-${idx}`}>
<Text
color={providerIdx === idx ? t.color.cornsilk : t.color.dim}
key={providers[idx]?.slug ?? `row-${idx}`}
>
{providerIdx === idx ? '▸ ' : ' '}
{i + 1}. {row}
</Text>
@@ -212,7 +215,10 @@ export function ModelPicker({ gw, onCancel, onSelect, sessionId, t }: ModelPicke
const idx = off + i
return (
<Text color={modelIdx === idx ? t.color.cornsilk : t.color.dim} key={`${provider?.slug ?? 'prov'}:${idx}:${row}`}>
<Text
color={modelIdx === idx ? t.color.cornsilk : t.color.dim}
key={`${provider?.slug ?? 'prov'}:${idx}:${row}`}
>
{modelIdx === idx ? '▸ ' : ' '}
{i + 1}. {row}
</Text>
+9 -21
View File
@@ -155,31 +155,21 @@ export function ConfirmPrompt({ onCancel, onConfirm, req, t }: ConfirmPromptProp
const [sel, setSel] = useState(0)
useInput((ch, key) => {
if (key.escape || (key.ctrl && ch.toLowerCase() === 'c')) {
onCancel()
return
}
const lower = ch.toLowerCase()
if (key.escape || (key.ctrl && lower === 'c') || lower === 'n') {
return onCancel()
}
if (lower === 'y') {
onConfirm()
return
return onConfirm()
}
if (lower === 'n') {
onCancel()
return
}
if (key.upArrow && sel > 0) {
if (key.upArrow) {
setSel(0)
}
if (key.downArrow && sel < 1) {
if (key.downArrow) {
setSel(1)
}
@@ -189,12 +179,10 @@ export function ConfirmPrompt({ onCancel, onConfirm, req, t }: ConfirmPromptProp
})
const accent = req.danger ? t.color.error : t.color.warn
const confirmLabel = req.confirmLabel ?? 'Yes'
const cancelLabel = req.cancelLabel ?? 'No'
const rows = [
{ color: t.color.cornsilk, label: cancelLabel },
{ color: req.danger ? t.color.error : t.color.cornsilk, label: confirmLabel }
{ color: t.color.cornsilk, label: req.cancelLabel ?? 'No' },
{ color: req.danger ? t.color.error : t.color.cornsilk, label: req.confirmLabel ?? 'Yes' }
]
return (
+1 -3
View File
@@ -1,5 +1,3 @@
export const STARTUP_RESUME_ID = (process.env.HERMES_TUI_RESUME ?? '').trim()
export const MOUSE_TRACKING = !/^(?:1|true|yes|on)$/i.test((process.env.HERMES_TUI_DISABLE_MOUSE ?? '').trim())
export const NO_CONFIRM_DESTRUCTIVE = /^(?:1|true|yes|on)$/i.test(
(process.env.HERMES_TUI_NO_CONFIRM ?? '').trim()
)
export const NO_CONFIRM_DESTRUCTIVE = /^(?:1|true|yes|on)$/i.test((process.env.HERMES_TUI_NO_CONFIRM ?? '').trim())
+1 -2
View File
@@ -10,8 +10,7 @@ export const fmtCwdBranch = (cwd: string, branch: null | string, max = 40) => {
return shortCwd(cwd, max)
}
const b = branch.length > 16 ? `${branch.slice(-15)}` : branch
const tag = ` (${b})`
const tag = ` (${branch.length > 16 ? `${branch.slice(-15)}` : branch})`
return `${shortCwd(cwd, Math.max(8, max - tag.length))}${tag}`
}
+3 -9
View File
@@ -5,13 +5,7 @@ export const providerDisplayNames = (providers: readonly { name: string; slug: s
counts.set(p.name, (counts.get(p.name) ?? 0) + 1)
}
return providers.map(p => {
const dup = (counts.get(p.name) ?? 0) > 1
if (!dup || !p.slug || p.slug === p.name) {
return p.name
}
return `${p.name} (${p.slug})`
})
return providers.map(p =>
(counts.get(p.name) ?? 0) > 1 && p.slug && p.slug !== p.name ? `${p.name} (${p.slug})` : p.name
)
}
+3 -196
View File
@@ -6,7 +6,7 @@ description: "Install Hermes Agent on Linux, macOS, WSL2, or Android via Termux"
# Installation
Get Hermes Agent up and running in under two minutes with the one-line installer, or follow the manual steps for full control.
Get Hermes Agent up and running in under two minutes with the one-line installer.
## Quick Install
@@ -82,202 +82,9 @@ If you use Nix (on NixOS, macOS, or Linux), there's a dedicated setup path with
---
## Manual Installation
## Manual / Developer Installation
If you prefer full control over the installation process, follow these steps.
### Step 1: Clone the Repository
Clone with `--recurse-submodules` to pull the required submodules:
```bash
git clone --recurse-submodules https://github.com/NousResearch/hermes-agent.git
cd hermes-agent
```
If you already cloned without `--recurse-submodules`:
```bash
git submodule update --init --recursive
```
### Step 2: Install uv & Create Virtual Environment
```bash
# Install uv (if not already installed)
curl -LsSf https://astral.sh/uv/install.sh | sh
# Create venv with Python 3.11 (uv downloads it if not present — no sudo needed)
uv venv venv --python 3.11
```
:::tip
You do **not** need to activate the venv to use `hermes`. The entry point has a hardcoded shebang pointing to the venv Python, so it works globally once symlinked.
:::
### Step 3: Install Python Dependencies
```bash
# Tell uv which venv to install into
export VIRTUAL_ENV="$(pwd)/venv"
# Install with all extras
uv pip install -e ".[all]"
```
If you only want the core agent (no Telegram/Discord/cron support):
```bash
uv pip install -e "."
```
<details>
<summary><strong>Optional extras breakdown</strong></summary>
| Extra | What it adds | Install command |
|-------|-------------|-----------------|
| `all` | Everything below | `uv pip install -e ".[all]"` |
| `messaging` | Telegram, Discord & Slack gateway | `uv pip install -e ".[messaging]"` |
| `cron` | Cron expression parsing for scheduled tasks | `uv pip install -e ".[cron]"` |
| `cli` | Terminal menu UI for setup wizard | `uv pip install -e ".[cli]"` |
| `modal` | Modal cloud execution backend | `uv pip install -e ".[modal]"` |
| `tts-premium` | ElevenLabs premium voices | `uv pip install -e ".[tts-premium]"` |
| `voice` | CLI microphone input + audio playback | `uv pip install -e ".[voice]"` |
| `pty` | PTY terminal support | `uv pip install -e ".[pty]"` |
| `termux` | Tested Android / Termux bundle (`cron`, `cli`, `pty`, `mcp`, `honcho`, `acp`) | `python -m pip install -e ".[termux]" -c constraints-termux.txt` |
| `honcho` | AI-native memory (Honcho integration) | `uv pip install -e ".[honcho]"` |
| `mcp` | Model Context Protocol support | `uv pip install -e ".[mcp]"` |
| `homeassistant` | Home Assistant integration | `uv pip install -e ".[homeassistant]"` |
| `acp` | ACP editor integration support | `uv pip install -e ".[acp]"` |
| `slack` | Slack messaging | `uv pip install -e ".[slack]"` |
| `dev` | pytest & test utilities | `uv pip install -e ".[dev]"` |
You can combine extras: `uv pip install -e ".[messaging,cron]"`
:::tip Termux users
`.[all]` is not currently available on Android because the `voice` extra pulls `faster-whisper`, which depends on `ctranslate2` wheels that are not published for Android. Use `.[termux]` for the tested mobile install path, then add individual extras only as needed.
:::
</details>
### Step 4: Install Optional Submodules (if needed)
```bash
# RL training backend (optional)
uv pip install -e "./tinker-atropos"
```
Both are optional — if you skip them, the corresponding toolsets simply won't be available.
### Step 5: Install Node.js Dependencies (Optional)
Only needed for **browser automation** (Browserbase-powered) and **WhatsApp bridge**:
```bash
npm install
```
### Step 6: Create the Configuration Directory
```bash
# Create the directory structure
mkdir -p ~/.hermes/{cron,sessions,logs,memories,skills,pairing,hooks,image_cache,audio_cache,whatsapp/session}
# Copy the example config file
cp cli-config.yaml.example ~/.hermes/config.yaml
# Create an empty .env file for API keys
touch ~/.hermes/.env
```
### Step 7: Add Your API Keys
Open `~/.hermes/.env` and add at minimum an LLM provider key:
```bash
# Required — at least one LLM provider:
OPENROUTER_API_KEY=sk-or-v1-your-key-here
# Optional — enable additional tools:
FIRECRAWL_API_KEY=fc-your-key # Web search & scraping (or self-host, see docs)
FAL_KEY=your-fal-key # Image generation (FLUX)
```
Or set them via the CLI:
```bash
hermes config set OPENROUTER_API_KEY sk-or-v1-your-key-here
```
### Step 8: Add `hermes` to Your PATH
```bash
mkdir -p ~/.local/bin
ln -sf "$(pwd)/venv/bin/hermes" ~/.local/bin/hermes
```
If `~/.local/bin` isn't on your PATH, add it to your shell config:
```bash
# Bash
echo 'export PATH="$HOME/.local/bin:$PATH"' >> ~/.bashrc && source ~/.bashrc
# Zsh
echo 'export PATH="$HOME/.local/bin:$PATH"' >> ~/.zshrc && source ~/.zshrc
# Fish
fish_add_path $HOME/.local/bin
```
### Step 9: Configure Your Provider
```bash
hermes model # Select your LLM provider and model
```
### Step 10: Verify the Installation
```bash
hermes version # Check that the command is available
hermes doctor # Run diagnostics to verify everything is working
hermes status # Check your configuration
hermes chat -q "Hello! What tools do you have available?"
```
---
## Quick-Reference: Manual Install (Condensed)
For those who just want the commands:
```bash
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
# Clone & enter
git clone --recurse-submodules https://github.com/NousResearch/hermes-agent.git
cd hermes-agent
# Create venv with Python 3.11
uv venv venv --python 3.11
export VIRTUAL_ENV="$(pwd)/venv"
# Install everything
uv pip install -e ".[all]"
uv pip install -e "./tinker-atropos"
npm install # optional, for browser tools and WhatsApp
# Configure
mkdir -p ~/.hermes/{cron,sessions,logs,memories,skills,pairing,hooks,image_cache,audio_cache,whatsapp/session}
cp cli-config.yaml.example ~/.hermes/config.yaml
touch ~/.hermes/.env
echo 'OPENROUTER_API_KEY=sk-or-v1-your-key' >> ~/.hermes/.env
# Make hermes available globally
mkdir -p ~/.local/bin
ln -sf "$(pwd)/venv/bin/hermes" ~/.local/bin/hermes
# Verify
hermes doctor
hermes
```
If you want to clone the repo and install from source — for contributing, running from a specific branch, or having full control over the virtual environment — see the [Development Setup](../developer-guide/contributing.md#development-setup) section in the Contributing guide.
---
+150 -105
View File
@@ -1,12 +1,35 @@
---
sidebar_position: 1
title: "Quickstart"
description: "Your first conversation with Hermes Agent — from install to chatting in 2 minutes"
description: "Your first conversation with Hermes Agent — from install to chatting in under 5 minutes"
---
# Quickstart
This guide walks you through installing Hermes Agent, setting up a provider, and having your first conversation. By the end, you'll know the key features and how to explore further.
This guide gets you from zero to a working Hermes setup that survives real use. Install, choose a provider, verify a working chat, and know exactly what to do when something breaks.
## Who this is for
- Brand new and want the shortest path to a working setup
- Switching providers and don't want to lose time to config mistakes
- Setting up Hermes for a team, bot, or always-on workflow
- Tired of "it installed, but it still does nothing"
## The fastest path
Pick the row that matches your goal:
| Goal | Do this first | Then do this |
|---|---|---|
| I just want Hermes working on my machine | `hermes setup` | Run a real chat and verify it responds |
| I already know my provider | `hermes model` | Save the config, then start chatting |
| I want a bot or always-on setup | `hermes gateway setup` after CLI works | Connect Telegram, Discord, Slack, or another platform |
| I want a local or self-hosted model | `hermes model` → custom endpoint | Verify the endpoint, model name, and context length |
| I want multi-provider fallback | `hermes model` first | Add routing and fallback only after the base chat works |
**Rule of thumb:** if Hermes cannot complete a normal chat, do not add more features yet. Get one clean conversation working first, then layer on gateway, cron, skills, voice, or routing.
---
## 1. Install Hermes Agent
@@ -31,86 +54,109 @@ After it finishes, reload your shell:
source ~/.bashrc # or source ~/.zshrc
```
## 2. Set Up a Provider
For detailed installation options, prerequisites, and troubleshooting, see the [Installation guide](./installation.md).
The installer configures your LLM provider automatically. To change it later, use one of these commands:
## 2. Choose a Provider
The single most important setup step. Use `hermes model` to walk through the choice interactively:
```bash
hermes model # Choose your LLM provider and model
hermes tools # Configure which tools are enabled
hermes setup # Or configure everything at once
hermes model
```
`hermes model` walks you through selecting an inference provider:
Good defaults:
| Provider | What it is | How to set up |
|----------|-----------|---------------|
| **Nous Portal** | Subscription-based, zero-config | OAuth login via `hermes model` |
| **OpenAI Codex** | ChatGPT OAuth, uses Codex models | Device code auth via `hermes model` |
| **Anthropic** | Claude models directly (Pro/Max or API key) | `hermes model` with Claude Code auth, or an Anthropic API key |
| **OpenRouter** | Multi-provider routing across many models | Enter your API key |
| **Z.AI** | GLM / Zhipu-hosted models | Set `GLM_API_KEY` / `ZAI_API_KEY` |
| **Kimi / Moonshot** | Moonshot-hosted coding and chat models | Set `KIMI_API_KEY` |
| **Kimi / Moonshot China** | China-region Moonshot endpoint | Set `KIMI_CN_API_KEY` |
| **Arcee AI** | Trinity models | Set `ARCEEAI_API_KEY` |
| **Xiaomi MiMo** | Xiaomi MiMo models via [platform.xiaomimimo.com](https://platform.xiaomimimo.com) | Set `XIAOMI_API_KEY` |
| **AWS Bedrock** | Anthropic Claude, Amazon Nova, DeepSeek v3.2, and Meta Llama via AWS | Standard boto3 auth (`AWS_PROFILE` or `AWS_ACCESS_KEY_ID` + `AWS_REGION`) |
| **Qwen Portal (OAuth)** | Qwen 3.5 / Qwen-Coder models via Alibaba's consumer Qwen Portal | OAuth via `hermes model` (optional: `HERMES_QWEN_BASE_URL`) |
| **MiniMax** | International MiniMax endpoint | Set `MINIMAX_API_KEY` |
| **MiniMax China** | China-region MiniMax endpoint | Set `MINIMAX_CN_API_KEY` |
| **Alibaba Cloud** | Qwen models via DashScope | Set `DASHSCOPE_API_KEY` |
| **Hugging Face** | 20+ open models via unified router (Qwen, DeepSeek, Kimi, etc.) | Set `HF_TOKEN` |
| **Kilo Code** | KiloCode-hosted models | Set `KILOCODE_API_KEY` |
| **OpenCode Zen** | Pay-as-you-go access to curated models | Set `OPENCODE_ZEN_API_KEY` |
| **OpenCode Go** | $10/month subscription for open models | Set `OPENCODE_GO_API_KEY` |
| **DeepSeek** | Direct DeepSeek API access | Set `DEEPSEEK_API_KEY` |
| **NVIDIA NIM** | Nemotron models via build.nvidia.com or local NIM | Set `NVIDIA_API_KEY` (optional: `NVIDIA_BASE_URL`) |
| **Ollama Cloud** | Managed Ollama catalog without local GPU | Set `OLLAMA_API_KEY` (or pick **Ollama Cloud** in `hermes model`) |
| **Google Gemini (OAuth)** | Gemini via Cloud Code Assist — free and paid tiers | OAuth via `hermes model` (optional: `HERMES_GEMINI_PROJECT_ID` for paid tiers) |
| **xAI (Grok)** | Grok 4 models via Responses API + prompt caching | Set `XAI_API_KEY` (alias: `grok`) |
| **GitHub Copilot** | GitHub Copilot subscription (GPT-5.x, Claude, Gemini, etc.) | OAuth via `hermes model`, or `COPILOT_GITHUB_TOKEN` / `GH_TOKEN` |
| **GitHub Copilot ACP** | Copilot ACP agent backend (spawns local `copilot` CLI) | `hermes model` (requires `copilot` CLI + `copilot login`) |
| **Vercel AI Gateway** | Vercel AI Gateway routing | Set `AI_GATEWAY_API_KEY` |
| **Custom Endpoint** | VLLM, SGLang, Ollama, or any OpenAI-compatible API | Set base URL + API key |
| Situation | Recommended path |
|---|---|
| Least friction | Nous Portal or OpenRouter |
| You already have Claude or Codex auth | Anthropic or OpenAI Codex |
| You want local/private inference | Ollama or any custom OpenAI-compatible endpoint |
| You want multi-provider routing | OpenRouter |
| You have a custom GPU server | vLLM, SGLang, LiteLLM, or any OpenAI-compatible endpoint |
For most first-time users: choose a provider, accept the defaults unless you know why you're changing them. The full provider catalog with env vars and setup steps lives on the [Providers](../integrations/providers.md) page.
:::caution Minimum context: 64K tokens
Hermes Agent requires a model with at least **64,000 tokens** of context. Models with smaller windows cannot maintain enough working memory for multi-step tool-calling workflows and will be rejected at startup. Most hosted models (Claude, GPT, Gemini, Qwen, DeepSeek) meet this easily. If you're running a local model, set its context size to at least 64K (e.g. `--ctx-size 65536` for llama.cpp or `-c 65536` for Ollama).
:::
:::tip
You can switch providers at any time with `hermes model` — no code changes, no lock-in. When configuring a custom endpoint, Hermes will prompt for the context window size and auto-detect it when possible. See [Context Length Detection](../integrations/providers.md#context-length-detection) for details.
You can switch providers at any time with `hermes model` — no lock-in. For a full list of all supported providers and setup details, see [AI Providers](../integrations/providers.md).
:::
## 3. Start Chatting
### How settings are stored
Hermes separates secrets from normal config:
- **Secrets and tokens**`~/.hermes/.env`
- **Non-secret settings**`~/.hermes/config.yaml`
The easiest way to set values correctly is through the CLI:
```bash
hermes config set model anthropic/claude-opus-4.6
hermes config set terminal.backend docker
hermes config set OPENROUTER_API_KEY sk-or-...
```
The right value goes to the right file automatically.
## 3. Run Your First Chat
```bash
hermes # classic CLI
hermes --tui # modern TUI (recommended)
```
That's it! You'll see a welcome banner with your model, available tools, and skills. Type a message and press Enter.
You'll see a welcome banner with your model, available tools, and skills. Use a prompt that's specific and easy to verify:
:::tip Pick your interface
Hermes ships with two terminal interfaces: the classic `prompt_toolkit` CLI and a newer [TUI](../user-guide/tui.md) with modal overlays, mouse selection, and non-blocking input. Both share the same sessions, slash commands, and config — try each with `hermes` vs `hermes --tui`.
:::
```
What can you help me with?
Summarize this repo in 5 bullets and tell me what the main entrypoint is.
```
The agent has access to tools for web search, file operations, terminal commands, and more — all out of the box.
```
Check my current directory and tell me what looks like the main project file.
```
## 4. Try Key Features
```
Help me set up a clean GitHub PR workflow for this codebase.
```
### Ask it to use the terminal
**What success looks like:**
- The banner shows your chosen model/provider
- Hermes replies without error
- It can use a tool if needed (terminal, file read, web search)
- The conversation continues normally for more than one turn
If that works, you're past the hardest part.
## 4. Verify Sessions Work
Before moving on, make sure resume works:
```bash
hermes --continue # Resume the most recent session
hermes -c # Short form
```
That should bring you back to the session you just had. If it doesn't, check whether you're in the same profile and whether the session actually saved. This matters later when you're juggling multiple setups or machines.
## 5. Try Key Features
### Use the terminal
```
What's my disk usage? Show the top 5 largest directories.
```
The agent will run terminal commands on your behalf and show you the results.
The agent runs terminal commands on your behalf and shows results.
### Use slash commands
### Slash commands
Type `/` to see an autocomplete dropdown of all commands:
@@ -128,22 +174,27 @@ Press `Alt+Enter` or `Ctrl+J` to add a new line. Great for pasting code or writi
### Interrupt the agent
If the agent is taking too long, just type a new message and press Enter — it interrupts the current task and switches to your new instructions. `Ctrl+C` also works.
If the agent is taking too long, type a new message and press Enter — it interrupts the current task and switches to your new instructions. `Ctrl+C` also works.
### Resume a session
## 6. Add the Next Layer
When you exit, hermes prints a resume command:
Only after the base chat works. Pick what you need:
### Bot or shared assistant
```bash
hermes --continue # Resume the most recent session
hermes -c # Short form
hermes gateway setup # Interactive platform configuration
```
## 5. Explore Further
Connect [Telegram](/docs/user-guide/messaging/telegram), [Discord](/docs/user-guide/messaging/discord), [Slack](/docs/user-guide/messaging/slack), [WhatsApp](/docs/user-guide/messaging/whatsapp), [Signal](/docs/user-guide/messaging/signal), [Email](/docs/user-guide/messaging/email), or [Home Assistant](/docs/user-guide/messaging/homeassistant).
Here are some things to try next:
### Automation and tools
### Set up a sandboxed terminal
- `hermes tools` — tune tool access per platform
- `hermes skills` — browse and install reusable workflows
- Cron — only after your bot or CLI setup is stable
### Sandboxed terminal
For safety, run the agent in a Docker container or on a remote server:
@@ -152,71 +203,25 @@ hermes config set terminal.backend docker # Docker isolation
hermes config set terminal.backend ssh # Remote server
```
### Connect messaging platforms
Chat with Hermes from your phone or other surfaces via Telegram, Discord, Slack, WhatsApp, Signal, Email, or Home Assistant:
```bash
hermes gateway setup # Interactive platform configuration
```
### Add voice mode
Want microphone input in the CLI or spoken replies in messaging?
### Voice mode
```bash
pip install "hermes-agent[voice]"
# Includes faster-whisper for free local speech-to-text
```
Then start Hermes and enable it inside the CLI:
Then in the CLI: `/voice on`. Press `Ctrl+B` to record. See [Voice Mode](../user-guide/features/voice-mode.md).
```text
/voice on
```
Press `Ctrl+B` to record, or use `/voice tts` to have Hermes speak its replies. See [Voice Mode](../user-guide/features/voice-mode.md) for the full setup across CLI, Telegram, Discord, and Discord voice channels.
### Schedule automated tasks
```
Every morning at 9am, check Hacker News for AI news and send me a summary on Telegram.
```
The agent will set up a cron job that runs automatically via the gateway.
### Browse and install skills
### Skills
```bash
hermes skills search kubernetes
hermes skills search react --source skills-sh
hermes skills search https://mintlify.com/docs --source well-known
hermes skills install openai/skills/k8s
hermes skills install official/security/1password
hermes skills install skills-sh/vercel-labs/json-render/json-render-react --force
```
Tips:
- Use `--source skills-sh` to search the public `skills.sh` directory.
- Use `--source well-known` with a docs/site URL to discover skills from `/.well-known/skills/index.json`.
- Use `--force` only after reviewing a third-party skill. It can override non-dangerous policy blocks, but not a `dangerous` scan verdict.
Or use `/skills` inside a chat session.
Or use the `/skills` slash command inside chat.
### Use Hermes inside an editor via ACP
Hermes can also run as an ACP server for ACP-compatible editors like VS Code, Zed, and JetBrains:
```bash
pip install -e '.[acp]'
hermes acp
```
See [ACP Editor Integration](../user-guide/features/acp.md) for setup details.
### Try MCP servers
Connect to external tools via the Model Context Protocol:
### MCP servers
```yaml
# Add to ~/.hermes/config.yaml
@@ -228,6 +233,43 @@ mcp_servers:
GITHUB_PERSONAL_ACCESS_TOKEN: "ghp_xxx"
```
### Editor integration (ACP)
```bash
pip install -e '.[acp]'
hermes acp
```
See [ACP Editor Integration](../user-guide/features/acp.md).
---
## Common Failure Modes
These are the problems that waste the most time:
| Symptom | Likely cause | Fix |
|---|---|---|
| Hermes opens but gives empty or broken replies | Provider auth or model selection is wrong | Run `hermes model` again and confirm provider, model, and auth |
| Custom endpoint "works" but returns garbage | Wrong base URL, model name, or not actually OpenAI-compatible | Verify the endpoint in a separate client first |
| Gateway starts but nobody can message it | Bot token, allowlist, or platform setup is incomplete | Re-run `hermes gateway setup` and check `hermes gateway status` |
| `hermes --continue` can't find old session | Switched profiles or session never saved | Check `hermes sessions list` and confirm you're in the right profile |
| Model unavailable or odd fallback behavior | Provider routing or fallback settings are too aggressive | Keep routing off until the base provider is stable |
| `hermes doctor` flags config problems | Config values are missing or stale | Fix the config, retest a plain chat before adding features |
## Recovery Toolkit
When something feels off, use this order:
1. `hermes doctor`
2. `hermes model`
3. `hermes setup`
4. `hermes sessions list`
5. `hermes --continue`
6. `hermes gateway status`
That sequence gets you from "broken vibes" back to a known state fast.
---
## Quick Reference
@@ -249,3 +291,6 @@ mcp_servers:
- **[Configuration](../user-guide/configuration.md)** — Customize your setup
- **[Messaging Gateway](../user-guide/messaging/index.md)** — Connect Telegram, Discord, Slack, WhatsApp, Signal, Email, or Home Assistant
- **[Tools & Toolsets](../user-guide/features/tools.md)** — Explore available capabilities
- **[AI Providers](../integrations/providers.md)** — Full provider list and setup details
- **[Skills System](../user-guide/features/skills.md)** — Reusable workflows and knowledge
- **[Tips & Best Practices](../guides/tips.md)** — Power user tips
@@ -0,0 +1,300 @@
---
sidebar_position: 10
title: "Tutorial: GitHub PR Review Agent"
description: "Build an automated AI code reviewer that monitors your repos, reviews pull requests, and delivers feedback — hands-free"
---
# Tutorial: Build a GitHub PR Review Agent
**The problem:** Your team opens PRs faster than you can review them. PRs sit for days waiting for eyeballs. Junior devs merge bugs because nobody had time to check. You spend your mornings catching up on diffs instead of building.
**The solution:** An AI agent that watches your repos around the clock, reviews every new PR for bugs, security issues, and code quality, and sends you a summary — so you only spend time on PRs that actually need human judgment.
**What you'll build:**
```
┌──────────────┐ ┌───────────────┐ ┌──────────────┐ ┌──────────────┐
│ Cron Timer │────▶│ Hermes Agent │────▶│ GitHub API │────▶│ Review to │
│ (every 2h) │ │ + gh CLI │ │ (PR diffs) │ │ Telegram/ │
│ │ │ + skill │ │ │ │ Discord/ │
│ │ │ + memory │ │ │ │ local file │
└──────────────┘ └───────────────┘ └──────────────┘ └──────────────┘
```
This guide uses **cron jobs** to poll for PRs on a schedule — no server or public endpoint needed. Works behind NAT and firewalls.
:::tip Want real-time reviews instead?
If you have a public endpoint available, check out [Automated GitHub PR Comments with Webhooks](./webhook-github-pr-review.md) — GitHub pushes events to Hermes instantly when PRs are opened or updated.
:::
---
## Prerequisites
- **Hermes Agent installed** — see the [Installation guide](/docs/getting-started/installation)
- **Gateway running** for cron jobs:
```bash
hermes gateway install # Install as a service
# or
hermes gateway # Run in foreground
```
- **GitHub CLI (`gh`) installed and authenticated**:
```bash
# Install
brew install gh # macOS
sudo apt install gh # Ubuntu/Debian
# Authenticate
gh auth login
```
- **Messaging configured** (optional) — [Telegram](/docs/user-guide/messaging/telegram) or [Discord](/docs/user-guide/messaging/discord)
:::tip No messaging? No problem
Use `deliver: "local"` to save reviews to `~/.hermes/cron/output/`. Great for testing before wiring up notifications.
:::
---
## Step 1: Verify the Setup
Make sure Hermes can access GitHub. Start a chat:
```bash
hermes
```
Test with a simple command:
```
Run: gh pr list --repo NousResearch/hermes-agent --state open --limit 3
```
You should see a list of open PRs. If this works, you're ready.
---
## Step 2: Try a Manual Review
Still in the chat, ask Hermes to review a real PR:
```
Review this pull request. Read the diff, check for bugs, security issues,
and code quality. Be specific about line numbers and quote problematic code.
Run: gh pr diff 3888 --repo NousResearch/hermes-agent
```
Hermes will:
1. Execute `gh pr diff` to fetch the code changes
2. Read through the entire diff
3. Produce a structured review with specific findings
If you're happy with the quality, time to automate it.
---
## Step 3: Create a Review Skill
A skill gives Hermes consistent review guidelines that persist across sessions and cron runs. Without one, review quality varies.
```bash
mkdir -p ~/.hermes/skills/code-review
```
Create `~/.hermes/skills/code-review/SKILL.md`:
```markdown
---
name: code-review
description: Review pull requests for bugs, security issues, and code quality
---
# Code Review Guidelines
When reviewing a pull request:
## What to Check
1. **Bugs** — Logic errors, off-by-one, null/undefined handling
2. **Security** — Injection, auth bypass, secrets in code, SSRF
3. **Performance** — N+1 queries, unbounded loops, memory leaks
4. **Style** — Naming conventions, dead code, missing error handling
5. **Tests** — Are changes tested? Do tests cover edge cases?
## Output Format
For each finding:
- **File:Line** — exact location
- **Severity** — Critical / Warning / Suggestion
- **What's wrong** — one sentence
- **Fix** — how to fix it
## Rules
- Be specific. Quote the problematic code.
- Don't flag style nitpicks unless they affect readability.
- If the PR looks good, say so. Don't invent problems.
- End with: APPROVE / REQUEST_CHANGES / COMMENT
```
Verify it loaded — start `hermes` and you should see `code-review` in the skills list at startup.
---
## Step 4: Teach It Your Conventions
This is what makes the reviewer actually useful. Start a session and teach Hermes your team's standards:
```
Remember: In our backend repo, we use Python with FastAPI.
All endpoints must have type annotations and Pydantic models.
We don't allow raw SQL — only SQLAlchemy ORM.
Test files go in tests/ and must use pytest fixtures.
```
```
Remember: In our frontend repo, we use TypeScript with React.
No `any` types allowed. All components must have props interfaces.
We use React Query for data fetching, never useEffect for API calls.
```
These memories persist forever — the reviewer will enforce your conventions without being told each time.
---
## Step 5: Create the Automated Cron Job
Now wire it all together. Create a cron job that runs every 2 hours:
```bash
hermes cron create "0 */2 * * *" \
"Check for new open PRs and review them.
Repos to monitor:
- myorg/backend-api
- myorg/frontend-app
Steps:
1. Run: gh pr list --repo REPO --state open --limit 5 --json number,title,author,createdAt
2. For each PR created or updated in the last 4 hours:
- Run: gh pr diff NUMBER --repo REPO
- Review the diff using the code-review guidelines
3. Format output as:
## PR Reviews — today
### [repo] #[number]: [title]
**Author:** [name] | **Verdict:** APPROVE/REQUEST_CHANGES/COMMENT
[findings]
If no new PRs found, say: No new PRs to review." \
--name "pr-review" \
--deliver telegram \
--skill code-review
```
Verify it's scheduled:
```bash
hermes cron list
```
### Other useful schedules
| Schedule | When |
|----------|------|
| `0 */2 * * *` | Every 2 hours |
| `0 9,13,17 * * 1-5` | Three times a day, weekdays only |
| `0 9 * * 1` | Weekly Monday morning roundup |
| `30m` | Every 30 minutes (high-traffic repos) |
---
## Step 6: Run It On Demand
Don't want to wait for the schedule? Trigger it manually:
```bash
hermes cron run pr-review
```
Or from within a chat session:
```
/cron run pr-review
```
---
## Going Further
### Post Reviews Directly to GitHub
Instead of delivering to Telegram, have the agent comment on the PR itself:
Add this to your cron prompt:
```
After reviewing, post your review:
- For issues: gh pr review NUMBER --repo REPO --comment --body "YOUR_REVIEW"
- For critical issues: gh pr review NUMBER --repo REPO --request-changes --body "YOUR_REVIEW"
- For clean PRs: gh pr review NUMBER --repo REPO --approve --body "Looks good"
```
:::caution
Make sure `gh` has a token with `repo` scope. Reviews are posted as whoever `gh` is authenticated as.
:::
### Weekly PR Dashboard
Create a Monday morning overview of all your repos:
```bash
hermes cron create "0 9 * * 1" \
"Generate a weekly PR dashboard:
- myorg/backend-api
- myorg/frontend-app
- myorg/infra
For each repo show:
1. Open PR count and oldest PR age
2. PRs merged this week
3. Stale PRs (older than 5 days)
4. PRs with no reviewer assigned
Format as a clean summary." \
--name "weekly-dashboard" \
--deliver telegram
```
### Multi-Repo Monitoring
Scale up by adding more repos to the prompt. The agent processes them sequentially — no extra setup needed.
---
## Troubleshooting
### "gh: command not found"
The gateway runs in a minimal environment. Ensure `gh` is in the system PATH and restart the gateway.
### Reviews are too generic
1. Add the `code-review` skill (Step 3)
2. Teach Hermes your conventions via memory (Step 4)
3. The more context it has about your stack, the better the reviews
### Cron job doesn't run
```bash
hermes gateway status # Is the gateway running?
hermes cron list # Is the job enabled?
```
### Rate limits
GitHub allows 5,000 API requests/hour for authenticated users. Each PR review uses ~3-5 requests (list + diff + optional comments). Even reviewing 100 PRs/day stays well within limits.
---
## What's Next?
- **[Webhook-Based PR Reviews](./webhook-github-pr-review.md)** — get instant reviews when PRs are opened (requires a public endpoint)
- **[Daily Briefing Bot](/docs/guides/daily-briefing-bot)** — combine PR reviews with your morning news digest
- **[Build a Plugin](/docs/guides/build-a-hermes-plugin)** — wrap the review logic into a shareable plugin
- **[Profiles](/docs/user-guide/profiles)** — run a dedicated reviewer profile with its own memory and config
- **[Fallback Providers](/docs/user-guide/features/fallback-providers)** — ensure reviews run even when one provider is down
@@ -0,0 +1,329 @@
---
sidebar_position: 11
sidebar_label: "GitHub PR Reviews via Webhook"
title: "Automated GitHub PR Comments with Webhooks"
description: "Connect Hermes to GitHub so it automatically fetches PR diffs, reviews code changes, and posts comments — triggered by webhooks with no manual prompting"
---
# Automated GitHub PR Comments with Webhooks
This guide walks you through connecting Hermes Agent to GitHub so it automatically fetches a pull request's diff, analyzes the code changes, and posts a comment — triggered by a webhook event with no manual prompting.
When a PR is opened or updated, GitHub sends a webhook POST to your Hermes instance. Hermes runs the agent with a prompt that instructs it to retrieve the diff via the `gh` CLI, and the response is posted back to the PR thread.
:::tip Want a simpler setup without a public endpoint?
If you don't have a public URL or just want to get started quickly, check out [Build a GitHub PR Review Agent](./github-pr-review-agent.md) — uses cron jobs to poll for PRs on a schedule, works behind NAT and firewalls.
:::
:::info Reference docs
For the full webhook platform reference (all config options, delivery types, dynamic subscriptions, security model) see [Webhooks](/docs/user-guide/messaging/webhooks).
:::
:::warning Prompt injection risk
Webhook payloads contain attacker-controlled data — PR titles, commit messages, and descriptions can contain malicious instructions. When your webhook endpoint is exposed to the internet, run the gateway in a sandboxed environment (Docker, SSH backend). See the [security section](#security-notes) below.
:::
---
## Prerequisites
- Hermes Agent installed and running (`hermes gateway`)
- [`gh` CLI](https://cli.github.com/) installed and authenticated on the gateway host (`gh auth login`)
- A publicly reachable URL for your Hermes instance (see [Local testing with ngrok](#local-testing-with-ngrok) if running locally)
- Admin access to the GitHub repository (required to manage webhooks)
---
## Step 1 — Enable the webhook platform
Add the following to your `~/.hermes/config.yaml`:
```yaml
platforms:
webhook:
enabled: true
extra:
port: 8644 # default; change if another service occupies this port
rate_limit: 30 # max requests per minute per route (not a global cap)
routes:
github-pr-review:
secret: "your-webhook-secret-here" # must match the GitHub webhook secret exactly
events:
- pull_request
# The agent is instructed to fetch the actual diff before reviewing.
# {number} and {repository.full_name} are resolved from the GitHub payload.
prompt: |
A pull request event was received (action: {action}).
PR #{number}: {pull_request.title}
Author: {pull_request.user.login}
Branch: {pull_request.head.ref} → {pull_request.base.ref}
Description: {pull_request.body}
URL: {pull_request.html_url}
If the action is "closed" or "labeled", stop here and do not post a comment.
Otherwise:
1. Run: gh pr diff {number} --repo {repository.full_name}
2. Review the code changes for correctness, security issues, and clarity.
3. Write a concise, actionable review comment and post it.
deliver: github_comment
deliver_extra:
repo: "{repository.full_name}"
pr_number: "{number}"
```
**Key fields:**
| Field | Description |
|---|---|
| `secret` (route-level) | HMAC secret for this route. Falls back to `extra.secret` global if omitted. |
| `events` | List of `X-GitHub-Event` header values to accept. Empty list = accept all. |
| `prompt` | Template; `{field}` and `{nested.field}` resolve from the GitHub payload. |
| `deliver` | `github_comment` posts via `gh pr comment`. `log` just writes to the gateway log. |
| `deliver_extra.repo` | Resolves to e.g. `org/repo` from the payload. |
| `deliver_extra.pr_number` | Resolves to the PR number from the payload. |
:::note The payload does not contain code
The GitHub webhook payload includes PR metadata (title, description, branch names, URLs) but **not the diff**. The prompt above instructs the agent to run `gh pr diff` to fetch the actual changes. The `terminal` tool is included in the default `hermes-webhook` toolset, so no extra configuration is needed.
:::
---
## Step 2 — Start the gateway
```bash
hermes gateway
```
You should see:
```
[webhook] Listening on 0.0.0.0:8644 — routes: github-pr-review
```
Verify it's running:
```bash
curl http://localhost:8644/health
# {"status": "ok", "platform": "webhook"}
```
---
## Step 3 — Register the webhook on GitHub
1. Go to your repository → **Settings****Webhooks** → **Add webhook**
2. Fill in:
- **Payload URL:** `https://your-public-url.example.com/webhooks/github-pr-review`
- **Content type:** `application/json`
- **Secret:** the same value you set for `secret` in the route config
- **Which events?** → Select individual events → check **Pull requests**
3. Click **Add webhook**
GitHub will immediately send a `ping` event to confirm the connection. It is safely ignored — `ping` is not in your `events` list — and returns `{"status": "ignored", "event": "ping"}`. It is only logged at DEBUG level, so it won't appear in the console at the default log level.
---
## Step 4 — Open a test PR
Create a branch, push a change, and open a PR. Within 3090 seconds (depending on PR size and model), Hermes should post a review comment.
To follow the agent's progress in real time:
```bash
tail -f "${HERMES_HOME:-$HOME/.hermes}/logs/gateway.log"
```
---
## Local testing with ngrok
If Hermes is running on your laptop, use [ngrok](https://ngrok.com/) to expose it:
```bash
ngrok http 8644
```
Copy the `https://...ngrok-free.app` URL and use it as your GitHub Payload URL. On the free ngrok tier the URL changes each time ngrok restarts — update your GitHub webhook each session. Paid ngrok accounts get a static domain.
You can smoke-test a static route directly with `curl` — no GitHub account or real PR needed.
:::tip Use `deliver: log` when testing locally
Change `deliver: github_comment` to `deliver: log` in your config while testing. Otherwise the agent will attempt to post a comment to the fake `org/repo#99` repo in the test payload, which will fail. Switch back to `deliver: github_comment` once you're satisfied with the prompt output.
:::
```bash
SECRET="your-webhook-secret-here"
BODY='{"action":"opened","number":99,"pull_request":{"title":"Test PR","body":"Adds a feature.","user":{"login":"testuser"},"head":{"ref":"feat/x"},"base":{"ref":"main"},"html_url":"https://github.com/org/repo/pull/99"},"repository":{"full_name":"org/repo"}}'
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" -hex | awk '{print "sha256="$2}')
curl -s -X POST http://localhost:8644/webhooks/github-pr-review \
-H "Content-Type: application/json" \
-H "X-GitHub-Event: pull_request" \
-H "X-Hub-Signature-256: $SIG" \
-d "$BODY"
# Expected: {"status":"accepted","route":"github-pr-review","event":"pull_request","delivery_id":"..."}
```
Then watch the agent run:
```bash
tail -f "${HERMES_HOME:-$HOME/.hermes}/logs/gateway.log"
```
:::note
`hermes webhook test <name>` only works for **dynamic subscriptions** created with `hermes webhook subscribe`. It does not read routes from `config.yaml`.
:::
---
## Filtering to specific actions
GitHub sends `pull_request` events for many actions: `opened`, `synchronize`, `reopened`, `closed`, `labeled`, etc. The `events` list filters only by the `X-GitHub-Event` header value — it cannot filter by action sub-type at the routing level.
The prompt in Step 1 already handles this by instructing the agent to stop early for `closed` and `labeled` events.
:::warning The agent still runs and consumes tokens
The "stop here" instruction prevents a meaningful review, but the agent still runs to completion for every `pull_request` event regardless of action. GitHub webhooks can only filter by event type (`pull_request`, `push`, `issues`, etc.) — not by action sub-type (`opened`, `closed`, `labeled`). There is no routing-level filter for sub-actions. For high-volume repos, accept this cost or filter upstream with a GitHub Actions workflow that calls your webhook URL conditionally.
:::
> There is no Jinja2 or conditional template syntax. `{field}` and `{nested.field}` are the only substitutions supported. Anything else is passed verbatim to the agent.
---
## Using a skill for consistent review style
Load a [Hermes skill](/docs/user-guide/features/skills) to give the agent a consistent review persona. Add `skills` to your route inside `platforms.webhook.extra.routes` in `config.yaml`:
```yaml
platforms:
webhook:
enabled: true
extra:
routes:
github-pr-review:
secret: "your-webhook-secret-here"
events: [pull_request]
prompt: |
A pull request event was received (action: {action}).
PR #{number}: {pull_request.title} by {pull_request.user.login}
URL: {pull_request.html_url}
If the action is "closed" or "labeled", stop here and do not post a comment.
Otherwise:
1. Run: gh pr diff {number} --repo {repository.full_name}
2. Review the diff using your review guidelines.
3. Write a concise, actionable review comment and post it.
skills:
- review
deliver: github_comment
deliver_extra:
repo: "{repository.full_name}"
pr_number: "{number}"
```
> **Note:** Only the first skill in the list that is found is loaded. Hermes does not stack multiple skills — subsequent entries are ignored.
---
## Sending responses to Slack or Discord instead
Replace the `deliver` and `deliver_extra` fields inside your route with your target platform:
```yaml
# Inside platforms.webhook.extra.routes.<route-name>:
# Slack
deliver: slack
deliver_extra:
chat_id: "C0123456789" # Slack channel ID (omit to use the configured home channel)
# Discord
deliver: discord
deliver_extra:
chat_id: "987654321012345678" # Discord channel ID (omit to use home channel)
```
The target platform must also be enabled and connected in the gateway. If `chat_id` is omitted, the response is sent to that platform's configured home channel.
Valid `deliver` values: `log` · `github_comment` · `telegram` · `discord` · `slack` · `signal` · `sms`
---
## GitLab support
The same adapter works with GitLab. GitLab uses `X-Gitlab-Token` for authentication (plain string match, not HMAC) — Hermes handles both automatically.
For event filtering, GitLab sets `X-GitLab-Event` to values like `Merge Request Hook`, `Push Hook`, `Pipeline Hook`. Use the exact header value in `events`:
```yaml
events:
- Merge Request Hook
```
GitLab payload fields differ from GitHub's — e.g. `{object_attributes.title}` for the MR title and `{object_attributes.iid}` for the MR number. The easiest way to discover the full payload structure is GitLab's **Test** button in your webhook settings, combined with the **Recent Deliveries** log. Alternatively, omit `prompt` from your route config — Hermes will then pass the full payload as formatted JSON directly to the agent, and the agent's response (visible in the gateway log with `deliver: log`) will describe its structure.
---
## Security notes
- **Never use `INSECURE_NO_AUTH`** in production — it disables signature validation entirely. It is only for local development.
- **Rotate your webhook secret** periodically and update it in both GitHub (webhook settings) and your `config.yaml`.
- **Rate limiting** is 30 req/min per route by default (configurable via `extra.rate_limit`). Exceeding it returns `429`.
- **Duplicate deliveries** (webhook retries) are deduplicated via a 1-hour idempotency cache. The cache key is `X-GitHub-Delivery` if present, then `X-Request-ID`, then a millisecond timestamp. When neither delivery ID header is set, retries are **not** deduplicated.
- **Prompt injection:** PR titles, descriptions, and commit messages are attacker-controlled. Malicious PRs could attempt to manipulate the agent's actions. Run the gateway in a sandboxed environment (Docker, VM) when exposed to the public internet.
---
## Troubleshooting
| Symptom | Check |
|---|---|
| `401 Invalid signature` | Secret in config.yaml doesn't match GitHub webhook secret |
| `404 Unknown route` | Route name in the URL doesn't match the key in `routes:` |
| `429 Rate limit exceeded` | 30 req/min per route exceeded — common when re-delivering test events from GitHub's UI; wait a minute or raise `extra.rate_limit` |
| No comment posted | `gh` not installed, not on PATH, or not authenticated (`gh auth login`) |
| Agent runs but no comment | Check the gateway log — if the agent output was empty or just "SKIP", delivery is still attempted |
| Port already in use | Change `extra.port` in config.yaml |
| Agent runs but reviews only the PR description | The prompt isn't including the `gh pr diff` instruction — the diff is not in the webhook payload |
| Can't see the ping event | Ignored events return `{"status":"ignored","event":"ping"}` at DEBUG log level only — check GitHub's delivery log (repo → Settings → Webhooks → your webhook → Recent Deliveries) |
**GitHub's Recent Deliveries tab** (repo → Settings → Webhooks → your webhook) shows the exact request headers, payload, HTTP status, and response body for every delivery. It is the fastest way to diagnose failures without touching your server logs.
---
## Full config reference
```yaml
platforms:
webhook:
enabled: true
extra:
host: "0.0.0.0" # bind address (default: 0.0.0.0)
port: 8644 # listen port (default: 8644)
secret: "" # optional global fallback secret
rate_limit: 30 # requests per minute per route
max_body_bytes: 1048576 # payload size limit in bytes (default: 1 MB)
routes:
<route-name>:
secret: "required-per-route"
events: [] # [] = accept all; otherwise list X-GitHub-Event values
prompt: "" # {field} / {nested.field} resolved from payload
skills: [] # first matching skill is loaded (only one)
deliver: "log" # log | github_comment | telegram | discord | slack | signal | sms
deliver_extra: {} # repo + pr_number for github_comment; chat_id for others
```
---
## What's Next?
- **[Cron-Based PR Reviews](./github-pr-review-agent.md)** — poll for PRs on a schedule, no public endpoint needed
- **[Webhook Reference](/docs/user-guide/messaging/webhooks)** — full config reference for the webhook platform
- **[Build a Plugin](/docs/guides/build-a-hermes-plugin)** — package review logic into a shareable plugin
- **[Profiles](/docs/user-guide/profiles)** — run a dedicated reviewer profile with its own memory and config
+14
View File
@@ -38,6 +38,20 @@ You need at least one way to connect to an LLM. Use `hermes model` to switch pro
| **Google Gemini (OAuth)** | `hermes model` → "Google Gemini (OAuth)" (provider: `google-gemini-cli`, free tier supported, browser PKCE login) |
| **Custom Endpoint** | `hermes model` → choose "Custom endpoint" (saved in `config.yaml`) |
:::info Google AI Studio transport
The built-in `gemini` provider now uses Google's native Gemini API (`https://generativelanguage.googleapis.com/v1beta`) by default.
If you specifically want Google's OpenAI-compatible route instead, set an explicit base URL:
```yaml
model:
provider: gemini
base_url: https://generativelanguage.googleapis.com/v1beta/openai
```
That explicit `base_url` is preserved and Hermes will stay on the OpenAI-compatible transport for both the main runtime and auxiliary tasks.
:::
:::tip Model key alias
In the `model:` config section, you can use either `default:` or `model:` as the key name for your model ID. Both `model: { default: my-model }` and `model: { model: my-model }` work identically.
:::
@@ -48,7 +48,7 @@ All variables go in `~/.hermes/.env`. You can also set them with `hermes config
| `HF_BASE_URL` | Override Hugging Face base URL (default: `https://router.huggingface.co/v1`) |
| `GOOGLE_API_KEY` | Google AI Studio API key ([aistudio.google.com/app/apikey](https://aistudio.google.com/app/apikey)) |
| `GEMINI_API_KEY` | Alias for `GOOGLE_API_KEY` |
| `GEMINI_BASE_URL` | Override Google AI Studio base URL |
| `GEMINI_BASE_URL` | Override Google AI Studio base URL (native default: `https://generativelanguage.googleapis.com/v1beta`; set `/v1beta/openai` for Google's OpenAI-compatible route) |
| `HERMES_GEMINI_CLIENT_ID` | OAuth client ID for `google-gemini-cli` PKCE login (optional; defaults to Google's public gemini-cli client) |
| `HERMES_GEMINI_CLIENT_SECRET` | OAuth client secret for `google-gemini-cli` (optional) |
| `HERMES_GEMINI_PROJECT_ID` | GCP project ID for paid Gemini tiers (free tier auto-provisions) |
@@ -83,6 +83,7 @@ hermes skills uninstall <skill-name>
| Skill | Description |
|-------|-------------|
| **fastmcp** | Build, test, inspect, install, and deploy MCP servers with FastMCP in Python. Covers wrapping APIs or databases as MCP tools, exposing resources or prompts, and deployment. |
| **mcporter** | The `mcporter` CLI — list, configure, auth, and call MCP servers/tools directly (HTTP or stdio) from the terminal. Useful for ad-hoc MCP interactions; for always-on tool discovery use the built-in `native-mcp` client instead. |
## Migration
@@ -98,6 +99,7 @@ The largest optional category — covers the full ML pipeline from data curation
|-------|-------------|
| **accelerate** | Simplest distributed training API. 4 lines to add distributed support to any PyTorch script. Unified API for DeepSpeed/FSDP/Megatron/DDP. |
| **chroma** | Open-source embedding database. Store embeddings and metadata, perform vector and full-text search. Simple 4-function API for RAG and semantic search. |
| **clip** | OpenAI's vision-language model connecting images and text. Zero-shot image classification, image-text matching, and cross-modal retrieval. Trained on 400M image-text pairs. Use for image search, content moderation, or vision-language tasks without fine-tuning. |
| **faiss** | Facebook's library for efficient similarity search and clustering of dense vectors. Supports billions of vectors, GPU acceleration, and various index types (Flat, IVF, HNSW). |
| **flash-attention** | Optimize transformer attention with Flash Attention for 2-4x speedup and 10-20x memory reduction. Supports PyTorch SDPA, flash-attn library, H100 FP8, and sliding window. |
| **guidance** | Control LLM output with regex and grammars, guarantee valid JSON/XML/code generation, enforce structured formats, and build multi-step workflows with Guidance — Microsoft Research's constrained generation framework. |
@@ -106,15 +108,20 @@ The largest optional category — covers the full ML pipeline from data curation
| **instructor** | Extract structured data from LLM responses with Pydantic validation, retry failed extractions automatically, and stream partial results. |
| **lambda-labs** | Reserved and on-demand GPU cloud instances for ML training and inference. SSH access, persistent filesystems, and multi-node clusters. |
| **llava** | Large Language and Vision Assistant — visual instruction tuning and image-based conversations combining CLIP vision with LLaMA language models. |
| **modal** | Serverless GPU cloud platform for running ML workloads. On-demand GPU access without infrastructure management, ML model deployment as APIs, or batch jobs with automatic scaling. |
| **nemo-curator** | GPU-accelerated data curation for LLM training. Fuzzy deduplication (16x faster), quality filtering (30+ heuristics), semantic dedup, PII redaction. Scales with RAPIDS. |
| **peft-fine-tuning** | Parameter-efficient fine-tuning for LLMs using LoRA, QLoRA, and 25+ methods. Train <1% of parameters with minimal accuracy loss for 7B70B models on limited GPU memory. HuggingFace's official PEFT library. |
| **pinecone** | Managed vector database for production AI. Auto-scaling, hybrid search (dense + sparse), metadata filtering, and low latency (under 100ms p95). |
| **pytorch-fsdp** | Expert guidance for Fully Sharded Data Parallel training with PyTorch FSDP — parameter sharding, mixed precision, CPU offloading, FSDP2. |
| **pytorch-lightning** | High-level PyTorch framework with Trainer class, automatic distributed training (DDP/FSDP/DeepSpeed), callbacks, and minimal boilerplate. |
| **qdrant** | High-performance vector similarity search engine. Rust-powered with fast nearest neighbor search, hybrid search with filtering, and scalable vector storage. |
| **saelens** | Train and analyze Sparse Autoencoders (SAEs) using SAELens to decompose neural network activations into interpretable features. |
| **simpo** | Simple Preference Optimization — reference-free alternative to DPO with better performance (+6.4 pts on AlpacaEval 2.0). No reference model needed. |
| **slime** | LLM post-training with RL using Megatron+SGLang framework. Custom data generation workflows and tight Megatron-LM integration for RL scaling. |
| **stable-diffusion-image-generation** | State-of-the-art text-to-image generation with Stable Diffusion via HuggingFace Diffusers. Text-to-image, image-to-image translation, inpainting, and custom diffusion pipelines. |
| **tensorrt-llm** | Optimize LLM inference with NVIDIA TensorRT for maximum throughput. 10-100x faster than PyTorch on A100/H100 with quantization (FP8/INT4) and in-flight batching. |
| **torchtitan** | PyTorch-native distributed LLM pretraining with 4D parallelism (FSDP2, TP, PP, CP). Scale from 8 to 512+ GPUs with Float8 and torch.compile. |
| **whisper** | OpenAI's general-purpose speech recognition. 99 languages, transcription, translation to English, and language ID. Six model sizes from tiny (39M) to large (1550M). Best for robust multilingual ASR. |
## Productivity
@@ -81,6 +81,8 @@ Creates a new profile.
| `--clone-from <profile>` | Clone from a specific profile instead of the current one. Used with `--clone` or `--clone-all`. |
| `--no-alias` | Skip wrapper script creation. |
Creating a profile does **not** make that profile directory the default project/workspace directory for terminal commands. If you want a profile to start in a specific project, set `terminal.cwd` in that profile's `config.yaml`.
**Examples:**
```bash
@@ -129,6 +131,8 @@ hermes profile show <name>
Displays details about a profile including its home directory, configured model, gateway status, skills count, and configuration file status.
This shows the profile's Hermes home directory, not the terminal working directory. Terminal commands start from `terminal.cwd` (or the launch directory on the local backend when `cwd: "."`).
| Argument | Description |
|----------|-------------|
| `<name>` | Profile to inspect. |
+2 -23
View File
@@ -100,21 +100,12 @@ GitHub workflow skills for managing repositories, pull requests, code reviews, i
| `github-pr-workflow` | Full pull request lifecycle — create branches, commit changes, open PRs, monitor CI status, auto-fix failures, and merge. Works with gh CLI or falls back to git + GitHub REST API via curl. | `github/github-pr-workflow` |
| `github-repo-management` | Clone, create, fork, configure, and manage GitHub repositories. Manage remotes, secrets, releases, and workflows. Works with gh CLI or falls back to git + GitHub REST API via curl. | `github/github-repo-management` |
## leisure
Skills for discovery and everyday tasks.
| Skill | Description | Path |
|-------|-------------|------|
| `find-nearby` | Find nearby places (restaurants, cafes, bars, pharmacies, etc.) using OpenStreetMap. Works with coordinates, addresses, cities, zip codes, or Telegram location pins. No API keys needed. | `leisure/find-nearby` |
## mcp
Skills for working with MCP (Model Context Protocol) servers, tools, and integrations.
| Skill | Description | Path |
|-------|-------------|------|
| `mcporter` | Use the mcporter CLI to list, configure, auth, and call MCP servers/tools directly (HTTP or stdio), including ad-hoc servers, config edits, and CLI/type generation. | `mcp/mcporter` |
| `native-mcp` | Built-in MCP (Model Context Protocol) client that connects to external MCP servers, discovers their tools, and registers them as native Hermes Agent tools. Supports stdio and HTTP transports with automatic reconnection, security filtering, and zero-config tool injection. | `mcp/native-mcp` |
## media
@@ -136,14 +127,6 @@ General-purpose ML operations tools — model hub management, dataset operations
|-------|-------------|------|
| `huggingface-hub` | Hugging Face Hub CLI (hf) — search, download, and upload models and datasets, manage repos, query datasets with SQL, deploy inference endpoints, manage Spaces and buckets. | `mlops/huggingface-hub` |
## mlops/cloud
GPU cloud providers and serverless compute platforms for ML workloads.
| Skill | Description | Path |
|-------|-------------|------|
| `modal-serverless-gpu` | Serverless GPU cloud platform for running ML workloads. Use when you need on-demand GPU access without infrastructure management, deploying ML models as APIs, or running batch jobs with automatic scaling. | `mlops/cloud/modal` |
## mlops/evaluation
Model evaluation benchmarks, experiment tracking, and interpretability tools.
@@ -166,15 +149,12 @@ Model serving, quantization (GGUF/GPTQ), structured output, inference optimizati
## mlops/models
Specific model architectures — computer vision (CLIP, SAM, Stable Diffusion), speech (Whisper), and audio generation (AudioCraft).
Specific model architectures — image segmentation (SAM) and audio generation (AudioCraft / MusicGen). Additional model skills (CLIP, Stable Diffusion, Whisper, LLaVA) are available as optional skills.
| Skill | Description | Path |
|-------|-------------|------|
| `audiocraft-audio-generation` | PyTorch library for audio generation including text-to-music (MusicGen) and text-to-sound (AudioGen). Use when you need to generate music from text descriptions, create sound effects, or perform melody-conditioned music generation. | `mlops/models/audiocraft` |
| `clip` | OpenAI's model connecting vision and language. Enables zero-shot image classification, image-text matching, and cross-modal retrieval. Trained on 400M image-text pairs. Use for image search, content moderation, or vision-language tasks without fine-tuning. Best for general-pur… | `mlops/models/clip` |
| `segment-anything-model` | Foundation model for image segmentation with zero-shot transfer. Use when you need to segment any object in images using points, boxes, or masks as prompts, or automatically generate all object masks in an image. | `mlops/models/segment-anything` |
| `stable-diffusion-image-generation` | State-of-the-art text-to-image generation with Stable Diffusion models via HuggingFace Diffusers. Use when generating images from text prompts, performing image-to-image translation, inpainting, or building custom diffusion pipelines. | `mlops/models/stable-diffusion` |
| `whisper` | OpenAI's general-purpose speech recognition model. Supports 99 languages, transcription, translation to English, and language identification. Six model sizes from tiny (39M params) to large (1550M params). Use for speech-to-text, podcast transcription, or multilingual audio pr… | `mlops/models/whisper` |
## mlops/research
@@ -192,8 +172,6 @@ Fine-tuning, RLHF/DPO/GRPO training, distributed training frameworks, and optimi
|-------|-------------|------|
| `axolotl` | Expert guidance for fine-tuning LLMs with Axolotl - YAML configs, 100+ models, LoRA/QLoRA, DPO/KTO/ORPO/GRPO, multimodal support | `mlops/training/axolotl` |
| `fine-tuning-with-trl` | Fine-tune LLMs using reinforcement learning with TRL - SFT for instruction tuning, DPO for preference alignment, PPO/GRPO for reward optimization, and reward model training. Use when need RLHF, align model with preferences, or train from human feedback. Works with HuggingFace … | `mlops/training/trl-fine-tuning` |
| `peft-fine-tuning` | Parameter-efficient fine-tuning for LLMs using LoRA, QLoRA, and 25+ methods. Use when fine-tuning large models (7B-70B) with limited GPU memory, when you need to train &lt;1% of parameters with minimal accuracy loss, or for multi-adapter serving. HuggingFace's official library… | `mlops/training/peft` |
| `pytorch-fsdp` | Expert guidance for Fully Sharded Data Parallel training with PyTorch FSDP - parameter sharding, mixed precision, CPU offloading, FSDP2 | `mlops/training/pytorch-fsdp` |
| `unsloth` | Expert guidance for fast fine-tuning with Unsloth - 2-5x faster training, 50-80% less memory, LoRA/QLoRA optimization | `mlops/training/unsloth` |
## note-taking
@@ -212,6 +190,7 @@ Skills for document creation, presentations, spreadsheets, and other productivit
|-------|-------------|------|
| `google-workspace` | Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration for Hermes. Uses Hermes-managed OAuth2 setup, prefers the Google Workspace CLI (`gws`) when available for broader API coverage, and falls back to the Python client libraries otherwise. | `productivity/google-workspace` |
| `linear` | Manage Linear issues, projects, and teams via the GraphQL API. Create, update, search, and organize issues. Uses API key auth (no OAuth needed). All operations via curl — no dependencies. | `productivity/linear` |
| `maps` | Location intelligence — geocode, reverse-geocode, nearby POI search (44 categories, coordinates or address via `--near`), driving/walking/cycling distance + time, turn-by-turn directions, timezone, bounding box + area, POI search in a rectangle. Uses OpenStreetMap + Overpass + OSRM. No API key needed. Telegram location-pin friendly. | `productivity/maps` |
| `nano-pdf` | Edit PDFs with natural-language instructions using the nano-pdf CLI. Modify text, fix typos, update titles, and make content changes to specific pages without manual editing. | `productivity/nano-pdf` |
| `notion` | Notion API for creating and managing pages, databases, and blocks via curl. Search, create, update, and query Notion workspaces directly from the terminal. | `productivity/notion` |
| `ocr-and-documents` | Extract text from PDFs and scanned documents. Use web_extract for remote URLs, pymupdf for local text-based PDFs, marker-pdf for OCR/scanned docs. For DOCX use python-docx, for PPTX see the powerpoint skill. | `productivity/ocr-and-documents` |
+17 -1
View File
@@ -257,7 +257,7 @@ terminal:
docker_volumes:
- "/home/user/projects:/workspace/projects" # Read-write (default)
- "/home/user/datasets:/data:ro" # Read-only
- "/home/user/outputs:/outputs" # Agent writes, you read
- "/home/user/.hermes/cache/documents:/output" # Gateway-visible exports
```
This is useful for:
@@ -265,6 +265,22 @@ This is useful for:
- **Receiving files** from the agent (generated code, reports, exports)
- **Shared workspaces** where both you and the agent access the same files
If you use a messaging gateway and want the agent to send generated files via
`MEDIA:/...`, prefer a dedicated host-visible export mount such as
`/home/user/.hermes/cache/documents:/output`.
- Write files inside Docker to `/output/...`
- Emit the **host path** in `MEDIA:`, for example:
`MEDIA:/home/user/.hermes/cache/documents/report.txt`
- Do **not** emit `/workspace/...` or `/output/...` unless that exact path also
exists for the gateway process on the host
:::warning
YAML duplicate keys silently override earlier ones. If you already have a
`docker_volumes:` block, merge new mounts into the same list instead of adding
another `docker_volumes:` key later in the file.
:::
Can also be set via environment variable: `TERMINAL_DOCKER_VOLUMES='["/host:/container"]'` (JSON array).
### Docker Credential Forwarding
+6 -6
View File
@@ -30,7 +30,7 @@ Cron-run sessions cannot recursively create more cron jobs. Hermes disables cron
/cron add 30m "Remind me to check the build"
/cron add "every 2h" "Check server status"
/cron add "every 1h" "Summarize new feed items" --skill blogwatcher
/cron add "every 1h" "Use both skills and combine the result" --skill blogwatcher --skill find-nearby
/cron add "every 1h" "Use both skills and combine the result" --skill blogwatcher --skill maps
```
### From the standalone CLI
@@ -40,7 +40,7 @@ hermes cron create "every 2h" "Check server status"
hermes cron create "every 1h" "Summarize new feed items" --skill blogwatcher
hermes cron create "every 1h" "Use both skills and combine the result" \
--skill blogwatcher \
--skill find-nearby \
--skill maps \
--name "Skill combo"
```
@@ -77,7 +77,7 @@ Skills are loaded in order. The prompt becomes the task instruction layered on t
```python
cronjob(
action="create",
skills=["blogwatcher", "find-nearby"],
skills=["blogwatcher", "maps"],
prompt="Look for new local events and interesting nearby places, then combine them into one short brief.",
schedule="every 6h",
name="Local brief",
@@ -95,7 +95,7 @@ You do not need to delete and recreate jobs just to change them.
```bash
/cron edit <job_id> --schedule "every 4h"
/cron edit <job_id> --prompt "Use the revised task"
/cron edit <job_id> --skill blogwatcher --skill find-nearby
/cron edit <job_id> --skill blogwatcher --skill maps
/cron edit <job_id> --remove-skill blogwatcher
/cron edit <job_id> --clear-skills
```
@@ -105,8 +105,8 @@ You do not need to delete and recreate jobs just to change them.
```bash
hermes cron edit <job_id> --schedule "every 4h"
hermes cron edit <job_id> --prompt "Use the revised task"
hermes cron edit <job_id> --skill blogwatcher --skill find-nearby
hermes cron edit <job_id> --add-skill find-nearby
hermes cron edit <job_id> --skill blogwatcher --skill maps
hermes cron edit <job_id> --add-skill maps
hermes cron edit <job_id> --remove-skill blogwatcher
hermes cron edit <job_id> --clear-skills
```
@@ -112,6 +112,38 @@ hermes gateway
The bot should come online within seconds. Send it a message on Telegram to verify.
## Sending Generated Files from Docker-backed Terminals
If your terminal backend is `docker`, keep in mind that Telegram attachments are
sent by the **gateway process**, not from inside the container. That means the
final `MEDIA:/...` path must be readable on the host where the gateway is
running.
Common pitfall:
- the agent writes a file inside Docker to `/workspace/report.txt`
- the model emits `MEDIA:/workspace/report.txt`
- Telegram delivery fails because `/workspace/report.txt` only exists inside the
container, not on the host
Recommended pattern:
```yaml
terminal:
backend: docker
docker_volumes:
- "/home/user/.hermes/cache/documents:/output"
```
Then:
- write files inside Docker to `/output/...`
- emit the **host-visible** path in `MEDIA:`, for example:
`MEDIA:/home/user/.hermes/cache/documents/report.txt`
If you already have a `docker_volumes:` section, add the new mount to the same
list. YAML duplicate keys silently override earlier ones.
## Webhook Mode
By default, Hermes connects to Telegram using **long polling** — the gateway makes outbound requests to Telegram's servers to fetch new updates. This works well for local and always-on deployments.
@@ -72,6 +72,7 @@ Routes define how different webhook sources are handled. Each route is a named e
| `skills` | No | List of skill names to load for the agent run. |
| `deliver` | No | Where to send the response: `github_comment`, `telegram`, `discord`, `slack`, `signal`, `sms`, `whatsapp`, `matrix`, `mattermost`, `homeassistant`, `email`, `dingtalk`, `feishu`, `wecom`, `weixin`, `bluebubbles`, `qqbot`, or `log` (default). |
| `deliver_extra` | No | Additional delivery config — keys depend on `deliver` type (e.g. `repo`, `pr_number`, `chat_id`). Values support the same `{dot.notation}` templates as `prompt`. |
| `deliver_only` | No | If `true`, skip the agent entirely — the rendered `prompt` template becomes the literal message that gets delivered. Zero LLM cost, sub-second delivery. See [Direct Delivery Mode](#direct-delivery-mode) for use cases. Requires `deliver` to be a real target (not `log`). |
### Full example
@@ -240,6 +241,80 @@ For cross-platform delivery, the target platform must also be enabled and connec
---
## Direct Delivery Mode {#direct-delivery-mode}
By default, every webhook POST triggers an agent run — the payload becomes a prompt, the agent processes it, and the agent's response is delivered. This costs LLM tokens on every event.
For use cases where you just want to **push a plain notification** — no reasoning, no agent loop, just deliver the message — set `deliver_only: true` on the route. The rendered `prompt` template becomes the literal message body, and the adapter dispatches it directly to the configured delivery target.
### When to use direct delivery
- **External service push** — Supabase/Firebase webhook fires on a database change → notify a user in Telegram instantly
- **Monitoring alerts** — Datadog/Grafana alert webhook → push to a Discord channel
- **Inter-agent pings** — Agent A notifies Agent B's user that a long-running task finished
- **Background job completion** — Cron job finishes → post result to Slack
Benefits:
- **Zero LLM tokens** — the agent is never invoked
- **Sub-second delivery** — a single adapter call, no reasoning loop
- **Same security as agent mode** — HMAC auth, rate limits, idempotency, and body-size limits all still apply
- **Synchronous response** — the POST returns `200 OK` once delivery succeeds, or `502` if the target rejects it, so your upstream service can retry intelligently
### Example: Telegram push from Supabase
```yaml
platforms:
webhook:
enabled: true
extra:
port: 8644
secret: "global-secret"
routes:
antenna-matches:
secret: "antenna-webhook-secret"
deliver: "telegram"
deliver_only: true
prompt: "🎉 New match: {match.user_name} matched with you!"
deliver_extra:
chat_id: "{match.telegram_chat_id}"
```
Your Supabase edge function signs the payload with HMAC-SHA256 and POSTs to `https://your-server:8644/webhooks/antenna-matches`. The webhook adapter validates the signature, renders the template from the payload, delivers to Telegram, and returns `200 OK`.
### Example: Dynamic subscription via CLI
```bash
hermes webhook subscribe antenna-matches \
--deliver telegram \
--deliver-chat-id "123456789" \
--deliver-only \
--prompt "🎉 New match: {match.user_name} matched with you!" \
--description "Antenna match notifications"
```
### Response codes
| Status | Meaning |
|--------|---------|
| `200 OK` | Delivered successfully. Body: `{"status": "delivered", "route": "...", "target": "...", "delivery_id": "..."}` |
| `200 OK` (status=duplicate) | Duplicate `X-GitHub-Delivery` ID within the idempotency TTL (1 hour). Not re-delivered. |
| `401 Unauthorized` | HMAC signature invalid or missing. |
| `400 Bad Request` | Malformed JSON body. |
| `404 Not Found` | Unknown route name. |
| `413 Payload Too Large` | Body exceeded `max_body_bytes`. |
| `429 Too Many Requests` | Route rate limit exceeded. |
| `502 Bad Gateway` | Target adapter rejected the message or raised. The error is logged server-side; the response body is a generic `Delivery failed` to avoid leaking adapter internals. |
### Configuration gotchas
- `deliver_only: true` requires `deliver` to be a real target. `deliver: log` (or omitting `deliver`) is rejected at startup — the adapter refuses to start if it finds a misconfigured route.
- The `skills` field is ignored in direct delivery mode (no agent runs, so there's nothing to inject skills into).
- Template rendering uses the same `{dot.notation}` syntax as agent mode, including the `{__raw__}` token.
- Idempotency uses the same `X-GitHub-Delivery` / `X-Request-ID` header — retries with the same ID return `status=duplicate` and do NOT re-deliver.
---
## Dynamic Subscriptions (CLI) {#dynamic-subscriptions}
In addition to static routes in `config.yaml`, you can create webhook subscriptions dynamically using the `hermes webhook` CLI command. This is especially useful when the agent itself needs to set up event-driven triggers.
+38 -4
View File
@@ -4,11 +4,11 @@ sidebar_position: 2
# Profiles: Running Multiple Agents
Run multiple independent Hermes agents on the same machine — each with its own config, API keys, memory, sessions, skills, and gateway.
Run multiple independent Hermes agents on the same machine — each with its own config, API keys, memory, sessions, skills, and gateway state.
## What are profiles?
A profile is a fully isolated Hermes environment. Each profile gets its own directory containing its own `config.yaml`, `.env`, `SOUL.md`, memories, sessions, skills, cron jobs, and state database. Profiles let you run separate agents for different purposes — a coding assistant, a personal bot, a research agent — without any cross-contamination.
A profile is a separate Hermes home directory. Each profile gets its own directory containing its own `config.yaml`, `.env`, `SOUL.md`, memories, sessions, skills, cron jobs, and state database. Profiles let you run separate agents for different purposes — a coding assistant, a personal bot, a research agent — without mixing up Hermes state.
When you create a profile, it automatically becomes its own command. Create a profile called `coder` and you immediately have `coder chat`, `coder setup`, `coder gateway start`, etc.
@@ -20,7 +20,7 @@ coder setup # configure API keys and model
coder chat # start chatting
```
That's it. `coder` is now a fully independent agent. It has its own config, its own memory, its own everything.
That's it. `coder` is now its own Hermes profile with its own config, memory, and state.
## Creating a profile
@@ -104,6 +104,32 @@ The CLI always shows which profile is active:
- **Banner**: Shows `Profile: coder` on startup
- **`hermes profile`**: Shows current profile name, path, model, gateway status
## Profiles vs workspaces vs sandboxing
Profiles are often confused with workspaces or sandboxes, but they are different things:
- A **profile** gives Hermes its own state directory: `config.yaml`, `.env`, `SOUL.md`, sessions, memory, logs, cron jobs, and gateway state.
- A **workspace** or **working directory** is where terminal commands start. That is controlled separately by `terminal.cwd`.
- A **sandbox** is what limits filesystem access. Profiles do **not** sandbox the agent.
On the default `local` terminal backend, the agent still has the same filesystem access as your user account. A profile does not stop it from accessing folders outside the profile directory.
If you want a profile to start in a specific project folder, set an explicit absolute `terminal.cwd` in that profile's `config.yaml`:
```yaml
terminal:
backend: local
cwd: /absolute/path/to/project
```
Using `cwd: "."` on the local backend means "the directory Hermes was launched from", not "the profile directory".
Also note:
- `SOUL.md` can guide the model, but it does not enforce a workspace boundary.
- Changes to `SOUL.md` take effect cleanly on a new session. Existing sessions may still be using the old prompt state.
- Asking the model "what directory are you in?" is not a reliable isolation test. If you need a predictable starting directory for tools, set `terminal.cwd` explicitly.
## Running gateways
Each profile runs its own gateway as a separate process with its own bot token:
@@ -151,6 +177,12 @@ coder config set model.model anthropic/claude-sonnet-4
echo "You are a focused coding assistant." > ~/.hermes/profiles/coder/SOUL.md
```
If you want this profile to work in a specific project by default, also set its own `terminal.cwd`:
```bash
coder config set terminal.cwd /absolute/path/to/project
```
## Updating
`hermes update` pulls code once (shared) and syncs new bundled skills to **all** profiles automatically:
@@ -201,6 +233,8 @@ Add the line to your `~/.bashrc` or `~/.zshrc` for persistent completion. Comple
## How it works
Profiles use the `HERMES_HOME` environment variable. When you run `coder chat`, the wrapper script sets `HERMES_HOME=~/.hermes/profiles/coder` before launching hermes. Since 119+ files in the codebase resolve paths via `get_hermes_home()`, everything automatically scopes to the profile's directory — config, sessions, memory, skills, state database, gateway PID, logs, and cron jobs.
Profiles use the `HERMES_HOME` environment variable. When you run `coder chat`, the wrapper script sets `HERMES_HOME=~/.hermes/profiles/coder` before launching hermes. Since 119+ files in the codebase resolve paths via `get_hermes_home()`, Hermes state automatically scopes to the profile's directory — config, sessions, memory, skills, state database, gateway PID, logs, and cron jobs.
This is separate from terminal working directory. Tool execution starts from `terminal.cwd` (or the launch directory when `cwd: "."` on the local backend), not automatically from `HERMES_HOME`.
The default profile is simply `~/.hermes` itself. No migration needed — existing installs work identically.
+2
View File
@@ -162,6 +162,8 @@ const sidebars: SidebarsConfig = {
'guides/cron-troubleshooting',
'guides/work-with-skills',
'guides/delegation-patterns',
'guides/github-pr-review-agent',
'guides/webhook-github-pr-review',
'guides/migrate-from-openclaw',
'guides/aws-bedrock',
],