Compare commits

..

6 Commits

Author SHA1 Message Date
alt-glitch 2b47b40c10 docs(lsp): add feature page — setup, CLI, supported languages, troubleshooting
Covers: enable flow, server installation (detect-only default vs
hermes lsp install), how diagnostics reach the model, config knobs,
all 26 supported languages, and troubleshooting common issues.
2026-05-12 15:23:47 +00:00
alt-glitch b1a609fba3 chore: remove plan from PR (working document, not shipped) 2026-05-12 15:18:20 +00:00
alt-glitch 6d80aa80eb refactor(lsp): simplify __init__.py per /simplify review
- Remove dead _post_tool_call (body was only comments)
- Remove _on_session_start (redundant — _ensure_service lazy-inits)
- Remove _atexit_cleanup (duplicate of _on_session_end)
- Switch _baselines from dict to set (presence sentinel only)
- Remove redundant enabled_for recheck in transform_tool_result
- Remove V4A guard (path-empty check already covers it)
- Use modern type syntax (X | None, dict[], set[])
- Reduce from 322 → 217 lines, same behavior

77/77 tests pass.
2026-05-12 15:01:44 +00:00
alt-glitch e0a1778028 fix(lsp): address review findings — TOCTOU, None guard, JSON safety
Fixes from Claude Code adversarial review:
- Snapshot _service to local var before .is_active() (TOCTOU fix)
- Guard session_id against None with 'or ""'
- Remove text-append fallback — only inject when result is dict JSON
- Add ValueError to json.dumps except clause
- Guard result=None with 'or ""' and isinstance check

Non-dict JSON results and non-JSON results are now left unmodified
(return None = no injection) rather than risking format corruption.
2026-05-12 13:13:53 +00:00
alt-glitch 40a9327248 fix(lsp): wire CLI subcommands via setup_lsp_parser for plugin registration
register_cli_command's setup_fn receives an already-created parser,
not the parent's SubParsersAction. Added setup_lsp_parser() that adds
subcommands (status, list, install, restart, which) to the provided
parser.

Verified: 'hermes lsp status' works from cold shell when plugin is
enabled in plugins.enabled config.
2026-05-12 13:08:49 +00:00
alt-glitch 23344a9a3c feat(lsp): plugin-based LSP diagnostics with zero core changes
Ship LSP semantic diagnostics as a bundled plugin (plugins/lsp/) using
existing hook system.  Zero lines of core code modified.

Plugin wiring:
- pre_tool_call: capture LSP baseline before write_file/patch
- transform_tool_result: inject diagnostics into tool result JSON
- on_session_start/on_session_end + atexit: lifecycle management

Key design:
- Baselines keyed by (session_id, abs_path) for concurrent safety
- Diagnostics added as 'lsp_diagnostics' JSON field (preserves shape)
- Per-file workspace detection (no static session-start gate)
- V4A multi-file patch skipped for MVP
- Short timeout (3s) — cold start degrades gracefully
- os.path.exists heuristic for Docker/SSH backend skip
- First relevant write with no server → INFO log with install hint

Tests: 77/77 pass including:
- Protocol framing, reporter formatting, workspace resolution
- Client E2E against mock LSP server (live_system_guard_bypass)
- Eventlog steady-state silence contract
- Backend-gate heuristic (local vs non-local paths)
- Full hook flow integration (pre→write→transform with diagnostics)

Source: PR #24168 by @teknium1, PR #24155 by @OutThisLife
Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
2026-05-12 13:01:13 +00:00
125 changed files with 1472 additions and 6185 deletions
-14
View File
@@ -273,20 +273,6 @@ BROWSER_SESSION_TIMEOUT=300
# Browser sessions are automatically closed after this period of no activity
BROWSER_INACTIVITY_TIMEOUT=120
# Camofox local anti-detection browser (Camoufox-based Firefox).
# Set CAMOFOX_URL to route the browser tools through a local Camofox server
# instead of agent-browser/Browserbase. See docs/user-guide/features/browser.md.
# CAMOFOX_URL=http://localhost:9377
# Externally managed Camofox sessions — when another app owns the visible
# Camofox browser, set these so Hermes shares the same userId/profile instead
# of creating its own isolated session.
# CAMOFOX_USER_ID=
# CAMOFOX_SESSION_KEY=
# Set to true to reuse an already-open Camofox tab for this identity before
# creating a new one (useful for gateway restarts).
# CAMOFOX_ADOPT_EXISTING_TAB=false
# =============================================================================
# SESSION LOGGING
# =============================================================================
+1 -4
View File
@@ -55,14 +55,11 @@ jobs:
e2e:
runs-on: ubuntu-latest
timeout-minutes: 15
timeout-minutes: 10
steps:
- name: Checkout code
uses: actions/checkout@34e114876b0b11c390a56381ad16ebd13914f8d5 # v4
- name: Install system dependencies
run: sudo apt-get update && sudo apt-get install -y ripgrep
- name: Install uv
uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5
+1 -1
View File
@@ -3828,7 +3828,7 @@ def _resolve_task_provider_model(
# (e.g. OPENROUTER_API_KEY) instead of locking into "custom".
return cfg_provider, resolved_model, cfg_base_url, None, resolved_api_mode
if cfg_provider and cfg_provider != "auto":
return cfg_provider, resolved_model, cfg_base_url, cfg_api_key, resolved_api_mode
return cfg_provider, resolved_model, None, None, resolved_api_mode
return "auto", resolved_model, None, None, resolved_api_mode
-106
View File
@@ -1,106 +0,0 @@
"""Language Server Protocol (LSP) integration for Hermes Agent.
Hermes runs full language servers (pyright, gopls, rust-analyzer,
typescript-language-server, etc.) as subprocesses and pipes their
``textDocument/publishDiagnostics`` output into the post-write lint
delta filter used by ``write_file`` and ``patch``.
LSP is **gated on git workspace detection** — if the agent's cwd is
inside a git repository, LSP runs against that workspace; otherwise the
file_operations layer falls back to its existing in-process syntax
checks. This keeps users on user-home cwd's (e.g. Telegram gateway
chats) from spawning daemons they don't need.
Public API:
from agent.lsp import get_service
svc = get_service()
if svc and svc.enabled_for(path):
await svc.touch_file(path)
diags = svc.diagnostics_for(path)
The bulk of the wiring is internal — most callers only need the layer
in :func:`tools.file_operations.FileOperations._check_lint_delta`,
which is already wired (see that module).
Architecture is documented in ``website/docs/user-guide/features/lsp.md``.
"""
from __future__ import annotations
import atexit
import logging
import threading
from typing import Optional
from agent.lsp.manager import LSPService
logger = logging.getLogger("agent.lsp")
_service: Optional[LSPService] = None
_atexit_registered = False
_service_lock = threading.Lock()
def get_service() -> Optional[LSPService]:
"""Return the process-wide LSP service singleton, or None when disabled.
The service is created lazily on first call. ``None`` is returned
when LSP is disabled in config, when no workspace can be detected,
or when the platform doesn't support subprocess-based LSP servers.
On first creation, registers an :mod:`atexit` handler that tears
down spawned language servers on Python exit so a long-running
CLI or gateway session doesn't leak pyright/gopls/etc. processes
when it terminates.
"""
global _service, _atexit_registered
if _service is not None:
return _service if _service.is_active() else None
with _service_lock:
if _service is not None:
return _service if _service.is_active() else None
_service = LSPService.create_from_config()
if not _atexit_registered:
# ``atexit`` handlers run in LIFO order on normal Python
# exit and on SystemExit, but NOT on os._exit() or
# uncaught signals. Language servers are stateless
# subprocesses — losing them on SIGKILL is fine; they'll
# be reaped by the kernel along with their parent. We
# care about clean exits where Python flushes stdio
# before terminating; without this hook every
# ``hermes chat`` exit would leak pyright processes that
# outlive the parent for a few seconds while their
# stdout buffers drain.
atexit.register(_atexit_shutdown)
_atexit_registered = True
return _service if (_service is not None and _service.is_active()) else None
def shutdown_service() -> None:
"""Tear down the LSP service if one was started.
Safe to call multiple times; safe to call when no service was created.
"""
global _service
with _service_lock:
svc = _service
_service = None
if svc is not None:
try:
svc.shutdown()
except Exception as e: # noqa: BLE001
logger.debug("LSP shutdown error: %s", e)
def _atexit_shutdown() -> None:
"""atexit-registered wrapper. Logs at debug because by the time
atexit fires the user has already seen the agent's final output —
a noisy shutdown line on top of that is just clutter."""
try:
shutdown_service()
except Exception as e: # noqa: BLE001
logger.debug("atexit LSP shutdown failed: %s", e)
__all__ = ["get_service", "shutdown_service", "LSPService"]
+22 -72
View File
@@ -10,7 +10,7 @@ import os
import re
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse
import requests
@@ -1330,40 +1330,21 @@ def _resolve_codex_oauth_context_length(
return None
def _resolve_nous_context_length(
model: str,
base_url: str = "",
api_key: str = "",
) -> Tuple[Optional[int], str]:
"""Resolve Nous Portal model context length.
def _resolve_nous_context_length(model: str) -> Optional[int]:
"""Resolve Nous Portal model context length via OpenRouter metadata.
Tries the live Nous inference endpoint first (authoritative), then falls
back to OpenRouter metadata with suffix/version matching.
Nous model IDs are bare after prefix-stripping (e.g. 'qwen3.6-plus',
'claude-opus-4-6') while OpenRouter uses prefixed IDs (e.g.
'qwen/qwen3.6-plus', 'anthropic/claude-opus-4.6'). Version
normalization (dot↔dash) is applied to handle name drifts.
Returns ``(context_length, source)`` where ``source`` is one of:
- ``"portal"`` — live /v1/models response (authoritative)
- ``"openrouter"`` — OpenRouter cache fallback (non-authoritative;
callers must NOT persist this to the on-disk cache or a single
portal blip will freeze the wrong value in forever)
- ``""`` — could not resolve
Nous model IDs are bare (e.g. 'claude-opus-4-6') while OpenRouter uses
prefixed IDs (e.g. 'anthropic/claude-opus-4.6'). Try suffix matching
with version normalization (dot↔dash).
"""
# Portal first — the Nous /models endpoint is authoritative for what our
# infrastructure enforces and may differ from OR (e.g. OR reports 1M for
# qwen3.6-plus; the portal correctly says 262144). Fall back to the OR
# catalog only if the portal doesn't list the model.
if base_url:
portal_ctx = _resolve_endpoint_context_length(model, base_url, api_key=api_key)
if portal_ctx is not None:
return portal_ctx, "portal"
metadata = fetch_model_metadata()
metadata = fetch_model_metadata() # OpenRouter cache
def _safe_ctx(or_id: str, entry: dict) -> Optional[int]:
"""Return context length, but reject stale 32k values for Kimi models.
Apply the same guard used for the generic OpenRouter path (step 6 in
resolve_context_length) so the Nous portal path does not short-circuit it.
"""
ctx = entry.get("context_length")
if ctx is None:
return None
@@ -1376,20 +1357,19 @@ def _resolve_nous_context_length(
return None
return ctx
# Exact match first
if model in metadata:
ctx = _safe_ctx(model, metadata[model])
if ctx is not None:
return ctx, "openrouter"
return _safe_ctx(model, metadata[model])
normalized = _normalize_model_version(model).lower()
for or_id, entry in metadata.items():
bare = or_id.split("/", 1)[1] if "/" in or_id else or_id
if bare.lower() == model.lower() or _normalize_model_version(bare).lower() == normalized:
ctx = _safe_ctx(or_id, entry)
if ctx is not None:
return ctx, "openrouter"
return _safe_ctx(or_id, entry)
# Partial prefix match for cases like gemini-3-flash → gemini-3-flash-preview
# Require match to be at a word boundary (followed by -, :, or end of string)
model_lower = model.lower()
for or_id, entry in metadata.items():
bare = or_id.split("/", 1)[1] if "/" in or_id else or_id
@@ -1397,11 +1377,9 @@ def _resolve_nous_context_length(
if candidate.startswith(query) and (
len(candidate) == len(query) or candidate[len(query)] in "-:."
):
ctx = _safe_ctx(or_id, entry)
if ctx is not None:
return ctx, "openrouter"
return _safe_ctx(or_id, entry)
return None, ""
return None
def get_model_context_length(
@@ -1416,18 +1394,14 @@ def get_model_context_length(
Resolution order:
0. Explicit config override (model.context_length or custom_providers per-model)
1. Persistent cache (previously discovered via probing). Nous URLs
bypass the cache here so step 5b can always reconcile against
the authoritative portal /v1/models response.
1. Persistent cache (previously discovered via probing)
1b. AWS Bedrock static table (must precede custom-endpoint probe)
2. Active endpoint metadata (/models for explicit custom endpoints)
3. Local server query (for local endpoints)
4. Anthropic /v1/models API (API-key users only, not OAuth)
5. Provider-aware lookups (before generic OpenRouter cache):
a. Copilot live /models API
b. Nous: live /v1/models probe first (authoritative), then OR
cache fallback with suffix/version normalisation. Only
portal-derived values are persisted to disk.
b. Nous suffix-match via OpenRouter cache
c. Codex OAuth /models probe
d. GMI /models endpoint
e. Ollama native /api/show probe (any base_url, provider-agnostic)
@@ -1490,20 +1464,6 @@ def get_model_context_length(
model, base_url, f"{cached:,}",
)
_invalidate_cached_context_length(model, base_url)
# Nous Portal: the portal /v1/models endpoint is authoritative.
# Bypass the persistent cache so step 5b can always reconcile
# against it — this corrects pre-fix entries seeded from the
# OR catalog (the same OR underreport class that the Kimi/Qwen
# DEFAULT_CONTEXT_LENGTHS overrides exist to mitigate) without
# touching the on-disk file when the portal is unreachable.
# The in-memory 300s endpoint metadata cache makes the per-call
# cost amortise to ~0 within a process.
elif _infer_provider_from_url(base_url) == "nous":
logger.debug(
"Bypassing persistent cache for %s@%s (Nous portal authoritative)",
model, base_url,
)
# Fall through; step 5b reconciles and overwrites if portal responds.
else:
return cached
@@ -1595,18 +1555,8 @@ def get_model_context_length(
pass # Fall through to models.dev
if effective_provider == "nous":
ctx, source = _resolve_nous_context_length(
model, base_url=base_url or "", api_key=api_key or ""
)
ctx = _resolve_nous_context_length(model)
if ctx:
# Persist ONLY portal-derived values. Caching an OR-fallback
# value here would freeze in a wrong number on the first portal
# blip / auth glitch and step-1 would short-circuit it forever.
# OR's catalog is community-maintained and is precisely why the
# Kimi/Qwen DEFAULT_CONTEXT_LENGTHS overrides exist — we don't
# want it leaking into the persistent cache for Nous URLs.
if base_url and source == "portal":
save_context_length(model, base_url, ctx)
return ctx
if effective_provider == "openai-codex":
# Codex OAuth enforces lower context limits than the direct OpenAI
+1 -1
View File
@@ -268,7 +268,7 @@ TOOL_USE_ENFORCEMENT_GUIDANCE = (
# Model name substrings that trigger tool-use enforcement guidance.
# Add new patterns here when a model family needs explicit steering.
TOOL_USE_ENFORCEMENT_MODELS = ("gpt", "codex", "gemini", "gemma", "grok", "glm")
TOOL_USE_ENFORCEMENT_MODELS = ("gpt", "codex", "gemini", "gemma", "grok")
# OpenAI GPT/Codex-specific execution guidance. Addresses known failure modes
# where GPT models abandon work on partial results, skip prerequisite lookups,
-11
View File
@@ -370,17 +370,6 @@ _OFFICIAL_DOCS_PRICING: Dict[tuple[str, str], PricingEntry] = {
source_url="https://api-docs.deepseek.com/quick_start/pricing",
pricing_version="deepseek-pricing-2026-03-16",
),
(
"deepseek",
"deepseek-v4-pro",
): PricingEntry(
input_cost_per_million=Decimal("1.74"),
output_cost_per_million=Decimal("3.48"),
cache_read_cost_per_million=Decimal("0.0145"),
source="official_docs_snapshot",
source_url="https://api-docs.deepseek.com/quick_start/pricing",
pricing_version="deepseek-pricing-2026-05-12",
),
# Google Gemini
(
"google",
+2 -5
View File
@@ -3669,7 +3669,7 @@ class HermesCLI:
if self.show_timestamps:
label = f"{label} {datetime.now().strftime('%H:%M')}"
w = shutil.get_terminal_size().columns
fill = w - 2 - HermesCLI._status_bar_display_width(label)
fill = w - 2 - len(label)
_cprint(f"\n{_ACCENT}╭─{label}{'' * max(fill - 1, 0)}{_RST}")
self._stream_buf += text
@@ -8805,9 +8805,6 @@ class HermesCLI:
elif parts[i] == "--source" and i + 1 < len(parts):
source = parts[i + 1]
i += 2
elif parts[i].isdigit():
days = int(parts[i])
i += 1
else:
i += 1
@@ -10393,7 +10390,7 @@ class HermesCLI:
label = " ⚕ Hermes "
if self.show_timestamps:
label = f"{label}{datetime.now().strftime('%H:%M')} "
fill = w - 2 - HermesCLI._status_bar_display_width(label)
fill = w - 2 - len(label)
_cprint(f"\n{_ACCENT}╭─{label}{'' * max(fill - 1, 0)}{_RST}")
_cprint(f"{_STREAM_PAD}{sentence.rstrip()}")
-1
View File
@@ -111,7 +111,6 @@ _HOME_TARGET_ENV_VARS = {
"weixin": "WEIXIN_HOME_CHANNEL",
"bluebubbles": "BLUEBUBBLES_HOME_CHANNEL",
"qqbot": "QQBOT_HOME_CHANNEL",
"whatsapp": "WHATSAPP_HOME_CHANNEL",
}
# Legacy env var names kept for back-compat. Each entry is the current
+1 -1
View File
@@ -2,7 +2,7 @@
Hermes Gateway - Multi-platform messaging integration.
This module provides a unified gateway for connecting the Hermes agent
to various messaging platforms (Telegram, Discord, WhatsApp, Weixin, and more) with:
to various messaging platforms (Telegram, Discord, WhatsApp) with:
- Session management (persistent conversations with reset policies)
- Dynamic context injection (agent knows where messages come from)
- Delivery routing (cron job outputs to appropriate channels)
+1 -1
View File
@@ -2,7 +2,7 @@
Gateway configuration management.
Handles loading and validating configuration for:
- Connected platforms (Telegram, Discord, WhatsApp, Weixin, and more)
- Connected platforms (Telegram, Discord, WhatsApp)
- Home channels for each platform
- Session reset policies
- Delivery preferences
-6
View File
@@ -1168,9 +1168,6 @@ class APIServerAdapter(BasePlatformAdapter):
agent_ref=agent_ref,
gateway_session_key=gateway_session_key,
))
# Ensure SSE drain loops can terminate without relying on polling
# agent_task.done(), which can race with queue timeout checks.
agent_task.add_done_callback(lambda _fut: _stream_q.put(None))
return await self._write_sse_chat_completion(
request, completion_id, model_name, created, _stream_q,
@@ -2200,9 +2197,6 @@ class APIServerAdapter(BasePlatformAdapter):
agent_ref=agent_ref,
gateway_session_key=gateway_session_key,
))
# Ensure SSE drain loops can terminate without relying on polling
# agent_task.done(), which can race with queue timeout checks.
agent_task.add_done_callback(lambda _fut: _stream_q.put(None))
response_id = f"resp_{uuid.uuid4().hex[:28]}"
model_name = body.get("model", self._model_name)
+1 -102
View File
@@ -1,7 +1,7 @@
"""
Base platform adapter interface.
All platform adapters (Telegram, Discord, WhatsApp, Weixin, and more) inherit from this
All platform adapters (Telegram, Discord, WhatsApp) inherit from this
and implement the required methods.
"""
@@ -1743,55 +1743,6 @@ class BasePlatformAdapter(ABC):
"""
return SendResult(success=False, error="Not supported")
async def send_clarify(
self,
chat_id: str,
question: str,
choices: Optional[list],
clarify_id: str,
session_key: str,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Send a clarify prompt to the user.
Two render modes:
* **Multiple choice** (``choices`` is a non-empty list) adapters
that override this should render inline buttons (one per choice
plus a final "Other" / free-text option). Button callbacks
MUST resolve via
``tools.clarify_gateway.resolve_gateway_clarify(clarify_id, response)``
with the chosen string. Picking the "Other" button calls
``mark_awaiting_text(clarify_id)`` so the next message in the
session is captured as the response.
* **Open-ended** (``choices`` is None or empty) render the
question as a plain text message; the next user message in the
session is captured by the gateway's text-intercept and
resolves the clarify automatically (see
``GatewayRunner._maybe_intercept_clarify_text``).
The default implementation falls back to a numbered text list,
which works on every platform the user replies with a number
("2") or with the literal choice text, and the gateway intercepts
and resolves. Adapters with native button UIs (Telegram, Discord)
SHOULD override this for a richer UX.
"""
if choices:
lines = [f"{question}", ""]
for i, choice in enumerate(choices, start=1):
lines.append(f" {i}. {choice}")
lines.append("")
lines.append("Reply with the number, the option text, or your own answer.")
text = "\n".join(lines)
else:
text = f"{question}"
return await self.send(
chat_id=chat_id,
content=text,
metadata=metadata,
)
async def send_private_notice(
self,
chat_id: str,
@@ -2880,58 +2831,6 @@ class BasePlatformAdapter(ABC):
logger.error("[%s] Command '/%s' dispatch failed: %s", self.name, cmd, e, exc_info=True)
return
# Clarify text-capture bypass: if the agent is blocked on a
# clarify_tool call awaiting a free-form text response (open-
# ended clarify, or user picked "Other"), the next non-command
# message in this session MUST reach the runner so the
# clarify-intercept can resolve it and unblock the agent.
#
# Without this bypass: the message gets queued in
# _pending_messages AND triggers an interrupt, killing the
# agent run mid-clarify and discarding the user's answer.
# Same shape as the /approve deadlock fix (PR #4926) — both
# cases are "agent thread blocked on Event.wait, message must
# reach the resolver before being treated as a new turn."
if not cmd:
try:
from tools import clarify_gateway as _clarify_mod
_has_text_clarify = (
_clarify_mod.get_pending_for_session(session_key) is not None
)
except Exception:
_has_text_clarify = False
if _has_text_clarify:
logger.debug(
"[%s] Routing message to clarify text-intercept for %s",
self.name, session_key,
)
try:
_thread_meta = _thread_metadata_for_source(
event.source, _reply_anchor_for_event(event)
)
response = await self._message_handler(event)
_text, _eph_ttl = self._unwrap_ephemeral(response)
if _text:
_r = await self._send_with_retry(
chat_id=event.source.chat_id,
content=_text,
reply_to=_reply_anchor_for_event(event),
metadata=_thread_meta,
)
if _eph_ttl > 0 and _r.success and _r.message_id:
self._schedule_ephemeral_delete(
chat_id=event.source.chat_id,
message_id=_r.message_id,
ttl_seconds=_eph_ttl,
)
except Exception as e:
logger.error(
"[%s] Clarify text-intercept dispatch failed: %s",
self.name, e, exc_info=True,
)
return
if self._busy_session_handler is not None:
try:
if await self._busy_session_handler(event, session_key):
+1 -3
View File
@@ -446,9 +446,7 @@ class SignalAdapter(BasePlatformAdapter):
if sent_msg and isinstance(sent_msg, dict):
dest = sent_msg.get("destinationNumber") or sent_msg.get("destination")
sent_ts = sent_msg.get("timestamp")
sent_msg_group_info = sent_msg.get("groupInfo") or {}
sent_msg_group_id = sent_msg_group_info.get("groupId") if sent_msg_group_info else None
if dest == self._account_normalized or sent_msg_group_id:
if dest == self._account_normalized:
# Check if this is an echo of our own outbound reply
if sent_ts and sent_ts in self._recent_sent_timestamps:
self._recent_sent_timestamps.discard(sent_ts)
+2 -216
View File
@@ -427,9 +427,6 @@ class TelegramAdapter(BasePlatformAdapter):
# Slash-confirm button state: confirm_id → session_key (for /reload-mcp
# and any other slash-confirm prompts; see GatewayRunner._request_slash_confirm).
self._slash_confirm_state: Dict[str, str] = {}
# Clarify button state: clarify_id → session_key (for the clarify tool's
# multiple-choice prompts; see GatewayRunner clarify_callback wiring).
self._clarify_state: Dict[str, str] = {}
# Notification mode for message sends.
# "important" — only final responses, approvals, and slash confirmations
# trigger notifications; tool progress, streaming, status
@@ -2218,80 +2215,6 @@ class TelegramAdapter(BasePlatformAdapter):
logger.warning("[%s] send_slash_confirm failed: %s", self.name, e)
return SendResult(success=False, error=str(e))
async def send_clarify(
self,
chat_id: str,
question: str,
choices: Optional[list],
clarify_id: str,
session_key: str,
metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
"""Render a clarify prompt with one inline button per choice.
Multi-choice mode (``choices`` non-empty): renders one button per
option plus a final "✏️ Other (type answer)" button. Picking the
"Other" button flips the entry into text-capture mode so the next
message becomes the response.
Open-ended mode (``choices`` empty): renders the question as plain
text no buttons. The next message in the session is captured by
the gateway's text-intercept and resolves the clarify.
"""
if not self._bot:
return SendResult(success=False, error="Not connected")
try:
text = f"{_html.escape(question)}"
thread_id = self._metadata_thread_id(metadata)
kwargs: Dict[str, Any] = {
"chat_id": int(chat_id),
"text": text,
"parse_mode": ParseMode.HTML,
**self._link_preview_kwargs(),
}
if choices:
# Telegram caps callback_data at 64 bytes; keep "cl:<id>:<idx>"
# short. Button label is also capped (~64 chars in practice).
rows = []
for idx, choice in enumerate(choices):
label = str(choice)
if len(label) > 60:
label = label[:57] + "..."
rows.append([
InlineKeyboardButton(
f"{idx + 1}. {label}",
callback_data=f"cl:{clarify_id}:{idx}",
)
])
rows.append([
InlineKeyboardButton(
"✏️ Other (type answer)",
callback_data=f"cl:{clarify_id}:other",
)
])
kwargs["reply_markup"] = InlineKeyboardMarkup(rows)
reply_to_id = self._reply_to_message_id_for_send(None, metadata)
kwargs["reply_to_message_id"] = reply_to_id
kwargs.update(
self._thread_kwargs_for_send(
chat_id,
thread_id,
metadata,
reply_to_message_id=reply_to_id,
)
)
msg = await self._send_message_with_thread_fallback(**kwargs)
self._clarify_state[clarify_id] = session_key
return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e:
logger.warning("[%s] send_clarify failed: %s", self.name, e)
return SendResult(success=False, error=str(e))
async def send_model_picker(
self,
chat_id: str,
@@ -2772,116 +2695,11 @@ class TelegramAdapter(BasePlatformAdapter):
{"thread_id": str(thread_id)},
)
)
await self._send_message_with_thread_fallback(**send_kwargs)
await self._bot.send_message(**send_kwargs)
except Exception as exc:
logger.error("[%s] slash-confirm callback failed: %s", self.name, exc, exc_info=True)
return
# --- Clarify callbacks (cl:clarify_id:idx | cl:clarify_id:other) ---
if data.startswith("cl:"):
parts = data.split(":", 2)
if len(parts) == 3:
clarify_id = parts[1]
choice_token = parts[2]
caller_id = str(getattr(query.from_user, "id", ""))
if not self._is_callback_user_authorized(
caller_id,
chat_id=query_chat_id,
chat_type=str(query_chat_type) if query_chat_type is not None else None,
thread_id=str(query_thread_id) if query_thread_id is not None else None,
user_name=query_user_name,
):
await query.answer(text="⛔ You are not authorized to answer this prompt.")
return
session_key = self._clarify_state.get(clarify_id)
if not session_key:
await query.answer(text="This prompt has already been resolved.")
return
user_display = getattr(query.from_user, "first_name", "User")
if choice_token == "other":
# Flip into text-capture mode and tell the user to type
# their answer. The gateway's text-intercept will pick
# up the next message in this session and resolve the
# clarify. Do NOT pop _clarify_state yet — we still
# need it if the user is slow to respond and the entry
# is cleared by something else.
try:
from tools.clarify_gateway import mark_awaiting_text
mark_awaiting_text(clarify_id)
except Exception as exc:
logger.warning("[%s] mark_awaiting_text failed: %s", self.name, exc)
await query.answer(text="✏️ Type your answer in the chat.")
try:
await query.edit_message_text(
text=f"{query.message.text or ''}\n\n<i>Awaiting typed response from {_html.escape(user_display)}…</i>",
parse_mode=ParseMode.HTML,
reply_markup=None,
)
except Exception:
pass
return
# Numeric choice → resolve immediately with the chosen text
try:
idx = int(choice_token)
except (ValueError, TypeError):
await query.answer(text="Invalid choice.")
return
# Look up the choice text from the entry registered in the
# clarify primitive. Fall back to the index if the entry
# has been cleaned up (race with timeout / session reset).
resolved_text: Optional[str] = None
try:
from tools.clarify_gateway import _entries as _clarify_entries # type: ignore
entry = _clarify_entries.get(clarify_id)
if entry and entry.choices and 0 <= idx < len(entry.choices):
resolved_text = entry.choices[idx]
except Exception:
resolved_text = None
if resolved_text is None:
# Race: entry vanished. Echo the index as a number so
# the agent at least sees an intentional response
# rather than nothing.
resolved_text = f"choice {idx + 1}"
# Pop state and resolve
self._clarify_state.pop(clarify_id, None)
try:
from tools.clarify_gateway import resolve_gateway_clarify
resolved = resolve_gateway_clarify(clarify_id, resolved_text)
except Exception as exc:
logger.error("[%s] resolve_gateway_clarify failed: %s", self.name, exc)
resolved = False
await query.answer(text=f"{resolved_text[:60]}")
try:
await query.edit_message_text(
text=f"{_html.escape(query.message.text or '')}\n\n<b>{_html.escape(user_display)}:</b> {_html.escape(resolved_text)}",
parse_mode=ParseMode.HTML,
reply_markup=None,
)
except Exception:
pass
if resolved:
logger.info(
"Telegram clarify button resolved (id=%s, choice=%r, user=%s)",
clarify_id, resolved_text, user_display,
)
else:
logger.warning(
"Telegram clarify button: resolve_gateway_clarify returned False (id=%s)",
clarify_id,
)
return
# --- Update prompt callbacks ---
if not data.startswith("update_prompt:"):
return
@@ -4761,27 +4579,6 @@ class TelegramAdapter(BasePlatformAdapter):
logger.debug("[%s] set_message_reaction failed (%s): %s", self.name, emoji, e)
return False
async def _clear_reactions(self, chat_id: str, message_id: str) -> bool:
"""Clear all reactions from a Telegram message.
Calling ``set_message_reaction`` with ``reaction=None`` (or an empty
sequence) is the documented Bot API way to remove all bot-set
reactions on a message equivalent to Bot API 10.0's
``deleteMessageReaction`` but supported in PTB 22.6 already.
"""
if not self._bot:
return False
try:
await self._bot.set_message_reaction(
chat_id=int(chat_id),
message_id=int(message_id),
reaction=None,
)
return True
except Exception as e:
logger.debug("[%s] clear reactions failed: %s", self.name, e)
return False
async def on_processing_start(self, event: MessageEvent) -> None:
"""Add an in-progress reaction when message processing begins."""
if not self._reactions_enabled():
@@ -4796,23 +4593,12 @@ class TelegramAdapter(BasePlatformAdapter):
Unlike Discord (additive reactions), Telegram's set_message_reaction
replaces all existing reactions in one call no remove step needed.
On CANCELLED outcomes (e.g. the user runs ``/stop``, or a session is
interrupted mid-flight), we explicitly clear the 👀 in-progress
reaction so it doesn't linger on the user's message indefinitely.
Without this clear, the only way to remove the 👀 was to wait for
another agent run to swap it to 👍/👎 which never happens if the
cancellation was the last activity in the chat.
"""
if not self._reactions_enabled():
return
chat_id = getattr(event.source, "chat_id", None)
message_id = getattr(event, "message_id", None)
if not (chat_id and message_id):
return
if outcome == ProcessingOutcome.CANCELLED:
await self._clear_reactions(chat_id, message_id)
else:
if chat_id and message_id and outcome != ProcessingOutcome.CANCELLED:
await self._set_reaction(
chat_id,
message_id,
-110
View File
@@ -5828,37 +5828,6 @@ class GatewayRunner:
)
_update_prompts.pop(_quick_key, None)
# Intercept messages that are responses to a pending clarify
# request that is awaiting free-form text (either an open-ended
# clarify with no choices, or one where the user picked the
# "Other" button). The first non-empty user message in the
# session resolves the clarify and unblocks the agent thread —
# we do NOT route it to the agent as a new turn.
try:
from tools import clarify_gateway as _clarify_mod
_pending_clarify = _clarify_mod.get_pending_for_session(_quick_key)
except Exception:
_pending_clarify = None
if _pending_clarify is not None:
_raw_clarify_reply = (event.text or "").strip()
# Skip slash commands — the user clearly wanted to issue a
# command, not answer the clarify. Leave the clarify pending
# so the user can retry; if it times out, the agent unblocks
# with an empty response.
if _raw_clarify_reply and not _raw_clarify_reply.startswith("/"):
_resolved = _clarify_mod.resolve_gateway_clarify(
_pending_clarify.clarify_id, _raw_clarify_reply,
)
if _resolved:
logger.info(
"Gateway intercepted clarify text response (session=%s, id=%s)",
_quick_key, _pending_clarify.clarify_id,
)
# Acknowledge with empty string so adapters that emit
# the agent's response don't double-post. The agent
# itself will produce the next user-facing message.
return ""
# Intercept messages that are responses to a pending /reload-mcp
# (or future) slash-confirm prompt. Recognized confirm replies are
# /approve, /always, /cancel (plus short aliases). Anything else
@@ -7543,7 +7512,6 @@ class GatewayRunner:
hook_ctx = {
"platform": source.platform.value if source.platform else "",
"user_id": source.user_id,
"chat_id": source.chat_id or "",
"session_id": session_entry.session_id,
"message": message_text[:500],
}
@@ -14989,76 +14957,6 @@ class GatewayRunner:
if _pdc is not None:
_pdc[session_key] = _release_bg_review_messages
# ------------------------------------------------------------------
# Clarify callback: present a clarify prompt and block on a response.
#
# Runs on the agent's worker thread (see clarify_tool's synchronous
# callback contract). Bridges sync→async by scheduling the
# adapter's send_clarify on the gateway event loop, then blocks on
# the clarify primitive's threading.Event with a configurable
# timeout. Returns the user's response string, or a sentinel
# explaining that no response arrived (so the agent can adapt
# rather than hang forever).
# ------------------------------------------------------------------
def _clarify_callback_sync(question: str, choices) -> str:
from tools import clarify_gateway as _clarify_mod
import uuid as _uuid
if not _status_adapter:
return ""
clarify_id = _uuid.uuid4().hex[:10]
_clarify_mod.register(
clarify_id=clarify_id,
session_key=session_key or "",
question=question,
choices=list(choices) if choices else None,
)
# Pause typing — like approval, we don't want a "thinking..."
# status to obscure the prompt or block the user from typing
# an "Other" response on platforms that disable input while
# typing is active (Slack Assistant API).
try:
_status_adapter.pause_typing_for_chat(_status_chat_id)
except Exception:
pass
send_ok = False
try:
fut = asyncio.run_coroutine_threadsafe(
_status_adapter.send_clarify(
chat_id=_status_chat_id,
question=question,
choices=list(choices) if choices else None,
clarify_id=clarify_id,
session_key=session_key or "",
metadata=_status_thread_metadata,
),
_loop_for_step,
)
result = fut.result(timeout=15)
send_ok = bool(getattr(result, "success", False))
except Exception as exc:
logger.warning("Clarify send failed: %s", exc)
send_ok = False
if not send_ok:
# Couldn't deliver the prompt — clean up and return
# sentinel so the agent can fall back to a sensible
# default rather than hanging.
_clarify_mod.clear_session(session_key or "")
return "[clarify prompt could not be delivered]"
timeout = _clarify_mod.get_clarify_timeout()
response = _clarify_mod.wait_for_response(clarify_id, timeout=float(timeout))
if response is None or response == "":
# Timeout or session-boundary cancellation
return f"[user did not respond within {int(timeout / 60)}m]"
return response
agent.clarify_callback = _clarify_callback_sync
# Store agent reference for interrupt support
agent_holder[0] = agent
# Capture the full tool definitions for transcript logging
@@ -15330,14 +15228,6 @@ class GatewayRunner:
result = agent.run_conversation(_run_message, conversation_history=agent_history, task_id=session_id)
finally:
unregister_gateway_notify(_approval_session_key)
# Cancel any pending clarify entries so blocked agent
# threads don't hang past the end of the run (interrupt,
# completion, gateway shutdown). Idempotent.
try:
from tools.clarify_gateway import clear_session as _clear_clarify_session
_clear_clarify_session(_approval_session_key)
except Exception:
pass
reset_current_session_key(_approval_session_token)
result_holder[0] = result
+5 -38
View File
@@ -124,33 +124,16 @@ def get_process_start_time(pid: int) -> Optional[int]:
def _read_process_cmdline(pid: int) -> Optional[str]:
"""Return the process command line as a space-separated string.
On Linux, reads /proc/<pid>/cmdline directly. On macOS and other
platforms without /proc, falls back to ``ps -p <pid> -o command=``.
"""
"""Return the process command line as a space-separated string."""
cmdline_path = Path(f"/proc/{pid}/cmdline")
try:
raw = cmdline_path.read_bytes()
except (FileNotFoundError, PermissionError, OSError):
pass
else:
if raw:
return raw.replace(b"\x00", b" ").decode("utf-8", errors="ignore").strip()
return None
try:
result = subprocess.run(
["ps", "-p", str(pid), "-o", "command="],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except (OSError, subprocess.TimeoutExpired):
pass
return None
if not raw:
return None
return raw.replace(b"\x00", b" ").decode("utf-8", errors="ignore").strip()
def _looks_like_gateway_process(pid: int) -> bool:
@@ -611,22 +594,6 @@ def acquire_scoped_lock(scope: str, identity: str, metadata: Optional[dict[str,
and current_start != existing.get("start_time")
):
stale = True
# When start_time comparison is unavailable (macOS / Windows
# have no /proc, so both sides are None), fall back to
# checking the live process command line. When cmdline is
# also unreadable (Windows has no ps), consult the lock
# record's own argv — the gateway writes it at startup and
# it's the only identity signal on platforms without ps.
# Both oracles must indicate "not a gateway" to mark stale.
if (
not stale
and existing.get("start_time") is None
and current_start is None
and not _looks_like_gateway_process(existing_pid)
):
live_cmdline = _read_process_cmdline(existing_pid)
if live_cmdline is not None or not _record_looks_like_gateway(existing):
stale = True
# Check if process is stopped (Ctrl+Z / SIGTSTP) — stopped
# processes still appear alive to _pid_exists but are not
# actually running. Treat them as stale so --replace works.
+1 -10
View File
@@ -5271,7 +5271,6 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
get_curated_nous_model_ids, get_pricing_for_provider,
check_nous_free_tier, partition_nous_models_by_tier,
union_with_portal_free_recommendations,
union_with_portal_paid_recommendations,
)
model_ids = get_curated_nous_model_ids()
@@ -5280,27 +5279,19 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
if model_ids:
pricing = get_pricing_for_provider("nous")
free_tier = check_nous_free_tier()
_portal_for_recs = auth_state.get("portal_base_url", "")
if free_tier:
# The Portal's freeRecommendedModels endpoint is the
# source of truth for what's free *right now*. Augment
# the curated list with anything new the Portal flags
# as free so users on older Hermes builds still see
# newly-launched free models without a CLI release.
_portal_for_recs = auth_state.get("portal_base_url", "")
model_ids, pricing = union_with_portal_free_recommendations(
model_ids, pricing, _portal_for_recs,
)
model_ids, unavailable_models = partition_nous_models_by_tier(
model_ids, pricing, free_tier=True,
)
else:
# Paid-tier mirror: pull paidRecommendedModels so newly
# launched paid models surface in the picker even if
# the in-repo curated list and docs-hosted manifest
# haven't caught up yet.
model_ids, pricing = union_with_portal_paid_recommendations(
model_ids, pricing, _portal_for_recs,
)
_portal = auth_state.get("portal_base_url", "")
if model_ids:
print(f"Showing {len(model_ids)} curated models — use \"Enter custom model name\" for others.")
+9 -12
View File
@@ -468,23 +468,20 @@ def telegram_bot_commands() -> list[tuple[str, str]]:
Telegram command names cannot contain hyphens, so they are replaced with
underscores. Aliases are skipped -- Telegram shows one menu entry per
canonical command.
canonical command. Commands that require arguments are skipped because
selecting a Telegram BotCommand sends only ``/command`` and would execute
an incomplete command.
Built-in commands that require arguments (e.g. /queue, /steer, /background)
are **included** because their handlers return usage text when selected
without a payload, making them discoverable via autocomplete.
Plugin-registered slash commands that require arguments are **excluded**
because plugins may not provide a no-arg usage fallback.
Plugin-registered slash commands are included so plugins get native
autocomplete in Telegram without touching core code.
"""
overrides = _resolve_config_gates()
result: list[tuple[str, str]] = []
for cmd in COMMAND_REGISTRY:
if not _is_gateway_available(cmd, overrides):
continue
# Built-in arg-taking commands are included — their handlers show
# usage text when invoked without arguments, and hiding them from
# the menu hurts discoverability (issue #24312).
if _requires_argument(cmd.args_hint):
continue
tg_name = _sanitize_telegram_name(cmd.name)
if tg_name:
result.append((tg_name, cmd.description))
@@ -1362,9 +1359,9 @@ class SlashCommandCompleter(Completer):
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=2,
cwd=cwd, encoding="utf-8", errors="replace",
cwd=cwd,
)
if proc.returncode == 0 and proc.stdout and proc.stdout.strip():
if proc.returncode == 0 and proc.stdout.strip():
raw = proc.stdout.strip().split("\n")
# Store relative paths
for p in raw[:5000]:
-67
View File
@@ -477,12 +477,6 @@ DEFAULT_CONFIG = {
# threshold before escalating to a full timeout. The warning fires
# once per run and does not interrupt the agent. 0 = disable warning.
"gateway_timeout_warning": 900,
# Maximum time (seconds) the gateway will block an agent waiting for
# a clarify-tool response from the user. Hit this and the agent
# unblocks with "[user did not respond within Xm]" so it can adapt
# rather than pinning the running-agent guard forever. CLI clarify
# blocks indefinitely (input() is synchronous) and ignores this.
"clarify_timeout": 600,
# Periodic "still working" notification interval (seconds).
# Sends a status message every N seconds so the user knows the
# agent hasn't died during long tasks. 0 = disable notifications.
@@ -634,12 +628,6 @@ DEFAULT_CONFIG = {
# so the server maps it to a persistent Firefox profile automatically.
# When false (default), each session gets a random userId (ephemeral).
"managed_persistence": False,
# Optional externally managed Camofox identity. Useful when another
# app owns the visible browser and Hermes should operate in it.
"user_id": "",
"session_key": "",
# Rehydrate tab_id from Camofox before creating a new tab.
"adopt_existing_tab": False,
},
},
@@ -929,14 +917,6 @@ DEFAULT_CONFIG = {
"persistent_output": True,
"persistent_output_max_lines": 200,
"inline_diffs": True, # Show inline diff previews for write actions (write_file, patch, skill_manage)
# File-mutation verifier footer. When true (default), the agent
# appends a one-line advisory to its final response whenever a
# write_file / patch call failed during the turn and was never
# superseded by a successful write to the same path. This catches
# the "batch of parallel patches, half fail, model claims success"
# class of over-claim that otherwise forces users to run
# `git status` to verify edits landed. Set false to suppress.
"file_mutation_verifier": True,
"show_cost": False, # Show $ cost in the status bar (off by default)
"skin": "default",
# UI language for static user-facing messages (approval prompts, a
@@ -1505,53 +1485,6 @@ DEFAULT_CONFIG = {
"backup_keep": 5,
},
# Language Server Protocol — semantic diagnostics from real
# language servers (pyright, gopls, rust-analyzer, etc.) wired
# into the post-write lint check used by ``write_file`` and
# ``patch``.
#
# LSP is gated on git-workspace detection: when the agent's
# cwd (or the file being edited) is inside a git worktree, LSP
# runs against that workspace. When neither is in a git repo,
# LSP stays dormant and the in-process syntax check is the only
# tier — handy for Telegram/Discord chats where the cwd is the
# user's home directory.
"lsp": {
# Master toggle. Setting this to false disables the entire
# subsystem — no servers spawn, no background event loop, no
# cost.
"enabled": True,
# Diagnostic-wait mode for the post-write check.
# ``"document"`` waits up to ``wait_timeout`` seconds for the
# current file's diagnostics; ``"full"`` additionally requests
# workspace-wide diagnostics (slower).
"wait_mode": "document",
"wait_timeout": 5.0,
# How to handle missing server binaries.
# ``"auto"`` — try to install via npm/go/pip into
# ``<HERMES_HOME>/lsp/bin/`` on first use.
# ``"manual"`` — only use binaries already on PATH.
# ``"off"`` — alias for ``manual``.
"install_strategy": "auto",
# Per-server overrides. Each key is a server_id from the
# registry (``pyright``, ``typescript``, ``gopls``,
# ``rust-analyzer``, etc.) and accepts:
# disabled: true
# — skip this server even when its extensions match
# command: ["full/path/to/server", "--stdio"]
# — pin a custom binary path; bypasses auto-install
# env: {"KEY": "value"}
# — extra env vars passed to the spawned process
# initialization_options: {...}
# — merged into the LSP ``initializationOptions``
# Empty by default; the registry defaults work for typical
# setups.
"servers": {},
},
# Config schema version - bump this when adding new required fields
"_config_version": 23,
}
+1 -2
View File
@@ -287,8 +287,7 @@ def _build_apikey_providers_list() -> list:
(_pp.models_url or (_pp.base_url.rstrip("/") + "/models"))
if _pp.base_url else None
)
_hc = getattr(_pp, "supports_health_check", True)
_static.append((_label, _key_vars, _models_url, _base_var, _hc))
_static.append((_label, _key_vars, _models_url, _base_var, True))
except Exception:
pass
return _static
+2 -16
View File
@@ -2164,7 +2164,7 @@ Environment="PATH={sane_path}"
Environment="VIRTUAL_ENV={venv_dir}"
Environment="HERMES_HOME={hermes_home}"
Restart=always
RestartSec=5
RestartSec=60
RestartMaxDelaySec=300
RestartSteps=5
RestartForceExitStatus={GATEWAY_SERVICE_RESTART_EXIT_CODE}
@@ -2199,7 +2199,7 @@ Environment="PATH={sane_path}"
Environment="VIRTUAL_ENV={venv_dir}"
Environment="HERMES_HOME={hermes_home}"
Restart=always
RestartSec=5
RestartSec=60
RestartMaxDelaySec=300
RestartSteps=5
RestartForceExitStatus={GATEWAY_SERVICE_RESTART_EXIT_CODE}
@@ -3658,15 +3658,6 @@ def _all_platforms() -> list[dict]:
``hermes setup gateway`` without needing the gateway to be running.
Built-ins keep their dict shape; plugin entries are adapted to the same
shape with ``_registry_entry`` holding the source.
Platform-specific gating: some platforms can't be configured on
every host. Currently:
- Matrix is hidden on Windows. The [matrix] extra pulls
``mautrix[encryption]`` -> ``python-olm``, which has no Windows
wheel and needs ``make`` + libolm to build from sdist. There's
no native Windows path that works, so we don't offer it in the
picker. Users who want Matrix on Windows can run hermes under
WSL.
"""
# Populate the registry so plugin platforms are visible. Idempotent.
# Bundled platform plugins (``kind: platform``) auto-load unconditionally,
@@ -3680,11 +3671,6 @@ def _all_platforms() -> list[dict]:
logger.debug("plugin discovery failed during platform enumeration: %s", e)
platforms = [dict(p) for p in _PLATFORMS]
# Drop platforms that can't function on this host. See docstring.
if sys.platform == "win32":
platforms = [p for p in platforms if p.get("key") != "matrix"]
by_key = {p["key"]: p for p in platforms}
try:
+1 -21
View File
@@ -2590,7 +2590,6 @@ def _model_flow_nous(config, current_model="", args=None):
check_nous_free_tier,
partition_nous_models_by_tier,
union_with_portal_free_recommendations,
union_with_portal_paid_recommendations,
)
model_ids = get_curated_nous_model_ids()
@@ -2646,10 +2645,6 @@ def _model_flow_nous(config, current_model="", args=None):
# with the Portal's freeRecommendedModels list so newly-launched free
# models show up even if this CLI build's hardcoded curated list and
# docs-hosted manifest haven't caught up yet.
#
# For paid users: mirror the same idea with paidRecommendedModels so
# newly-launched paid models surface in the picker too — independent
# of CLI release cadence.
unavailable_models: list[str] = []
if free_tier:
model_ids, pricing = union_with_portal_free_recommendations(
@@ -2658,10 +2653,6 @@ def _model_flow_nous(config, current_model="", args=None):
model_ids, unavailable_models = partition_nous_models_by_tier(
model_ids, pricing, free_tier=True
)
else:
model_ids, pricing = union_with_portal_paid_recommendations(
model_ids, pricing, _nous_portal_url,
)
if not model_ids and not unavailable_models:
print("No models available for Nous Portal after filtering.")
@@ -9390,7 +9381,7 @@ def main():
gateway_parser = subparsers.add_parser(
"gateway",
help="Messaging gateway management",
description="Manage the messaging gateway (Telegram, Discord, WhatsApp, Weixin, and more)",
description="Manage the messaging gateway (Telegram, Discord, WhatsApp)",
)
gateway_subparsers = gateway_parser.add_subparsers(dest="gateway_command")
@@ -9533,17 +9524,6 @@ def main():
gateway_parser.set_defaults(func=cmd_gateway)
# =========================================================================
# lsp command
# =========================================================================
try:
from agent.lsp.cli import register_subparser as _lsp_register
_lsp_register(subparsers)
except Exception as _lsp_err: # noqa: BLE001
# LSP is optional infrastructure — never let a registration
# failure break the CLI overall.
logger.debug("LSP CLI registration failed: %s", _lsp_err)
# =========================================================================
# setup command
# =========================================================================
-65
View File
@@ -621,71 +621,6 @@ def union_with_portal_free_recommendations(
return (augmented_ids, augmented_pricing)
def union_with_portal_paid_recommendations(
curated_ids: list[str],
pricing: dict[str, dict[str, str]],
portal_base_url: str = "",
*,
force_refresh: bool = False,
) -> tuple[list[str], dict[str, dict[str, str]]]:
"""Augment curated list with the Portal's ``paidRecommendedModels``.
Mirror of :func:`union_with_portal_free_recommendations` for paid-tier
users. The Portal's ``/api/nous/recommended-models`` endpoint advertises
which paid models are blessed *right now* independent of what the
in-repo ``_PROVIDER_MODELS["nous"]`` list happens to contain or whether
the docs-hosted catalog manifest has been rebuilt since the last release.
For paid-tier users this lets newly-launched paid models surface in the
picker even if the user is running an older Hermes that doesn't ship
them in its hardcoded curated list. This function returns an augmented
``(model_ids, pricing)`` pair where:
* Portal paid recommendations missing from ``curated_ids`` are
appended at the front (so the picker shows them first).
* ``pricing`` is left untouched we deliberately do NOT synthesize
pricing entries for paid models. Live pricing is fetched separately
via :func:`get_pricing_for_provider`; if the live endpoint hasn't
published pricing yet, the picker shows a blank price column rather
than fabricating numbers. (The free helper synthesizes ``$0`` so
:func:`partition_nous_models_by_tier` keeps free models selectable;
no equivalent gating applies on the paid side, so synthesis would
only mislead the user.)
Failures (network, parse, missing field) are silent and degrade to
returning the inputs unchanged never block the picker on a
Portal-side hiccup.
"""
try:
payload = fetch_nous_recommended_models(
portal_base_url, force_refresh=force_refresh
)
except Exception:
return (list(curated_ids), dict(pricing))
paid_block = payload.get("paidRecommendedModels") if isinstance(payload, dict) else None
if not isinstance(paid_block, list) or not paid_block:
return (list(curated_ids), dict(pricing))
portal_paid_ids: list[str] = []
for entry in paid_block:
name = _extract_model_name(entry)
if name:
portal_paid_ids.append(name)
if not portal_paid_ids:
return (list(curated_ids), dict(pricing))
augmented_ids = list(curated_ids)
seen = set(augmented_ids)
# Prepend Portal paid recommendations that aren't already curated, so
# the Portal-blessed picks surface first in the picker.
new_ones = [mid for mid in portal_paid_ids if mid not in seen]
if new_ones:
augmented_ids = new_ones + augmented_ids
return (augmented_ids, dict(pricing))
# ---------------------------------------------------------------------------
# TTL cache for free-tier detection — avoids repeated API calls within a
# session while still picking up upgrades quickly.
+4 -29
View File
@@ -4021,9 +4021,6 @@ def _get_dashboard_plugins(force_rescan: bool = False) -> list:
global _dashboard_plugins_cache
if _dashboard_plugins_cache is None or force_rescan:
_dashboard_plugins_cache = _discover_dashboard_plugins()
elif _dashboard_plugins_cache:
if any(not Path(p["_dir"]).is_dir() for p in _dashboard_plugins_cache):
_dashboard_plugins_cache = _discover_dashboard_plugins()
return _dashboard_plugins_cache
@@ -4435,33 +4432,11 @@ def start_server(
if open_browser:
import webbrowser
# On headless Linux (no DISPLAY or WAYLAND_DISPLAY) some registered
# browsers are TUI programs (links, lynx, www-browser) that try to
# take over the terminal. That can send SIGHUP to the server process
# and cause an immediate exit even though uvicorn bound successfully.
# Skip the auto-open attempt on headless systems and let the user
# open the URL manually. macOS and Windows are always considered
# display-capable.
_has_display = (
sys.platform != "linux"
or bool(os.environ.get("DISPLAY"))
or bool(os.environ.get("WAYLAND_DISPLAY"))
)
def _open():
time.sleep(1.0)
webbrowser.open(f"http://{host}:{port}")
if _has_display:
def _open():
try:
time.sleep(1.0)
webbrowser.open(f"http://{host}:{port}")
except Exception:
pass
threading.Thread(target=_open, daemon=True).start()
else:
_log.debug(
"Skipping browser-open: no DISPLAY or WAYLAND_DISPLAY detected "
"(headless Linux). Pass --no-open to suppress this detection."
)
threading.Thread(target=_open, daemon=True).start()
print(f" Hermes Web UI → http://{host}:{port}")
uvicorn.run(app, host=host, port=port, log_level="warning")
@@ -1,14 +0,0 @@
{
"name": "example",
"label": "Example",
"description": "Example dashboard plugin — used by test suite for auth coverage",
"icon": "Sparkles",
"version": "1.0.0",
"tab": {
"path": "/example",
"position": "after:skills"
},
"slots": [],
"entry": "dist/index.js",
"api": "plugin_api.py"
}
@@ -1,17 +0,0 @@
"""Example dashboard plugin — backend API routes.
Mounted at /api/plugins/example/ by the dashboard plugin system.
This minimal plugin exists so the test suite has a stable, side-effect-free
GET endpoint to verify that plugin API routes work with auth.
"""
from fastapi import APIRouter
router = APIRouter()
@router.get("/hello")
async def hello():
"""Simple greeting endpoint to demonstrate plugin API routes."""
return {"message": "Hello from the example plugin!", "plugin": "example", "version": "1.0.0"}
+230
View File
@@ -0,0 +1,230 @@
"""LSP Plugin — semantic diagnostics from real language servers.
Hooks into write_file/patch via the Hermes plugin system to surface
type errors, undefined names, missing imports, and other semantic
issues detected by pyright, gopls, rust-analyzer, typescript-language-server,
and ~20 more.
Opt-in: add ``lsp`` to ``plugins.enabled`` in config.yaml.
"""
from __future__ import annotations
import atexit
import json
import logging
import os
import threading
from typing import Any
logger = logging.getLogger("plugins.lsp")
# Module-level state
_service: Any = None # LSPService | None
_service_lock = threading.Lock()
# Presence set: (session_id, abs_path) entries where a baseline was captured.
_baselines: set[tuple[str, str]] = set()
def register(ctx) -> None:
"""Plugin registration — wire hooks and CLI commands."""
ctx.register_hook("on_session_end", _on_session_end)
ctx.register_hook("pre_tool_call", _pre_tool_call)
ctx.register_hook("transform_tool_result", _transform_tool_result)
try:
from plugins.lsp.cli import setup_lsp_parser, run_lsp_command
ctx.register_cli_command(
name="lsp",
help="Language Server Protocol management",
setup_fn=setup_lsp_parser,
handler_fn=run_lsp_command,
)
except Exception as e:
logger.debug("LSP CLI registration failed: %s", e)
atexit.register(_on_session_end)
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
def _on_session_end(**kwargs) -> None:
"""Tear down all language servers and clear baselines."""
global _service
with _service_lock:
if _service is not None:
try:
_service.shutdown()
except Exception as e:
logger.debug("LSP shutdown error: %s", e)
_service = None
_baselines.clear()
# ---------------------------------------------------------------------------
# Tool hooks
# ---------------------------------------------------------------------------
def _pre_tool_call(**kwargs) -> None:
"""Snapshot LSP baseline before a file write."""
tool_name = kwargs.get("tool_name", "")
if tool_name not in ("write_file", "patch"):
return
svc = _ensure_service()
if svc is None:
return
args = _parse_args(kwargs.get("args"))
if args is None:
return
path = args.get("path", "")
if not path:
return
abs_path = _resolve_path(path)
# Best-effort local-only check: skip if parent dir doesn't exist on host
if not os.path.exists(os.path.dirname(abs_path) or "."):
return
if not svc.enabled_for(abs_path):
return
session_id = kwargs.get("session_id") or ""
key = (session_id, abs_path)
try:
svc.snapshot_baseline(abs_path)
_baselines.add(key)
except Exception as e:
logger.debug("LSP baseline snapshot failed for %s: %s", abs_path, e)
def _transform_tool_result(**kwargs) -> str | None:
"""Inject LSP diagnostics into the tool result JSON.
Returns modified result string with ``lsp_diagnostics`` field,
or None to leave unchanged.
"""
tool_name = kwargs.get("tool_name", "")
if tool_name not in ("write_file", "patch"):
return None
svc = _service
if svc is None or not svc.is_active():
return None
args = _parse_args(kwargs.get("args"))
if args is None:
return None
path = args.get("path", "")
if not path:
return None
abs_path = _resolve_path(path)
session_id = kwargs.get("session_id") or ""
key = (session_id, abs_path)
if key not in _baselines:
return None
_baselines.discard(key)
# Fetch diagnostics with short timeout
try:
diagnostics = svc.get_diagnostics_sync(abs_path, delta=True, timeout=3.0)
except Exception as e:
logger.debug("LSP diagnostics fetch failed for %s: %s", abs_path, e)
return None
if not diagnostics:
return None
# Format
try:
from plugins.lsp.reporter import report_for_file, truncate
block = report_for_file(abs_path, diagnostics)
if not block:
return None
lsp_output = truncate(block)
except Exception:
return None
# Inject into result JSON (only when result is a JSON dict)
result = kwargs.get("result")
if not isinstance(result, str):
return None
try:
result_data = json.loads(result)
if not isinstance(result_data, dict):
return None
result_data["lsp_diagnostics"] = lsp_output
return json.dumps(result_data, ensure_ascii=False)
except (json.JSONDecodeError, TypeError, ValueError):
return None
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _ensure_service():
"""Lazy-initialize the LSP service singleton."""
global _service
svc = _service
if svc is not None:
return svc if svc.is_active() else None
with _service_lock:
if _service is not None:
return _service if _service.is_active() else None
try:
from plugins.lsp.manager import LSPService
_service = LSPService.create_from_config()
except Exception as e:
logger.debug("LSP service creation failed: %s", e)
return None
return _service if (_service and _service.is_active()) else None
def _parse_args(args) -> dict[str, Any] | None:
"""Normalize args (may be dict or JSON string)."""
if isinstance(args, dict):
return args
if isinstance(args, str):
try:
parsed = json.loads(args)
if isinstance(parsed, dict):
return parsed
except (json.JSONDecodeError, TypeError):
pass
return None
def _resolve_path(path: str) -> str:
"""Expand and absolutify a path."""
expanded = os.path.expanduser(path)
if not os.path.isabs(expanded):
expanded = os.path.join(os.getcwd(), expanded)
return os.path.normpath(expanded)
# ---------------------------------------------------------------------------
# Public API (used by plugins/lsp/cli.py)
# ---------------------------------------------------------------------------
def get_service():
"""Return the active LSP service or None."""
svc = _service
return svc if (svc is not None and svc.is_active()) else None
def shutdown_service() -> None:
"""Tear down the LSP service (idempotent)."""
_on_session_end()
+53 -48
View File
@@ -67,6 +67,49 @@ def register_subparser(subparsers: argparse._SubParsersAction) -> None:
parser.set_defaults(func=run_lsp_command)
def setup_lsp_parser(parser: argparse.ArgumentParser) -> None:
"""Set up subcommands on an already-created 'lsp' parser.
Called by the plugin system's register_cli_command pathway, where
main.py creates the top-level ``hermes lsp`` parser and passes it
to us for subcommand wiring.
"""
sub = parser.add_subparsers(dest="lsp_command")
sub_status = sub.add_parser("status", help="Show LSP service status")
sub_status.add_argument(
"--json", action="store_true", help="Emit machine-readable JSON"
)
sub_list = sub.add_parser("list", help="List supported language servers")
sub_list.add_argument(
"--installed-only",
action="store_true",
help="Only show servers whose binary is currently available",
)
sub_install = sub.add_parser("install", help="Install a server binary")
sub_install.add_argument("server", help="Server id (e.g. pyright, gopls)")
sub_install_all = sub.add_parser(
"install-all",
help="Install every server with a known auto-install recipe",
)
sub_install_all.add_argument(
"--include-manual",
action="store_true",
help="Even attempt servers marked manual-install (best effort)",
)
sub_restart = sub.add_parser(
"restart",
help="Tear down running LSP clients (next edit re-spawns)",
)
sub_which = sub.add_parser("which", help="Print binary path for a server")
sub_which.add_argument("server", help="Server id")
def run_lsp_command(args: argparse.Namespace) -> int:
"""Top-level dispatcher for ``hermes lsp <subcommand>``."""
sub = getattr(args, "lsp_command", None) or "status"
@@ -90,9 +133,9 @@ def run_lsp_command(args: argparse.Namespace) -> int:
def _cmd_status(emit_json: bool) -> int:
from agent.lsp import get_service
from agent.lsp.servers import SERVERS
from agent.lsp.install import detect_status
from plugins.lsp import get_service
from plugins.lsp.servers import SERVERS
from plugins.lsp.install import detect_status
svc = get_service()
service_active = svc is not None
@@ -140,17 +183,6 @@ def _cmd_status(emit_json: bool) -> int:
disabled = info.get("disabled_servers") or []
if disabled:
out.append(f" disabled in cfg: {', '.join(disabled)}")
# Surface backend-tool gaps that aren't visible in the registry table:
# some servers spawn fine but emit no diagnostics without a sidecar
# binary (bash-language-server -> shellcheck).
backend_warnings = _backend_warnings()
if backend_warnings:
out.append("")
out.append("Backend warnings")
out.append("================")
for line in backend_warnings:
out.append(f" ! {line}")
out.append("")
out.append("Registered Servers")
out.append("==================")
@@ -175,8 +207,8 @@ def _cmd_status(emit_json: bool) -> int:
def _cmd_list(installed_only: bool) -> int:
from agent.lsp.servers import SERVERS
from agent.lsp.install import detect_status
from plugins.lsp.servers import SERVERS
from plugins.lsp.install import detect_status
for s in SERVERS:
pkg = _recipe_pkg_for(s.server_id)
@@ -190,7 +222,7 @@ def _cmd_list(installed_only: bool) -> int:
def _cmd_install(server_id: str) -> int:
from agent.lsp.install import try_install, INSTALL_RECIPES, detect_status
from plugins.lsp.install import try_install, INSTALL_RECIPES, detect_status
pkg = _recipe_pkg_for(server_id)
pre_status = detect_status(pkg)
if pre_status == "installed":
@@ -214,8 +246,8 @@ def _cmd_install(server_id: str) -> int:
def _cmd_install_all(include_manual: bool) -> int:
from agent.lsp.servers import SERVERS
from agent.lsp.install import try_install, INSTALL_RECIPES, detect_status
from plugins.lsp.servers import SERVERS
from plugins.lsp.install import try_install, INSTALL_RECIPES, detect_status
rc = 0
for s in SERVERS:
@@ -240,7 +272,7 @@ def _cmd_install_all(include_manual: bool) -> int:
def _cmd_restart() -> int:
from agent.lsp import shutdown_service
from plugins.lsp import shutdown_service
shutdown_service()
sys.stdout.write("LSP service shut down. Next edit will respawn clients.\n")
@@ -248,7 +280,7 @@ def _cmd_restart() -> int:
def _cmd_which(server_id: str) -> int:
from agent.lsp.install import INSTALL_RECIPES, hermes_lsp_bin_dir
from plugins.lsp.install import INSTALL_RECIPES, hermes_lsp_bin_dir
import os
import shutil as _shutil
@@ -279,30 +311,3 @@ def _recipe_pkg_for(server_id: str) -> str:
"typescript": "typescript-language-server",
}
return aliases.get(server_id, server_id)
def _backend_warnings() -> list:
"""Return human-readable notes about LSP backend tools that are missing
in a way that won't surface elsewhere.
Some language servers ship as thin wrappers around an external CLI for
actual diagnostics they spawn cleanly but never emit any errors when
the sidecar binary isn't on PATH. bash-language-server / shellcheck
is the load-bearing example.
Returned strings are short, actionable, and include the install
suggestion across common platforms.
"""
import shutil as _shutil
from agent.lsp.install import hermes_lsp_bin_dir
notes: list = []
bash_installed = _shutil.which("bash-language-server") is not None or (
(hermes_lsp_bin_dir() / "bash-language-server").exists()
)
if bash_installed and _shutil.which("shellcheck") is None:
notes.append(
"bash-language-server is installed but shellcheck is missing — "
"diagnostics will be empty (apt: shellcheck, brew: shellcheck, "
"scoop: shellcheck)."
)
return notes
@@ -48,7 +48,7 @@ from pathlib import Path
from typing import Any, Awaitable, Callable, Dict, List, Optional, Set
from urllib.parse import quote, unquote
from agent.lsp.protocol import (
from plugins.lsp.protocol import (
ERROR_CONTENT_MODIFIED,
ERROR_METHOD_NOT_FOUND,
LSPProtocolError,
@@ -33,7 +33,7 @@ import subprocess
import sys
import threading
from pathlib import Path
from typing import Any, Dict, Optional
from typing import Dict, Optional
logger = logging.getLogger("agent.lsp.install")
@@ -41,13 +41,7 @@ logger = logging.getLogger("agent.lsp.install")
# tuple of strategy name + package name + executable name. When the
# install completes, we look for the executable in
# ``<HERMES_HOME>/lsp/bin/`` first, then on PATH.
#
# Optional fields:
# - ``extra_pkgs``: list of sibling packages to install alongside
# ``pkg`` in the same node_modules tree. Used when an LSP server
# has a runtime peer dependency that npm doesn't auto-pull (e.g.
# typescript-language-server needs ``typescript``).
INSTALL_RECIPES: Dict[str, Dict[str, Any]] = {
INSTALL_RECIPES: Dict[str, Dict[str, str]] = {
# Python
"pyright": {"strategy": "npm", "pkg": "pyright", "bin": "pyright-langserver"},
# JS/TS family
@@ -55,11 +49,6 @@ INSTALL_RECIPES: Dict[str, Dict[str, Any]] = {
"strategy": "npm",
"pkg": "typescript-language-server",
"bin": "typescript-language-server",
# typescript-language-server requires the `typescript` SDK
# (tsserver) to be importable from the same node_modules tree;
# otherwise initialize() fails with "Could not find a valid
# TypeScript installation". Install them together.
"extra_pkgs": ["typescript"],
},
"@vue/language-server": {
"strategy": "npm",
@@ -190,11 +179,7 @@ def _do_install(pkg: str) -> Optional[str]:
return None
if strategy == "npm":
return _install_npm(
recipe.get("pkg", pkg),
bin_name,
extra_pkgs=recipe.get("extra_pkgs") or [],
)
return _install_npm(recipe.get("pkg", pkg), bin_name)
if strategy == "go":
return _install_go(recipe.get("pkg", pkg), bin_name)
if strategy == "pip":
@@ -204,36 +189,22 @@ def _do_install(pkg: str) -> Optional[str]:
return None
def _install_npm(
pkg: str,
bin_name: str,
extra_pkgs: Optional[list] = None,
) -> Optional[str]:
def _install_npm(pkg: str, bin_name: str) -> Optional[str]:
"""Install an npm package into our staging dir.
Uses ``npm install --prefix`` so the binaries land in
``<staging>/node_modules/.bin/<bin_name>`` and we symlink them up
one level for direct PATH-style access.
``extra_pkgs`` is a list of sibling packages to install in the
same ``node_modules`` tree. Used for LSP servers with runtime
peer deps that npm doesn't auto-pull (typescript-language-server
needs ``typescript`` next to it; intelephense ships standalone).
"""
npm = shutil.which("npm")
if npm is None:
logger.info("[install] cannot install %s: npm not on PATH", pkg)
return None
staging = hermes_lsp_bin_dir().parent # <HERMES_HOME>/lsp/
install_targets = [pkg] + list(extra_pkgs or [])
try:
logger.info(
"[install] npm install --prefix %s %s",
staging,
" ".join(install_targets),
)
logger.info("[install] npm install --prefix %s %s", staging, pkg)
proc = subprocess.run(
[npm, "install", "--prefix", str(staging), "--silent", "--no-fund", "--no-audit", *install_targets],
[npm, "install", "--prefix", str(staging), "--silent", "--no-fund", "--no-audit", pkg],
check=False,
capture_output=True,
text=True,
+10 -81
View File
@@ -42,20 +42,20 @@ import time
from concurrent.futures import Future as ConcurrentFuture
from typing import Any, Dict, List, Optional, Tuple
from agent.lsp import eventlog
from agent.lsp.client import (
from plugins.lsp import eventlog
from plugins.lsp.client import (
DIAGNOSTICS_DOCUMENT_WAIT,
LSPClient,
file_uri,
)
from agent.lsp.servers import (
from plugins.lsp.servers import (
ServerContext,
ServerDef,
SpawnSpec,
find_server_for_file,
language_id_for,
)
from agent.lsp.workspace import (
from plugins.lsp.workspace import (
clear_cache,
is_inside_workspace,
resolve_workspace_for_file,
@@ -248,15 +248,8 @@ class LSPService:
def enabled_for(self, file_path: str) -> bool:
"""Return True iff LSP should run for this specific file.
Gates on workspace detection (file or cwd inside a git worktree),
on whether any registered server matches the extension, and
on whether the (server_id, workspace_root) pair is in the
broken-set from a previous spawn failure.
Files in already-broken pairs return False so the file_operations
layer skips the LSP path entirely no spawn attempts, no
timeout cost until the service is restarted (``hermes lsp
restart``) or the process exits.
Gates on workspace detection (file or cwd inside a git worktree)
and on whether any registered server matches the extension.
"""
if not self._enabled:
return False
@@ -264,19 +257,7 @@ class LSPService:
if srv is None or srv.server_id in self._disabled_servers:
return False
ws_root, gated_in = resolve_workspace_for_file(file_path)
if not (ws_root and gated_in):
return False
# Broken-set short-circuit. Use the per-server root if we can
# compute one cheaply; otherwise fall back to the workspace
# root as the broken key (which is what _get_or_spawn would
# have used anyway when it failed).
try:
per_server_root = srv.resolve_root(file_path, ws_root) or ws_root
except Exception: # noqa: BLE001
per_server_root = ws_root
if (srv.server_id, per_server_root) in self._broken:
return False
return True
return bool(ws_root and gated_in)
def snapshot_baseline(self, file_path: str) -> None:
"""Snapshot current diagnostics for ``file_path`` as the delta baseline.
@@ -284,10 +265,6 @@ class LSPService:
Called BEFORE a write so the next ``get_diagnostics_sync()``
can filter out pre-existing errors. Best-effort failures
are silently swallowed so a flaky server can't break a write.
Outer timeouts (e.g. server hangs during initialize) mark the
(server_id, workspace_root) pair as broken so subsequent edits
skip it instantly instead of re-paying the timeout cost.
"""
if not self.enabled_for(file_path):
return
@@ -296,7 +273,9 @@ class LSPService:
self._delta_baseline[os.path.abspath(file_path)] = diags or []
except Exception as e: # noqa: BLE001
logger.debug("baseline snapshot failed for %s: %s", file_path, e)
self._mark_broken_for_file(file_path, e)
# Set empty baseline so the next call still does the
# comparison (any post-edit diagnostic will be considered
# "new" — safe default).
self._delta_baseline[os.path.abspath(file_path)] = []
def get_diagnostics_sync(
@@ -332,12 +311,10 @@ class LSPService:
except asyncio.TimeoutError as e:
eventlog.log_timeout(server_id, file_path)
logger.debug("LSP diagnostics timeout for %s: %s", file_path, e)
self._mark_broken_for_file(file_path, e)
return []
except Exception as e: # noqa: BLE001
eventlog.log_server_error(server_id, file_path, e)
logger.debug("LSP diagnostics fetch failed for %s: %s", file_path, e)
self._mark_broken_for_file(file_path, e)
return []
abs_path = os.path.abspath(file_path)
@@ -362,54 +339,6 @@ class LSPService:
eventlog.log_clean(server_id, file_path)
return diags
def _mark_broken_for_file(self, file_path: str, exc: BaseException) -> None:
"""Mark the (server_id, workspace_root) pair as broken so subsequent
edits skip it instantly instead of re-paying timeout cost.
Called when the outer ``_loop.run`` timeout cancels an in-flight
spawn/initialize that the inner ``_get_or_spawn`` task was still
holding open. Without this, every subsequent write would re-enter
the spawn path and re-pay the full ``snapshot_baseline``
timeout (8s) until the binary is fixed.
Also kills any orphan client process that survived the cancelled
future, and emits a single eventlog WARNING so the user knows
which server gave up.
``exc`` is whatever exception the outer wrapper caught used
only for logging, never re-raised.
"""
srv = find_server_for_file(file_path)
if srv is None:
return
ws_root, gated = resolve_workspace_for_file(file_path)
if not (ws_root and gated):
return
try:
per_server_root = srv.resolve_root(file_path, ws_root) or ws_root
except Exception: # noqa: BLE001
per_server_root = ws_root
key = (srv.server_id, per_server_root)
already_broken = key in self._broken
self._broken.add(key)
# Kill any client we managed to spawn before the timeout. The
# cancelled future never reached the broken-set add inside
# ``_get_or_spawn`` so the client may still be hanging in
# ``_clients`` with a half-initialized state.
with self._state_lock:
client = self._clients.pop(key, None)
if client is not None:
try:
# Fire-and-forget shutdown — give it a second to cleanup,
# but don't block. We're already on a slow path.
self._loop.run(client.shutdown(), timeout=1.0)
except Exception: # noqa: BLE001
pass
if not already_broken:
eventlog.log_spawn_failed(srv.server_id, per_server_root, exc)
def shutdown(self) -> None:
"""Tear down all clients and stop the background loop."""
if not self._enabled:
+11
View File
@@ -0,0 +1,11 @@
name: lsp
version: "1.0.0"
description: >-
Semantic diagnostics from real language servers (pyright, gopls,
rust-analyzer, typescript-language-server, etc.) surfaced on
write_file/patch. Opt-in: add 'lsp' to plugins.enabled in config.yaml.
author: NousResearch
hooks:
- pre_tool_call
- transform_tool_result
- on_session_end
+14 -29
View File
@@ -25,7 +25,7 @@ import shutil
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
from agent.lsp.workspace import nearest_root, normalize_path
from plugins.lsp.workspace import nearest_root, normalize_path
logger = logging.getLogger("agent.lsp.servers")
@@ -231,7 +231,7 @@ def _spawn_pyright(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
"pyright-langserver", "pyright"
)
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("pyright", ctx.install_strategy)
if bin_path is None:
return None
@@ -274,7 +274,7 @@ def _detect_python(root: str) -> Optional[str]:
def _spawn_typescript(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "typescript") or _which("typescript-language-server")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("typescript-language-server", ctx.install_strategy)
if bin_path is None:
return None
@@ -291,7 +291,7 @@ def _spawn_typescript(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
def _spawn_gopls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "gopls") or _which("gopls")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("gopls", ctx.install_strategy)
if bin_path is None:
return None
@@ -307,7 +307,7 @@ def _spawn_gopls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
def _spawn_rust_analyzer(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "rust-analyzer") or _which("rust-analyzer")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("rust-analyzer", ctx.install_strategy)
if bin_path is None:
return None
@@ -323,7 +323,7 @@ def _spawn_rust_analyzer(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
def _spawn_clangd(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "clangd") or _which("clangd")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("clangd", ctx.install_strategy)
if bin_path is None:
return None
@@ -336,28 +336,13 @@ def _spawn_clangd(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
)
_BASH_SHELLCHECK_WARNED = False
def _spawn_bash_ls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "bash-language-server") or _which("bash-language-server")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("bash-language-server", ctx.install_strategy)
if bin_path is None:
return None
# bash-language-server delegates diagnostics to ``shellcheck``. Without
# it on PATH the server starts and accepts requests but never reports
# any problems — to the user it looks like a working integration that
# never finds bugs. Warn once so the gap is visible.
global _BASH_SHELLCHECK_WARNED
if not _BASH_SHELLCHECK_WARNED and _which("shellcheck") is None:
_BASH_SHELLCHECK_WARNED = True
logger.warning(
"bash-language-server: shellcheck not found on PATH — "
"diagnostics will be empty until shellcheck is installed "
"(apt: shellcheck, brew: shellcheck, scoop: shellcheck)."
)
return SpawnSpec(
command=[bin_path, "start"],
workspace_root=root,
@@ -370,7 +355,7 @@ def _spawn_bash_ls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
def _spawn_yaml_ls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "yaml-language-server") or _which("yaml-language-server")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("yaml-language-server", ctx.install_strategy)
if bin_path is None:
return None
@@ -386,7 +371,7 @@ def _spawn_yaml_ls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
def _spawn_lua_ls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "lua-language-server") or _which("lua-language-server")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("lua-language-server", ctx.install_strategy)
if bin_path is None:
return None
@@ -402,7 +387,7 @@ def _spawn_lua_ls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
def _spawn_intelephense(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "intelephense") or _which("intelephense")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("intelephense", ctx.install_strategy)
if bin_path is None:
return None
@@ -433,7 +418,7 @@ def _spawn_ocamllsp(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
def _spawn_dockerfile_ls(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
bin_path = _resolve_override(ctx, "dockerfile-ls") or _which("docker-langserver")
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("dockerfile-language-server-nodejs", ctx.install_strategy)
if bin_path is None:
return None
@@ -627,7 +612,7 @@ def _spawn_vue(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
"vue-language-server"
)
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("@vue/language-server", ctx.install_strategy)
if bin_path is None:
return None
@@ -645,7 +630,7 @@ def _spawn_svelte(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
"svelteserver", "svelte-language-server"
)
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("svelte-language-server", ctx.install_strategy)
if bin_path is None:
return None
@@ -663,7 +648,7 @@ def _spawn_astro(root: str, ctx: ServerContext) -> Optional[SpawnSpec]:
"astro-ls", "astro-language-server"
)
if bin_path is None:
from agent.lsp.install import try_install
from plugins.lsp.install import try_install
bin_path = try_install("@astrojs/language-server", ctx.install_strategy)
if bin_path is None:
return None
@@ -8,7 +8,6 @@ xiaomi = ProviderProfile(
aliases=("mimo", "xiaomi-mimo"),
env_vars=("XIAOMI_API_KEY",),
base_url="https://api.xiaomimimo.com/v1",
supports_health_check=False, # /v1/models returns 401 even with valid key
)
register_provider(xiaomi)
+1 -1
View File
@@ -959,7 +959,7 @@ class LineAdapter(BasePlatformAdapter):
if chat_type == "dm" and self._client:
asyncio.create_task(self._client.loading(chat_id))
source_obj = self.build_source(
source_obj = self.create_source(
chat_id=chat_id,
chat_type=chat_type,
user_id=user_id,
-1
View File
@@ -40,7 +40,6 @@ class ProviderProfile:
base_url: str = ""
models_url: str = "" # explicit models endpoint; falls back to {base_url}/models
auth_type: str = "api_key" # api_key|oauth_device_code|oauth_external|copilot|aws_sdk
supports_health_check: bool = True # False → doctor skips /models probe for this provider
# ── Model catalog ─────────────────────────────────────────
# fallback_models: curated list shown in /model picker when live fetch fails.
+43 -25
View File
@@ -136,12 +136,25 @@ termux = [
"hermes-agent[acp]",
]
termux-all = [
# Best-effort "install all" profile for Termux. Same policy as [all]:
# only includes extras that aren't covered by `tools/lazy_deps.py`.
# Backends like telegram/slack/dingtalk/feishu/honcho lazy-install at
# first use, so they're no longer eager-installed here.
# Best-effort "install all" profile for Termux: include broad extras that
# are known to resolve on Android, while intentionally excluding extras that
# currently hard-fail from missing/broken Android wheels/toolchains.
#
# Excluded for now:
# - matrix (mautrix[encryption] -> python-olm build failures on Termux)
# - voice (faster-whisper chain requires ctranslate2/av builds not packaged)
"hermes-agent[termux]",
"hermes-agent[messaging]",
"hermes-agent[slack]",
"hermes-agent[tts-premium]",
"hermes-agent[dingtalk]",
"hermes-agent[feishu]",
"hermes-agent[google]",
# mistral: omitted from broad termux-all profile — `mistralai` PyPI package
# is currently quarantined (malicious 2.4.6 release). Users who explicitly
# want Voxtral STT/TTS can still `pip install hermes-agent[mistral]`
# directly once PyPI un-quarantines.
"hermes-agent[bedrock]",
"hermes-agent[homeassistant]",
"hermes-agent[sms]",
"hermes-agent[web]",
@@ -175,36 +188,41 @@ rl = [
]
yc-bench = ["yc-bench @ git+https://github.com/collinear-ai/yc-bench.git@bfb0c88062450f46341bd9a5298903fc2e952a5c ; python_version >= '3.12'"]
all = [
# Policy (2026-05-12): `[all]` includes only extras that genuinely
# CAN'T be lazy-installed via `tools/lazy_deps.py` — i.e. things every
# session can use, things needed before the agent loop is alive
# (terminal/CLI), and skill deps that packagers (Nix, AUR, Homebrew)
# need in the wheel. Anything an opt-in backend (provider, search,
# TTS, image, memory, messaging platform, terminal sandbox) needs
# MUST live exclusively in `LAZY_DEPS` and resolve at first use —
# otherwise one quarantined PyPI release breaks every fresh install.
#
# Removed from [all] on 2026-05-12 (covered by lazy-install):
# anthropic, exa, firecrawl, parallel-web, fal, edge-tts,
# modal, daytona, vercel, messaging (telegram/discord/slack),
# matrix, slack, honcho, voice (faster-whisper),
# dingtalk, feishu, bedrock, tts-premium (elevenlabs)
#
# Why: the matrix extra in particular pulls `mautrix[encryption]`
# which depends on `python-olm`. python-olm has Linux-only wheels and
# no native build path on Windows or modern macOS. With matrix in
# [all], `uv sync --locked` on Windows tried to build it from sdist
# and failed on `make`. Lazy-install routes that build to first use,
# where the user is expected to have a toolchain available.
"hermes-agent[anthropic]",
"hermes-agent[exa]",
"hermes-agent[firecrawl]",
"hermes-agent[parallel-web]",
"hermes-agent[fal]",
"hermes-agent[edge-tts]",
"hermes-agent[modal]",
"hermes-agent[daytona]",
"hermes-agent[vercel]",
"hermes-agent[messaging]",
# matrix: python-olm (required by matrix-nio[e2e]) is upstream-broken on
# modern macOS (archived libolm, C++ errors with Clang 21+). On Linux the
# [matrix] extra's own marker pulls in the [e2e] variant automatically.
"hermes-agent[matrix]; sys_platform == 'linux'",
"hermes-agent[cron]",
"hermes-agent[cli]",
"hermes-agent[dev]",
"hermes-agent[tts-premium]",
"hermes-agent[slack]",
"hermes-agent[pty]",
"hermes-agent[honcho]",
"hermes-agent[mcp]",
"hermes-agent[homeassistant]",
"hermes-agent[sms]",
"hermes-agent[acp]",
"hermes-agent[voice]",
"hermes-agent[dingtalk]",
"hermes-agent[feishu]",
"hermes-agent[google]",
# mistral: omitted from [all] — `mistralai` PyPI package is currently
# quarantined (malicious 2.4.6 release on 2026-05-12). Pulling it from
# [all] would break every fresh install / AUR build / Docker build / CI
# run until PyPI un-quarantines. Users who explicitly want Voxtral STT/TTS
# can still `pip install hermes-agent[mistral]` once it's available again.
"hermes-agent[bedrock]",
"hermes-agent[web]",
"hermes-agent[youtube]",
]
+7 -233
View File
@@ -347,10 +347,6 @@ _PARALLEL_SAFE_TOOLS = frozenset({
# File tools can run concurrently when they target independent paths.
_PATH_SCOPED_TOOLS = frozenset({"read_file", "write_file", "patch"})
# Tools that mutate files on disk. Used by the per-turn verifier that
# surfaces silently-failed file edits so the model can't over-claim success.
_FILE_MUTATING_TOOLS = frozenset({"write_file", "patch"})
# Maximum number of concurrent worker threads for parallel tool execution.
_MAX_TOOL_WORKERS = 8
@@ -528,68 +524,6 @@ def _append_subdir_hint_to_multimodal(value: Dict[str, Any], hint: str) -> None:
value["text_summary"] = value["text_summary"] + hint
def _extract_file_mutation_targets(tool_name: str, args: Dict[str, Any]) -> List[str]:
"""Return the file paths a ``write_file`` or ``patch`` call is targeting.
For ``write_file`` and ``patch`` in replace mode this is just ``args["path"]``.
For ``patch`` in V4A patch mode we parse the patch content for
``*** Update File:`` / ``*** Add File:`` / ``*** Delete File:`` headers so
the verifier can track each file in a multi-file patch separately.
"""
if tool_name not in _FILE_MUTATING_TOOLS:
return []
if tool_name == "write_file":
p = args.get("path")
return [str(p)] if p else []
# tool_name == "patch"
mode = args.get("mode") or "replace"
if mode == "replace":
p = args.get("path")
return [str(p)] if p else []
if mode == "patch":
body = args.get("patch") or ""
if not isinstance(body, str) or not body:
return []
import re as _re
paths: List[str] = []
for _m in _re.finditer(
r'^\*\*\*\s+(?:Update|Add|Delete)\s+File:\s*(.+)$',
body,
_re.MULTILINE,
):
p = _m.group(1).strip()
if p:
paths.append(p)
return paths
return []
def _extract_error_preview(result: Any, max_len: int = 180) -> str:
"""Pull a one-line error summary out of a tool result for footer display."""
text = _multimodal_text_summary(result) if result is not None else ""
if not isinstance(text, str):
try:
text = str(text)
except Exception:
return ""
# Try to parse JSON and pull the ``error`` field — tool handlers return
# ``{"success": false, "error": "..."}``; raw string wins if parse fails.
stripped = text.strip()
if stripped.startswith("{"):
try:
import json as _json
data = _json.loads(stripped)
if isinstance(data, dict) and isinstance(data.get("error"), str):
text = data["error"]
except Exception:
pass
# Collapse whitespace, trim to max_len.
text = " ".join(text.split())
if len(text) > max_len:
text = text[: max_len - 1] + ""
return text
def _trajectory_normalize_msg(msg: Dict[str, Any]) -> Dict[str, Any]:
"""Strip image blobs from a message for trajectory saving.
@@ -3619,19 +3553,12 @@ class AIAgent:
is_claude = "claude" in model_lower
is_nous_portal = "nousresearch" in eff_base_url.lower()
# Nous Portal Claude rides the 1h prefix_and_2 layout (Portal
# proxies to OpenRouter, which honours ttl=1h on Anthropic
# routes). Qwen does NOT — Alibaba DashScope (the upstream for
# all Qwen routes, including Portal -> OpenRouter -> Alibaba)
# documents a single ``ephemeral`` TTL of 5 minutes; ttl="1h"
# on Qwen markers is silently ignored upstream, so the
# high-value tools[-1] + system-prefix breakpoints never land
# and only the 5m rolling-window markers on the last 2 messages
# get cached. Portal Qwen still gets cache_control via
# _anthropic_prompt_cache_policy returning (True, False) — it
# just rides the standard system_and_3 5m layout instead of the
# mismatched prefix_and_2 1h layout.
if is_nous_portal and is_claude:
# Nous Portal: Claude AND Qwen both get long-lived caching.
# Portal proxies to OpenRouter with identical cache_control
# semantics; any model on Portal that accepts envelope-layout
# markers via _anthropic_prompt_cache_policy also benefits from
# the documented 1h cross-session TTL.
if is_nous_portal and (is_claude or "qwen" in model_lower):
return True
if not is_claude:
@@ -5419,103 +5346,6 @@ class AIAgent:
self._pending_steer = None
return text
def _record_file_mutation_result(
self,
tool_name: str,
args: Dict[str, Any],
result: Any,
is_error: bool,
) -> None:
"""Record a ``write_file`` / ``patch`` outcome for the turn-end verifier.
On failure, store ``{path: {error_preview, tool}}`` entries. On
success, remove any prior failure entries for the same paths (the
model recovered within the turn). Silently no-ops if the per-turn
state dict hasn't been initialised yet (e.g. a tool dispatched
outside ``run_conversation``).
"""
if tool_name not in _FILE_MUTATING_TOOLS:
return
state = getattr(self, "_turn_failed_file_mutations", None)
if state is None:
return
targets = _extract_file_mutation_targets(tool_name, args)
if not targets:
return
if is_error:
preview = _extract_error_preview(result)
for path in targets:
# Keep the FIRST error we saw for a given path unless we
# later see success. A repeated failure with a different
# message shouldn't silently overwrite the original.
if path not in state:
state[path] = {
"tool": tool_name,
"error_preview": preview,
}
else:
for path in targets:
state.pop(path, None)
def _file_mutation_verifier_enabled(self) -> bool:
"""Check whether the per-turn file-mutation verifier footer is on.
Config path: ``display.file_mutation_verifier`` (bool, default True).
``HERMES_FILE_MUTATION_VERIFIER`` env var overrides config. Exposed
as a method so tests can patch a single seam without reaching into
the private ``_turn_failed_file_mutations`` state dict.
"""
try:
import os as _os
env = _os.environ.get("HERMES_FILE_MUTATION_VERIFIER")
if env is not None:
return env.strip().lower() not in ("0", "false", "no", "off")
# Read from the persisted config.yaml so gateway and CLI share
# the same setting. Import lazily to avoid a startup-time cycle.
try:
from hermes_cli.config import load_config as _load_config
_cfg = _load_config() or {}
except Exception:
_cfg = {}
_display = _cfg.get("display") if isinstance(_cfg, dict) else None
if isinstance(_display, dict) and "file_mutation_verifier" in _display:
return bool(_display.get("file_mutation_verifier"))
except Exception:
pass
return True # safe default: verifier on
@staticmethod
def _format_file_mutation_failure_footer(failed: Dict[str, Dict[str, Any]]) -> str:
"""Render the per-turn failed-mutation dict as a user-facing footer.
Displays up to 10 paths with their first error preview, then a
count of any additional failures. Returns an empty string when
the dict is empty so callers can concatenate unconditionally.
"""
if not failed:
return ""
lines = [
"⚠️ File-mutation verifier: "
f"{len(failed)} file(s) were NOT modified this turn despite any "
"wording above that may suggest otherwise. Run `git status` or "
"`read_file` to confirm."
]
shown = 0
for path, info in failed.items():
if shown >= 10:
break
preview = (info.get("error_preview") or "").strip()
tool = info.get("tool") or "patch"
if preview:
lines.append(f"{path} — [{tool}] {preview}")
else:
lines.append(f"{path} — [{tool}] failed")
shown += 1
remaining = len(failed) - shown
if remaining > 0:
lines.append(f" • … and {remaining} more")
return "\n".join(lines)
def _apply_pending_steer_to_tool_results(self, messages: list, num_tool_msgs: int) -> None:
"""Append any pending /steer text to the last tool result in this turn.
@@ -11042,17 +10872,6 @@ class AIAgent:
result_preview = _err_text[:200] if len(_err_text) > 200 else _err_text
logger.warning("Tool %s returned error (%.2fs): %s", function_name, tool_duration, result_preview)
# Track file-mutation outcome for the turn-end verifier.
# `blocked` calls never actually ran — don't let a guardrail
# block count as either a failure or a success.
if not blocked:
try:
self._record_file_mutation_result(
function_name, function_args, function_result, is_error,
)
except Exception as _ver_err:
logging.debug("file-mutation verifier record failed: %s", _ver_err)
if not blocked and self.tool_progress_callback:
try:
self.tool_progress_callback(
@@ -11479,18 +11298,6 @@ class AIAgent:
else:
logger.info("tool %s completed (%.2fs, %d chars)", function_name, tool_duration, _result_len)
# Track file-mutation outcome for the turn-end verifier. See
# the concurrent path for the rationale; both paths must feed
# the same state so the footer reflects every tool call in the
# turn, not just the parallel ones.
if not _execution_blocked:
try:
self._record_file_mutation_result(
function_name, function_args, function_result, _is_error_result,
)
except Exception as _ver_err:
logging.debug("file-mutation verifier record failed: %s", _ver_err)
if not _execution_blocked and self.tool_progress_callback:
try:
self.tool_progress_callback(
@@ -12188,14 +11995,6 @@ class AIAgent:
truncated_response_prefix = ""
compression_attempts = 0
_turn_exit_reason = "unknown" # Diagnostic: why the loop ended
# Per-turn file-mutation verifier state. Keyed by resolved path;
# each failed ``write_file`` / ``patch`` call records the error
# preview. Later successful writes to the same path remove the
# entry (the model recovered). At end-of-turn, any entries still
# present are surfaced in an advisory footer so the model cannot
# over-claim success while the file is actually unchanged on disk.
self._turn_failed_file_mutations: Dict[str, Dict[str, Any]] = {}
# Record the execution thread so interrupt()/clear_interrupt() can
# scope the tool-level interrupt signal to THIS agent's thread only.
@@ -14449,7 +14248,7 @@ class AIAgent:
_ra_raw = _resp_headers.get("retry-after") or _resp_headers.get("Retry-After")
if _ra_raw:
try:
_retry_after = min(float(_ra_raw), 120) # Cap at 2 minutes
_retry_after = min(int(_ra_raw), 120) # Cap at 2 minutes
except (TypeError, ValueError):
pass
wait_time = _retry_after if _retry_after else jittered_backoff(retry_count, base_delay=2.0, max_delay=60.0)
@@ -15511,31 +15310,6 @@ class AIAgent:
else:
logger.info(_diag_msg, *_diag_args)
# File-mutation verifier footer.
# If one or more ``write_file`` / ``patch`` calls failed during this
# turn and were never superseded by a successful write to the same
# path, append an advisory footer to the assistant response. This
# catches the specific case — reported by Ben Eng (#15524-adjacent)
# — where a model issues a batch of parallel patches, half of them
# fail with "Could not find old_string", and the model summarises
# the turn claiming every file was edited. The user then has to
# manually run ``git status`` to catch the lie. With this footer
# the truth is surfaced on every turn, so over-claiming is
# structurally impossible past the model.
#
# Gate: only applied when a real text response exists for this
# turn and the user didn't interrupt. Empty/interrupted turns
# already have other surface text that shouldn't be augmented.
if final_response and not interrupted:
try:
_failed = getattr(self, "_turn_failed_file_mutations", None) or {}
if _failed and self._file_mutation_verifier_enabled():
footer = self._format_file_mutation_failure_footer(_failed)
if footer:
final_response = final_response.rstrip() + "\n\n" + footer
except Exception as _ver_err:
logger.debug("file-mutation verifier footer failed: %s", _ver_err)
# Plugin hook: transform_llm_output
# Fired once per turn after the tool-calling loop completes.
# Plugins can transform the LLM's output text before it's returned.
+36 -49
View File
@@ -806,14 +806,7 @@ function Install-Dependencies {
# current extras spec, NOT because they're equivalent in posture.
if (Test-Path "uv.lock") {
Write-Info "Trying tier: hash-verified (uv.lock) ..."
# Critical flag choice: `--extra all`, NOT `--all-extras`.
# --all-extras = every [project.optional-dependencies] key,
# bypassing the curated [all] extra. On Windows
# that means [matrix] -> python-olm (no wheel,
# needs `make` to build from sdist) and the
# install fails.
# --extra all = just the [all] extra's contents (curated).
& $UvCmd sync --extra all --locked
& $UvCmd sync --all-extras --locked
if ($LASTEXITCODE -eq 0) {
Write-Success "Main package installed (hash-verified via uv.lock)"
$script:InstalledTier = "hash-verified (uv.lock)"
@@ -829,59 +822,53 @@ function Install-Dependencies {
$skipPipFallback = $false
}
# Install main package. Tiered fallback so a single flaky transitive
# doesn't silently drop everything. Each tier's stdout/stderr is
# Install main package. Tiered fallback so a single flaky git+https dep
# (atroposlib / tinker in the [rl] extra) doesn't silently drop
# dashboard/MCP/cron/messaging extras. Each tier's stdout/stderr is
# preserved — no Out-Null swallowing — so the user can see what failed.
#
# Tier 1: [all] — the curated extra in pyproject.toml.
# Tier 2: [all] minus the currently-broken extras list ($brokenExtras).
# Edit $brokenExtras below when something on PyPI breaks; this
# lets users keep the rest of [all] when one transitive is
# unavailable. The list of [all]'s contents is parsed from
# pyproject.toml at runtime — there is NO hand-mirrored copy
# to drift out of sync.
# Tier 3: bare `.` — last-resort so at least the core CLI launches.
# Tier 1: [all] — everything, including RL git+https deps (best case).
# Tier 2: [all] minus a small list of currently-broken extras. The
# broken list is centralised in $brokenExtras below when
# a package gets quarantined / yanked / pulled, add it here
# and the resolver no longer chokes on it. This is what saves
# the user from silently losing 10+ unrelated extras every
# time one upstream package breaks.
# Tier 3: [core-extras] synthesised locally — all PyPI-only extras we
# ship, also minus $brokenExtras. Drops [rl] and [matrix]
# (linux-only) which are the usual failure culprits.
# Tier 4: [web,mcp,cron,cli,messaging,dev] — the minimum we strongly
# believe a user expects `hermes dashboard` / slash commands /
# cron / messaging platforms to work out of the box.
# Tier 5: bare `.` — last-resort so at least the core CLI launches.
# Currently-broken extras. Edit this list when an upstream package
# gets quarantined / yanked / breaks resolution. Empty means everything
# in [all] should be installable; populate with the names of extras
# whose deps are temporarily unavailable.
# whose deps are temporarily unavailable to keep installs working
# for users.
$brokenExtras = @()
# Parse [project.optional-dependencies].all from pyproject.toml.
# tomllib is stdlib on Python 3.11+ which the bootstrap guarantees.
$pythonExeForParse = if (-not $NoVenv) { "$InstallDir\venv\Scripts\python.exe" } else { (& $UvCmd python find $PythonVersion) }
$allExtras = @()
if (Test-Path $pythonExeForParse) {
$parsed = & $pythonExeForParse -c @"
import re, sys, tomllib
try:
with open('pyproject.toml', 'rb') as fh:
data = tomllib.load(fh)
specs = data['project']['optional-dependencies']['all']
out = []
for s in specs:
m = re.search(r'hermes-agent\[([\w-]+)\]', s)
if m: out.append(m.group(1))
print(','.join(out))
except Exception:
sys.exit(1)
"@ 2>$null
if ($LASTEXITCODE -eq 0 -and $parsed) {
$allExtras = $parsed.Trim().Split(',')
}
}
if (-not $allExtras -or $allExtras.Count -eq 0) {
Write-Warn "Could not parse [all] from pyproject.toml; Tier 2 will be a no-op."
$safeAll = "all"
} else {
$safeAll = ($allExtras | Where-Object { $brokenExtras -notcontains $_ }) -join ","
}
$allExtras = @(
"modal","daytona","vercel","messaging","matrix","cron","cli","dev",
"tts-premium","slack","pty","honcho","mcp","homeassistant","sms",
"acp","voice","dingtalk","feishu","google","bedrock","web",
"youtube"
)
$pypiExtras = @(
"web","mcp","cron","cli","voice","messaging","slack","dev","acp",
"pty","homeassistant","sms","tts-premium","honcho","google",
"bedrock","dingtalk","feishu","modal","daytona","vercel","youtube"
)
$safeAll = ($allExtras | Where-Object { $brokenExtras -notcontains $_ }) -join ","
$safePypi = ($pypiExtras | Where-Object { $brokenExtras -notcontains $_ }) -join ","
$brokenLabel = if ($brokenExtras) { ($brokenExtras -join ", ") } else { "none" }
$installTiers = @(
@{ Name = "all"; Spec = ".[all]" },
@{ Name = "all (with RL/matrix extras)"; Spec = ".[all]" },
@{ Name = "all minus known-broken ($brokenLabel)"; Spec = ".[$safeAll]" },
@{ Name = "PyPI-only extras (no git deps)"; Spec = ".[$safePypi]" },
@{ Name = "dashboard + core platforms"; Spec = ".[web,mcp,cron,cli,messaging,dev]" },
@{ Name = "core only (no extras)"; Spec = "." }
)
$installed = $skipPipFallback
+57 -111
View File
@@ -366,27 +366,7 @@ install_uv() {
# Install uv
log_info "Installing uv (fast Python package manager)..."
# Capture installer output so a failure shows the user WHY (network,
# glibc mismatch on old distros, missing curl, ~/.local/bin not
# writable, disk full, corp proxy / TLS interception, etc.) instead
# of the previous "✗ Failed to install uv" with zero diagnostic.
#
# Two-stage: download the installer, then run it. Piping
# `curl | sh` masks curl failures (sh exits 0 on empty stdin)
# and conflates network errors with installer errors.
local _uv_install_log _uv_installer
_uv_install_log="$(mktemp 2>/dev/null || echo "/tmp/hermes-uv-install.$$.log")"
_uv_installer="$(mktemp 2>/dev/null || echo "/tmp/hermes-uv-installer.$$.sh")"
if ! curl -LsSf https://astral.sh/uv/install.sh -o "$_uv_installer" 2>"$_uv_install_log"; then
log_error "Failed to download uv installer from https://astral.sh/uv/install.sh"
log_info "curl output:"
sed 's/^/ /' "$_uv_install_log" >&2
log_info "Install manually: https://docs.astral.sh/uv/getting-started/installation/"
rm -f "$_uv_install_log" "$_uv_installer"
exit 1
fi
if sh "$_uv_installer" >>"$_uv_install_log" 2>&1; then
rm -f "$_uv_installer"
if curl -LsSf https://astral.sh/uv/install.sh | sh 2>/dev/null; then
# uv installs to ~/.local/bin by default
if [ -x "$HOME/.local/bin/uv" ]; then
UV_CMD="$HOME/.local/bin/uv"
@@ -395,22 +375,15 @@ install_uv() {
elif command -v uv &> /dev/null; then
UV_CMD="uv"
else
log_error "uv installer reported success but binary not found on PATH"
log_info "Installer output:"
sed 's/^/ /' "$_uv_install_log" >&2
log_error "uv installed but not found on PATH"
log_info "Try adding ~/.local/bin to your PATH and re-running"
rm -f "$_uv_install_log"
exit 1
fi
rm -f "$_uv_install_log"
UV_VERSION=$($UV_CMD --version 2>/dev/null)
log_success "uv installed ($UV_VERSION)"
else
log_error "Failed to install uv"
log_info "Installer output:"
sed 's/^/ /' "$_uv_install_log" >&2
log_info "Install manually: https://docs.astral.sh/uv/getting-started/installation/"
rm -f "$_uv_install_log" "$_uv_installer"
exit 1
fi
}
@@ -890,7 +863,7 @@ clone_repo() {
stash_name="hermes-install-autostash-$(date -u +%Y%m%d-%H%M%S)"
log_info "Local changes detected, stashing before update..."
git stash push --include-untracked -m "$stash_name"
autostash_ref="stash@{0}"
autostash_ref="$(git rev-parse --verify refs/stash)"
fi
git fetch origin
@@ -1100,35 +1073,12 @@ install_deps() {
# extras spec, NOT because they're equivalent in posture.
if [ -f "uv.lock" ]; then
log_info "Trying tier: hash-verified (uv.lock) ..."
log_info "(this resolves + downloads the curated [all] set — first run on a"
log_info " fresh venv can take 1-5 minutes; uv prints progress below)"
# Stream uv's progress directly to the user instead of swallowing
# it with `2>"$(mktemp)"`. Two reasons:
# 1. `--extra all --locked` against a fresh venv has to pull
# every transitive — silencing stderr makes the install
# look frozen for minutes on slow networks. Users see
# "Trying tier: hash-verified ..." and assume it's hung.
# 2. The previous `2>"$(mktemp)"` substituted the path at
# command-build time but never saved it, so on failure the
# uv error message was unreachable — the user just got the
# generic "lockfile may be stale" warning.
#
# Critical flag choice: `--extra all`, NOT `--all-extras`.
# --all-extras = every [project.optional-dependencies] key.
# This bypasses the curated `[all]` extra
# entirely and pulls e.g. [matrix] (which
# needs python-olm + make on Windows) and
# [rl] (git+https deps that fail offline).
# --extra all = install just the `[all]` extra's contents.
# This respects the curation in pyproject.toml.
# uv's own progress UI handles TTY detection and downgrades
# gracefully when stdout/stderr aren't terminals.
if UV_PROJECT_ENVIRONMENT="$INSTALL_DIR/venv" $UV_CMD sync --extra all --locked; then
if UV_PROJECT_ENVIRONMENT="$INSTALL_DIR/venv" $UV_CMD sync --all-extras --locked 2>"$(mktemp)"; then
log_success "Main package installed (hash-verified via uv.lock)"
log_success "All dependencies installed"
return 0
fi
log_warn "uv.lock sync failed (see uv output above), falling back to PyPI resolve..."
log_warn "uv.lock sync failed (lockfile may be stale), falling back to PyPI resolve..."
else
log_info "uv.lock not found — falling back to PyPI resolve (no hash verification)"
fi
@@ -1139,63 +1089,57 @@ install_deps() {
# fresh install all the way down to "core only" — the user should keep
# everything else they signed up for.
#
# Tier 1: [all] — the curated extra in pyproject.toml.
# Tier 2: [all] minus the currently-broken extras list (_BROKEN_EXTRAS).
# Edit _BROKEN_EXTRAS below when something on PyPI breaks; this
# lets users keep the rest of [all] when one transitive is
# unavailable. The list of [all]'s contents is parsed from
# pyproject.toml at runtime — there is NO hand-mirrored copy
# to drift out of sync. If you want to change what [all]
# contains, edit pyproject.toml only.
# Tier 3: bare `.` — last-resort so at least the core CLI launches.
# Skipped tiers like "PyPI-only extras (no git deps)" used to
# exist to dodge [rl] / [matrix] git+sdist deps; those are no
# longer in [all] post-2026-05-12 lazy-install migration, so
# a separate PyPI-only tier had no remaining content.
# Tier 1: [all] — everything, including RL git+https deps (best case).
# Tier 2: [all] minus the currently-broken extras list. Edit
# _BROKEN_EXTRAS below when something on PyPI breaks; this lets
# users keep voice/honcho/google/slack/matrix/etc. even when
# one transitive is unavailable. List the extras here as bare
# names from pyproject.toml [project.optional-dependencies] —
# the script translates them to `[a,b,c]` form below.
# Tier 3: PyPI-only extras (no git deps) — drops [rl] / [yc-bench]
# which are git+https and may fail in restricted networks.
# Tier 4: dashboard + core platforms — minimum viable interactive set.
# Tier 5: bare `.` — last-resort so at least the core CLI launches.
#
# Each tier's stderr is captured to a tempfile so we can show the user
# WHY the higher tier failed instead of silently dropping support.
local _BROKEN_EXTRAS=() # populate when an extra becomes unresolvable
# Parse [project.optional-dependencies].all from pyproject.toml.
# tomllib is stdlib on Python 3.11+ which uv's bootstrap guarantees.
# Falls back to a hand list if parse fails — defensive only.
local _ALL_EXTRAS_CSV
_ALL_EXTRAS_CSV="$(
"$PYTHON_PATH" - <<'PY' 2>/dev/null
import re, sys, tomllib
try:
with open("pyproject.toml", "rb") as fh:
data = tomllib.load(fh)
specs = data["project"]["optional-dependencies"]["all"]
extras = []
for s in specs:
m = re.search(r"hermes-agent\[([\w-]+)\]", s)
if m:
extras.append(m.group(1))
print(",".join(extras))
except Exception as e:
print("", file=sys.stderr)
sys.exit(1)
PY
)"
if [ -z "$_ALL_EXTRAS_CSV" ]; then
log_warn "Could not parse [all] from pyproject.toml; falling back to .[all] only."
_ALL_EXTRAS_CSV=""
fi
# Build "[all] minus broken" spec by filtering the parsed list.
local _SAFE_SPEC=".[all]"
if [ -n "$_ALL_EXTRAS_CSV" ] && [ "${#_BROKEN_EXTRAS[@]}" -gt 0 ]; then
local _SAFE_EXTRAS=()
local _e _b _skip
IFS=',' read -ra _ALL_EXTRAS_ARR <<< "$_ALL_EXTRAS_CSV"
for _e in "${_ALL_EXTRAS_ARR[@]}"; do
_skip=false
for _b in "${_BROKEN_EXTRAS[@]}"; do
if [ "$_e" = "$_b" ]; then _skip=true; break; fi
done
if [ "$_skip" = false ]; then _SAFE_EXTRAS+=("$_e"); fi
local _ALL_EXTRAS=(
modal daytona vercel messaging matrix cron cli dev tts-premium slack
pty honcho mcp homeassistant sms acp voice dingtalk feishu google
bedrock web youtube
)
# Tier 2: all extras minus _BROKEN_EXTRAS
local _SAFE_EXTRAS=()
local _e _b _skip
for _e in "${_ALL_EXTRAS[@]}"; do
_skip=false
for _b in "${_BROKEN_EXTRAS[@]}"; do
if [ "$_e" = "$_b" ]; then _skip=true; break; fi
done
_SAFE_SPEC=".[$(IFS=,; echo "${_SAFE_EXTRAS[*]}")]"
fi
if [ "$_skip" = false ]; then _SAFE_EXTRAS+=("$_e"); fi
done
local _SAFE_SPEC
_SAFE_SPEC=".[$(IFS=,; echo "${_SAFE_EXTRAS[*]}")]"
# Tier 3: PyPI-only extras (no git deps), still skipping broken ones.
# Mirrors the install.ps1 list but excludes [rl] / [yc-bench] / [matrix]
# (matrix needs python-olm which fails to build on some hosts).
local _PYPI_EXTRAS=(
web mcp cron cli voice messaging slack dev acp pty homeassistant sms
tts-premium honcho google bedrock dingtalk feishu modal daytona vercel
youtube
)
local _PYPI_SAFE=()
for _e in "${_PYPI_EXTRAS[@]}"; do
_skip=false
for _b in "${_BROKEN_EXTRAS[@]}"; do
if [ "$_e" = "$_b" ]; then _skip=true; break; fi
done
if [ "$_skip" = false ]; then _PYPI_SAFE+=("$_e"); fi
done
local _PYPI_SPEC
_PYPI_SPEC=".[$(IFS=,; echo "${_PYPI_SAFE[*]}")]"
local _TIER4_SPEC=".[web,mcp,cron,cli,messaging,dev]"
ALL_INSTALL_LOG=$(mktemp)
local _installed=false
@@ -1215,8 +1159,10 @@ PY
return 1
}
install_tier "all" ".[all]" \
install_tier "all (with RL/matrix extras)" ".[all]" \
|| install_tier "all minus known-broken (${_BROKEN_EXTRAS[*]:-none})" "$_SAFE_SPEC" \
|| install_tier "PyPI-only extras (no git deps)" "$_PYPI_SPEC" \
|| install_tier "dashboard + core platforms" "$_TIER4_SPEC" \
|| install_tier "core only (no extras)" "."
rm -f "$ALL_INSTALL_LOG"
-19
View File
@@ -53,15 +53,12 @@ AUTHOR_MAP = {
"421774554@qq.com": "wuli666",
"harish.kukreja@gmail.com": "counterposition",
"1046611633@qq.com": "zhengyn0001",
"db@project-aeon.com": "db-aeon",
"ahmed@abadr.net": "ahmedbadr3",
"cleo@edaphic.xyz": "curiouscleo",
"hirokazu.ogawa@kwansei.ac.jp": "hrkzogw",
"datapod.k@gmail.com": "dandacompany",
"treydong.zh@gmail.com": "TreyDong",
"kyanam.preetham@gmail.com": "pkyanam",
"127238744+teknium1@users.noreply.github.com": "teknium1",
"147827411+EloquentBrush@users.noreply.github.com": "AhmetArif0",
"hugosequier@gmail.com": "Hugo-SEQUIER",
"128259593+Gutslabs@users.noreply.github.com": "Gutslabs",
"50326054+nocturnum91@users.noreply.github.com": "nocturnum91",
@@ -140,22 +137,6 @@ AUTHOR_MAP = {
"tangyuanjc@JCdeAIfenshendeMac-mini.local": "tangyuanjc",
"leon@agentlinker.ai": "agentlinker",
"santoshhumagain1887@gmail.com": "npmisantosh",
"39641663+luarss@users.noreply.github.com": "luarss",
"16263913+zccyman@users.noreply.github.com": "zccyman",
"ahmetosrak@Ahmet-MacBook-Air.local": "Osraka",
"98612432+Osraka@users.noreply.github.com": "Osraka",
"112634774+ryptotalent@users.noreply.github.com": "ryptotalent",
"270097726+hookinglau@users.noreply.github.com": "hookinglau",
"5029547+AllynSheep@users.noreply.github.com": "AllynSheep",
"allyn0306@gmail.com": "AllynSheep",
"46887634+aqilaziz@users.noreply.github.com": "aqilaziz",
"gonzes7@gmail.com": "aqilaziz",
"6966326+laoli-no1@users.noreply.github.com": "laoli-no1",
"laoli_no1@163.com": "laoli-no1",
"39730900+NorethSea@users.noreply.github.com": "NorethSea",
"963979204@qq.com": "NorethSea",
"2283389+JamesX88@users.noreply.github.com": "JamesX88",
"JamesX88@users.noreply.github.com": "JamesX88",
"novax635@gmail.com": "novax635",
"krionex1@gmail.com": "Krionex",
"rxdxxxx@users.noreply.github.com": "rxdxxxx",
+9 -38
View File
@@ -82,22 +82,7 @@ else
echo -e "${GREEN}${NC} uv found ($UV_VERSION)"
else
echo -e "${CYAN}${NC} Installing uv..."
# Capture installer output so a failure shows the user WHY
# (network, glibc mismatch on old distros, missing curl, disk
# full, etc.) instead of "✗ Failed to install uv" with zero
# diagnostic. Two-stage to avoid `curl | sh` masking curl
# failures (sh exits 0 on empty stdin under no pipefail).
_uv_log="$(mktemp 2>/dev/null || echo "/tmp/hermes-uv-install.$$.log")"
_uv_installer="$(mktemp 2>/dev/null || echo "/tmp/hermes-uv-installer.$$.sh")"
if ! curl -LsSf https://astral.sh/uv/install.sh -o "$_uv_installer" 2>"$_uv_log"; then
echo -e "${RED}${NC} Failed to download uv installer."
sed 's/^/ /' "$_uv_log" >&2
echo -e "${CYAN}${NC} Install manually: https://docs.astral.sh/uv/"
rm -f "$_uv_log" "$_uv_installer"
exit 1
fi
if sh "$_uv_installer" >>"$_uv_log" 2>&1; then
rm -f "$_uv_installer"
if curl -LsSf https://astral.sh/uv/install.sh | sh 2>/dev/null; then
if [ -x "$HOME/.local/bin/uv" ]; then
UV_CMD="$HOME/.local/bin/uv"
elif [ -x "$HOME/.cargo/bin/uv" ]; then
@@ -105,22 +90,14 @@ else
fi
if [ -n "$UV_CMD" ]; then
rm -f "$_uv_log"
UV_VERSION=$($UV_CMD --version 2>/dev/null)
echo -e "${GREEN}${NC} uv installed ($UV_VERSION)"
else
echo -e "${RED}${NC} uv installer reported success but binary not found. Add ~/.local/bin to PATH and retry."
echo -e "${CYAN}${NC} Installer output:"
sed 's/^/ /' "$_uv_log" >&2
rm -f "$_uv_log"
echo -e "${RED}${NC} uv installed but not found. Add ~/.local/bin to PATH and retry."
exit 1
fi
else
echo -e "${RED}${NC} Failed to install uv."
echo -e "${CYAN}${NC} Installer output:"
sed 's/^/ /' "$_uv_log" >&2
echo -e "${CYAN}${NC} Install manually: https://docs.astral.sh/uv/"
rm -f "$_uv_log" "$_uv_installer"
echo -e "${RED}${NC} Failed to install uv. Visit https://docs.astral.sh/uv/"
exit 1
fi
fi
@@ -241,21 +218,15 @@ else
# (the direct deps in pyproject.toml are exact-pinned, but
# `uv pip install` re-resolves transitives fresh from PyPI).
echo -e "${CYAN}${NC} Using uv.lock for hash-verified installation..."
echo -e "${CYAN}${NC} (first run on a fresh venv can take 1-5 minutes; uv prints progress below)"
# Critical flag choice: `--extra all`, NOT `--all-extras`. The
# latter installs every [project.optional-dependencies] key,
# bypassing the curated [all] extra and pulling backends like
# [matrix] (python-olm needs make on Windows) and [rl] (git+https
# deps that fail offline). See pyproject.toml's [all] for the
# curated set, and tools/lazy_deps.py for backends that install
# at first use.
# Also: stream stderr through directly so the user sees uv's
# progress UI instead of staring at a frozen prompt.
if UV_PROJECT_ENVIRONMENT="$SCRIPT_DIR/venv" $UV_CMD sync --extra all --locked; then
_UV_SYNC_LOG=$(mktemp)
if UV_PROJECT_ENVIRONMENT="$SCRIPT_DIR/venv" $UV_CMD sync --all-extras --locked 2>"$_UV_SYNC_LOG"; then
echo -e "${GREEN}${NC} Dependencies installed (hash-verified via uv.lock)"
rm -f "$_UV_SYNC_LOG"
else
echo -e "${YELLOW}${NC} Lockfile sync failed (see uv output above)."
echo -e "${YELLOW}${NC} Lockfile sync failed (lockfile may be stale)."
echo -e "${YELLOW}${NC} Falling back to PyPI resolve — transitives will NOT be hash-verified."
head -5 "$_UV_SYNC_LOG" | sed 's/^/ /'
rm -f "$_UV_SYNC_LOG"
_try_install
echo -e "${GREEN}${NC} Dependencies installed (transitives re-resolved, not hash-verified)"
fi
+1 -4
View File
@@ -50,7 +50,6 @@ Your job description says "route, don't execute." The rules that enforce that:
- **For any concrete task, create a Kanban task and assign it.** Every single time.
- **Split multi-lane requests before creating cards.** A user prompt can contain several independent workstreams. Extract those lanes first, then create one card per lane instead of bundling unrelated work into a single implementer card.
- **Run independent lanes in parallel.** If two cards do not need each other's output, leave them unlinked so the dispatcher can fan them out. Link only true data dependencies.
- **Never create dependent work as independent ready cards.** If a card must wait for another card, pass `parents=[...]` in the original `kanban_create` call. Do not create it first and link it later, and do not rely on prose like "wait for T1" inside the body.
- **If no specialist fits the available profiles, ask the user which profile to create or which existing profile to use.** Do not invent profile names; the dispatcher will silently drop unknown assignees.
- **Decompose, route, and summarize — that's the whole job.**
@@ -68,7 +67,7 @@ Before creating anything, draft the graph out loud (in your response to the user
2. Map each lane to one of the profiles you discovered in Step 0. If a lane doesn't fit any existing profile, ask the user which to use or create.
3. Decide whether each lane is independent or gated by another lane.
4. Create independent lanes as parallel cards with no parent links.
5. Create synthesis/review/integration cards with parent links to the lanes they depend on. A child created with unfinished parents starts in `todo`; the dispatcher promotes it to `ready` only after every parent is done.
5. Create synthesis/review/integration cards with parent links to the lanes they depend on.
Examples of prompts that should fan out (using placeholder profile names — substitute whatever exists on the user's setup):
@@ -116,8 +115,6 @@ t4 = kanban_create(
`parents=[...]` gates promotion — children stay in `todo` until every parent reaches `done`, then auto-promote to `ready`. No manual coordination needed; the dispatcher and dependency engine handle it.
If the task graph has dependencies, create the parent cards first, capture their returned ids, and include those ids in the child card's `parents` list during the child `kanban_create` call. Avoid creating all cards in parallel and linking them afterward; that creates a window where the dispatcher can claim a child before its inputs exist.
### Step 4 — Complete your own task
If you were spawned as a task yourself (e.g. a planner profile was assigned `T0: "investigate Postgres migration"`), mark it done with a summary of what you created:
-108
View File
@@ -1,108 +0,0 @@
"""Integration test: LSP layer is skipped on non-local backends.
The host-side LSP server can't see files inside a Docker/Modal/SSH
sandbox. When the agent's terminal env isn't ``LocalEnvironment``,
the file_operations layer must skip both ``snapshot_baseline`` and
``get_diagnostics_sync`` calls falling back to the in-process
syntax check exactly as if LSP were disabled.
"""
from __future__ import annotations
import os
import sys
from unittest.mock import MagicMock
import pytest
from agent.lsp import eventlog
@pytest.fixture(autouse=True)
def _reset():
eventlog.reset_announce_caches()
def test_local_only_helper_returns_true_for_local_env():
from tools.environments.local import LocalEnvironment
from tools.file_operations import ShellFileOperations
fops = ShellFileOperations(LocalEnvironment(cwd="/tmp"))
assert fops._lsp_local_only() is True
def test_local_only_helper_returns_false_for_non_local_env():
"""A mocked non-local env (Docker/Modal/SSH stand-in) returns False."""
from tools.file_operations import ShellFileOperations
# Build something that's NOT a LocalEnvironment. We use a bare
# MagicMock — isinstance() against LocalEnvironment is False.
fake_env = MagicMock()
fake_env.execute = MagicMock(return_value=MagicMock(exit_code=0, stdout=""))
fake_env.cwd = "/sandbox"
fops = ShellFileOperations(fake_env)
assert fops._lsp_local_only() is False
def test_snapshot_baseline_skipped_for_non_local(monkeypatch):
"""Verify the LSP service's snapshot_baseline is NOT called when
the backend isn't local."""
from tools.file_operations import ShellFileOperations
fake_env = MagicMock()
fake_env.execute = MagicMock(return_value=MagicMock(exit_code=0, stdout=""))
fake_env.cwd = "/sandbox"
fops = ShellFileOperations(fake_env)
snapshot_called = []
class FakeService:
def snapshot_baseline(self, path):
snapshot_called.append(path)
monkeypatch.setattr("agent.lsp.get_service", lambda: FakeService())
fops._snapshot_lsp_baseline("/sandbox/x.py")
assert snapshot_called == [], "snapshot must be skipped for non-local backends"
def test_maybe_lsp_diagnostics_returns_empty_for_non_local(monkeypatch):
from tools.file_operations import ShellFileOperations
fake_env = MagicMock()
fake_env.execute = MagicMock(return_value=MagicMock(exit_code=0, stdout=""))
fake_env.cwd = "/sandbox"
fops = ShellFileOperations(fake_env)
called = []
class FakeService:
def enabled_for(self, path):
called.append(("enabled_for", path))
return True
def get_diagnostics_sync(self, path, **kw):
called.append(("get_diagnostics_sync", path))
return [{"severity": 1, "message": "should not see this"}]
monkeypatch.setattr("agent.lsp.get_service", lambda: FakeService())
result = fops._maybe_lsp_diagnostics("/sandbox/x.py")
assert result == ""
assert called == [], "service must not be queried for non-local backends"
def test_snapshot_baseline_called_for_local_env(tmp_path, monkeypatch):
from tools.environments.local import LocalEnvironment
from tools.file_operations import ShellFileOperations
fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
snapshot_called = []
class FakeService:
def snapshot_baseline(self, path):
snapshot_called.append(path)
monkeypatch.setattr("agent.lsp.get_service", lambda: FakeService())
fops._snapshot_lsp_baseline(str(tmp_path / "x.py"))
assert snapshot_called == [str(tmp_path / "x.py")]
-213
View File
@@ -1,213 +0,0 @@
"""Tests for the broken-set short-circuit added to handle outer-timeout failures.
When ``snapshot_baseline`` or ``get_diagnostics_sync`` time out from the
service layer (because a language server hangs during initialize, or
the binary is wedged), the inner spawn task is cancelled but the
inner exception handler that adds to ``_broken`` never runs. Without
the service-layer fallback added in this module, every subsequent
edit re-pays the full timeout cost until the process exits.
This module verifies:
- ``_mark_broken_for_file`` adds the right key
- ``enabled_for`` short-circuits on broken keys
- a missing binary is broken-set'd after one snapshot attempt
"""
from __future__ import annotations
import os
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from agent.lsp.manager import LSPService
from agent.lsp.servers import SERVERS, ServerContext, ServerDef, SpawnSpec
from agent.lsp.workspace import clear_cache
@pytest.fixture(autouse=True)
def _clear_workspace_cache():
clear_cache()
yield
clear_cache()
def _make_git_workspace(tmp_path: Path) -> Path:
"""Build a minimal git repo with a pyproject so pyright's root resolver fires."""
repo = tmp_path / "repo"
repo.mkdir()
(repo / ".git").mkdir()
(repo / "pyproject.toml").write_text("[project]\nname='t'\n")
return repo
def test_mark_broken_for_file_adds_correct_key(tmp_path, monkeypatch):
"""``_mark_broken_for_file`` keys the broken-set on
(server_id, per_server_root) so subsequent ``enabled_for`` calls
for files in the same project skip immediately."""
repo = _make_git_workspace(tmp_path)
monkeypatch.chdir(str(repo))
src = repo / "x.py"
src.write_text("")
svc = LSPService(
enabled=True,
wait_mode="document",
wait_timeout=2.0,
install_strategy="manual",
)
try:
svc._mark_broken_for_file(str(src), RuntimeError("simulated"))
# The pyright server resolves to the repo root via pyproject.toml.
assert ("pyright", str(repo)) in svc._broken
finally:
svc.shutdown()
def test_enabled_for_returns_false_after_broken(tmp_path, monkeypatch):
"""Once a (server_id, root) pair is in the broken-set,
``enabled_for`` returns False so the file_operations layer skips
the LSP path entirely."""
repo = _make_git_workspace(tmp_path)
monkeypatch.chdir(str(repo))
src = repo / "x.py"
src.write_text("")
svc = LSPService(
enabled=True,
wait_mode="document",
wait_timeout=2.0,
install_strategy="manual",
)
try:
# Initially enabled.
assert svc.enabled_for(str(src)) is True
# Mark broken.
svc._mark_broken_for_file(str(src), RuntimeError("simulated"))
# Now disabled — the broken-set short-circuits.
assert svc.enabled_for(str(src)) is False
finally:
svc.shutdown()
def test_enabled_for_other_file_in_same_project_also_skipped(tmp_path, monkeypatch):
"""The broken key is (server_id, root), so ALL files routed through
the same server in the same project are skipped not just the one
that triggered the failure."""
repo = _make_git_workspace(tmp_path)
monkeypatch.chdir(str(repo))
a = repo / "a.py"
a.write_text("")
b = repo / "b.py"
b.write_text("")
svc = LSPService(
enabled=True,
wait_mode="document",
wait_timeout=2.0,
install_strategy="manual",
)
try:
svc._mark_broken_for_file(str(a), RuntimeError("simulated"))
# Both files in the same project skip pyright now.
assert svc.enabled_for(str(a)) is False
assert svc.enabled_for(str(b)) is False
finally:
svc.shutdown()
def test_unrelated_project_not_affected_by_broken(tmp_path, monkeypatch):
"""Marking pyright broken for project A must NOT affect project B."""
repo_a = _make_git_workspace(tmp_path)
repo_b = tmp_path / "repo-b"
repo_b.mkdir()
(repo_b / ".git").mkdir()
(repo_b / "pyproject.toml").write_text("[project]\nname='b'\n")
a_src = repo_a / "x.py"
a_src.write_text("")
b_src = repo_b / "x.py"
b_src.write_text("")
monkeypatch.chdir(str(repo_a))
svc = LSPService(
enabled=True,
wait_mode="document",
wait_timeout=2.0,
install_strategy="manual",
)
try:
svc._mark_broken_for_file(str(a_src), RuntimeError("simulated"))
# Project A skipped.
assert svc.enabled_for(str(a_src)) is False
# Project B still enabled — the broken key is per-project.
monkeypatch.chdir(str(repo_b))
assert svc.enabled_for(str(b_src)) is True
finally:
svc.shutdown()
def test_mark_broken_handles_missing_server_silently(tmp_path):
"""If the file extension doesn't match any registered server,
``_mark_broken_for_file`` no-ops nothing to mark."""
svc = LSPService(
enabled=True,
wait_mode="document",
wait_timeout=2.0,
install_strategy="manual",
)
try:
# No registered server for .xyz; must not raise.
svc._mark_broken_for_file(str(tmp_path / "weird.xyz"), RuntimeError("x"))
assert len(svc._broken) == 0
finally:
svc.shutdown()
def test_mark_broken_handles_no_workspace_silently(tmp_path):
"""File outside any git worktree → no workspace → no key to add."""
src = tmp_path / "orphan.py"
src.write_text("")
svc = LSPService(
enabled=True,
wait_mode="document",
wait_timeout=2.0,
install_strategy="manual",
)
try:
svc._mark_broken_for_file(str(src), RuntimeError("x"))
assert len(svc._broken) == 0
finally:
svc.shutdown()
def test_snapshot_failure_marks_broken_via_outer_timeout(tmp_path, monkeypatch):
"""End-to-end: ``snapshot_baseline``'s outer ``_loop.run`` timeout
triggers ``_mark_broken_for_file``, so a second call to
``enabled_for`` returns False."""
repo = _make_git_workspace(tmp_path)
monkeypatch.chdir(str(repo))
src = repo / "x.py"
src.write_text("")
svc = LSPService(
enabled=True,
wait_mode="document",
wait_timeout=2.0,
install_strategy="manual",
)
try:
# Force the inner snapshot coroutine to raise.
async def boom(_path):
raise RuntimeError("outer-timeout simulated")
with patch.object(svc, "_snapshot_async", boom):
assert svc.enabled_for(str(src)) is True
svc.snapshot_baseline(str(src))
# After the failure, the file's pair is in the broken-set and
# ``enabled_for`` skips it.
assert ("pyright", str(repo)) in svc._broken
assert svc.enabled_for(str(src)) is False
finally:
svc.shutdown()
-146
View File
@@ -1,146 +0,0 @@
"""Tests for the ``lsp_diagnostics`` field on WriteResult / PatchResult.
The field exists so the agent can read syntax errors (``lint``) and
semantic errors (``lsp_diagnostics``) as separate signals rather than
having LSP output prepended to the lint string.
"""
from __future__ import annotations
import os
import sys
import tempfile
from unittest.mock import MagicMock, patch
import pytest
from tools.environments.local import LocalEnvironment
from tools.file_operations import (
PatchResult,
ShellFileOperations,
WriteResult,
)
# ---------------------------------------------------------------------------
# Dataclass shape
# ---------------------------------------------------------------------------
def test_writeresult_lsp_diagnostics_optional():
r = WriteResult()
assert r.lsp_diagnostics is None
def test_writeresult_to_dict_omits_field_when_none():
r = WriteResult(bytes_written=10)
assert "lsp_diagnostics" not in r.to_dict()
def test_writeresult_to_dict_includes_field_when_set():
r = WriteResult(bytes_written=10, lsp_diagnostics="<diagnostics>...</diagnostics>")
d = r.to_dict()
assert d["lsp_diagnostics"] == "<diagnostics>...</diagnostics>"
def test_patchresult_to_dict_includes_field_when_set():
r = PatchResult(success=True, lsp_diagnostics="ERROR [1:1] thing")
d = r.to_dict()
assert d["lsp_diagnostics"] == "ERROR [1:1] thing"
def test_patchresult_to_dict_omits_field_when_none():
r = PatchResult(success=True)
assert "lsp_diagnostics" not in r.to_dict()
def test_patchresult_to_dict_omits_field_when_empty_string():
"""Empty string counts as falsy — agent shouldn't see an empty field."""
r = PatchResult(success=True, lsp_diagnostics="")
assert "lsp_diagnostics" not in r.to_dict()
# ---------------------------------------------------------------------------
# Channel separation: lint and lsp_diagnostics stay independent
# ---------------------------------------------------------------------------
def test_lint_and_lsp_diagnostics_are_separate_channels():
"""A WriteResult can carry BOTH a syntax-error lint AND an LSP
diagnostic block. They belong in separate fields."""
r = WriteResult(
bytes_written=42,
lint={"status": "error", "output": "SyntaxError: ..."},
lsp_diagnostics="<diagnostics>ERROR [1:5] type mismatch</diagnostics>",
)
d = r.to_dict()
assert "lint" in d
assert "lsp_diagnostics" in d
assert d["lint"]["output"] == "SyntaxError: ..."
assert "type mismatch" in d["lsp_diagnostics"]
# ---------------------------------------------------------------------------
# write_file populates the field via _maybe_lsp_diagnostics
# ---------------------------------------------------------------------------
def test_write_file_populates_lsp_diagnostics_when_layer_returns_block(tmp_path):
"""When the LSP layer returns a non-empty block, write_file puts it
into the ``lsp_diagnostics`` field NOT into ``lint.output``."""
fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
target = tmp_path / "x.py"
block = "<diagnostics file=\"x.py\">\nERROR [1:1] problem\n</diagnostics>"
with patch.object(fops, "_maybe_lsp_diagnostics", return_value=block):
res = fops.write_file(str(target), "x = 1\n")
assert res.lsp_diagnostics == block
# Lint is the syntax check, which is clean for "x = 1" — must NOT
# have the LSP block folded into it.
assert res.lint == {"status": "ok", "output": ""}
def test_write_file_lsp_diagnostics_none_when_layer_returns_empty(tmp_path):
fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
target = tmp_path / "x.py"
with patch.object(fops, "_maybe_lsp_diagnostics", return_value=""):
res = fops.write_file(str(target), "x = 1\n")
assert res.lsp_diagnostics is None
def test_write_file_skips_lsp_when_syntax_failed(tmp_path):
"""If the syntax check finds errors, the LSP layer should not be
consulted (a file that won't parse won't yield meaningful semantic
diagnostics)."""
fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
target = tmp_path / "broken.py"
with patch.object(fops, "_maybe_lsp_diagnostics") as mock_lsp:
res = fops.write_file(str(target), "def x(:\n") # syntax error
assert mock_lsp.call_count == 0
assert res.lsp_diagnostics is None
assert res.lint["status"] == "error"
# ---------------------------------------------------------------------------
# patch_replace propagates the field from the inner write_file
# ---------------------------------------------------------------------------
def test_patch_replace_propagates_lsp_diagnostics(tmp_path):
"""patch_replace's internal write_file populates lsp_diagnostics —
the outer PatchResult must carry it forward."""
fops = ShellFileOperations(LocalEnvironment(cwd=str(tmp_path)))
target = tmp_path / "x.py"
target.write_text("x = 1\n")
block = "<diagnostics>ERROR [1:5] semantic issue</diagnostics>"
with patch.object(fops, "_maybe_lsp_diagnostics", return_value=block):
res = fops.patch_replace(str(target), "x = 1", "x = 2")
assert res.success is True
assert res.lsp_diagnostics == block
@@ -1,279 +0,0 @@
"""Tests for follow-up fixes to the LSP integration (PR after #24168).
Covers:
1. ``typescript-language-server`` install recipe pulls in ``typescript``
alongside the server, so the npm install command targets both.
2. ``hermes lsp status`` surfaces a ``Backend warnings`` section when
bash-language-server is installed but ``shellcheck`` is missing.
3. ``_check_lint`` returns ``skipped`` (not ``error``) when the linter
command exists on PATH but couldn't actually run — e.g. ``npx tsc``
without the typescript SDK installed. This is what unblocks the
LSP semantic tier on TypeScript files when the user doesn't also
have a project-level ``tsc``.
"""
from __future__ import annotations
import io
from contextlib import redirect_stdout
from unittest.mock import MagicMock, patch
import pytest
from agent.lsp.install import INSTALL_RECIPES
# ---------------------------------------------------------------------------
# Fix 1: typescript install recipe carries the typescript SDK
# ---------------------------------------------------------------------------
def test_typescript_recipe_includes_typescript_sdk():
recipe = INSTALL_RECIPES["typescript-language-server"]
extras = recipe.get("extra_pkgs") or []
assert "typescript" in extras, (
"typescript-language-server requires the `typescript` SDK as a "
"sibling install — without it `initialize` fails with "
"'Could not find a valid TypeScript installation'."
)
def test_install_npm_passes_extras_to_npm_command(tmp_path, monkeypatch):
"""Verify the npm subprocess is invoked with both pkg AND extras."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
captured = {}
def fake_run(cmd, **kwargs):
captured["cmd"] = cmd
# Pretend npm succeeded but binary doesn't exist — install code
# will return None, which is fine for this test.
return MagicMock(returncode=0, stderr="")
from agent.lsp import install as install_mod
monkeypatch.setattr(install_mod.subprocess, "run", fake_run)
monkeypatch.setattr(install_mod.shutil, "which", lambda c: "/usr/bin/npm" if c == "npm" else None)
install_mod._install_npm("typescript-language-server", "typescript-language-server",
extra_pkgs=["typescript"])
cmd = captured["cmd"]
assert "typescript-language-server" in cmd
assert "typescript" in cmd
# Both must come AFTER the npm flags, in install-target position
install_idx = cmd.index("install")
assert cmd.index("typescript-language-server") > install_idx
assert cmd.index("typescript") > install_idx
def test_install_npm_works_without_extras(tmp_path, monkeypatch):
"""Backwards compat: pyright-style recipes (no extras) still install."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
captured = {}
def fake_run(cmd, **kwargs):
captured["cmd"] = cmd
return MagicMock(returncode=0, stderr="")
from agent.lsp import install as install_mod
monkeypatch.setattr(install_mod.subprocess, "run", fake_run)
monkeypatch.setattr(install_mod.shutil, "which", lambda c: "/usr/bin/npm" if c == "npm" else None)
install_mod._install_npm("pyright", "pyright-langserver")
cmd = captured["cmd"]
assert "pyright" in cmd
# Should not blow up when extra_pkgs is omitted/None
install_targets = [c for c in cmd if not c.startswith("-") and c not in (
"install", "--prefix", str(install_mod.hermes_lsp_bin_dir().parent),
"/usr/bin/npm",
)]
assert install_targets == ["pyright"]
# ---------------------------------------------------------------------------
# Fix 2: ``hermes lsp status`` surfaces shellcheck-missing for bash
# ---------------------------------------------------------------------------
def test_backend_warnings_quiet_when_bash_not_installed(tmp_path, monkeypatch):
"""No bash → no warning."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from agent.lsp import cli as lsp_cli
with patch("shutil.which", return_value=None):
notes = lsp_cli._backend_warnings()
assert notes == []
def test_backend_warnings_quiet_when_bash_and_shellcheck_both_present(tmp_path, monkeypatch):
"""Both installed → no warning."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from agent.lsp import cli as lsp_cli
def which(name):
return f"/usr/bin/{name}" # both found
with patch("shutil.which", side_effect=which):
notes = lsp_cli._backend_warnings()
assert notes == []
def test_backend_warnings_fires_when_bash_installed_but_shellcheck_missing(tmp_path, monkeypatch):
"""The exact scenario from the bug report."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
from agent.lsp import cli as lsp_cli
def which(name):
if name == "bash-language-server":
return "/fake/bin/bash-language-server"
return None # shellcheck missing
with patch("shutil.which", side_effect=which):
notes = lsp_cli._backend_warnings()
assert len(notes) == 1
assert "shellcheck" in notes[0].lower()
assert "bash-language-server" in notes[0].lower()
def test_status_output_includes_backend_warnings_section(tmp_path, monkeypatch):
"""End-to-end: status command output includes the warning section."""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
# Pretend bash-language-server is installed but shellcheck is missing
def which(name):
if name == "bash-language-server":
return "/fake/bin/bash-language-server"
return None
from agent.lsp import cli as lsp_cli
buf = io.StringIO()
with patch("shutil.which", side_effect=which), redirect_stdout(buf):
lsp_cli._cmd_status(emit_json=False)
output = buf.getvalue()
assert "Backend warnings" in output
assert "shellcheck" in output
# ---------------------------------------------------------------------------
# Fix 3: tier-1 lint treats unusable linters as ``skipped``, not ``error``
# ---------------------------------------------------------------------------
def test_npx_tsc_missing_treated_as_skipped():
"""The original bug: ``npx tsc`` errors when tsc isn't installed.
Without this fix, the lint result is ``error``, which means the LSP
semantic tier (gated on ``success or skipped``) is skipped the user
gets a useless tooling-error message instead of real diagnostics.
"""
from tools.file_operations import _looks_like_linter_unusable
npx_failure_output = (
" \n"
" This is not the tsc command you are looking for \n"
" \n"
"\n"
"To get access to the TypeScript compiler, tsc, from the command line either:\n"
"- Use npm install typescript to first add TypeScript to your project before using npx\n"
)
assert _looks_like_linter_unusable("npx", npx_failure_output) is True
def test_real_lint_error_not_classified_as_unusable():
"""A genuine TypeScript type error must NOT be misclassified."""
from tools.file_operations import _looks_like_linter_unusable
real_error = (
"bad.ts:5:1 - error TS2322: Type 'number' is not assignable to type 'string'.\n"
"5 const x: string = greet(42);\n"
" ~~~~~~~~~~~~~~~\n"
)
assert _looks_like_linter_unusable("npx", real_error) is False
def test_unknown_base_cmd_returns_false():
"""Unfamiliar linters fall through and use the normal error path."""
from tools.file_operations import _looks_like_linter_unusable
assert _looks_like_linter_unusable("eslint", "any output") is False
assert _looks_like_linter_unusable("", "anything") is False
def test_check_lint_returns_skipped_when_npx_tsc_unusable(tmp_path):
"""Integration: _check_lint sees npx exit non-zero with the npx banner
and returns a ``skipped`` LintResult so LSP can still run."""
from tools.environments.local import LocalEnvironment
from tools.file_operations import ShellFileOperations
ts_file = tmp_path / "bad.ts"
ts_file.write_text("const x: string = 42;\n")
env = LocalEnvironment()
fops = ShellFileOperations(env)
# Patch _exec to simulate ``npx tsc`` failing because tsc is missing.
npx_banner = (
" \n"
" This is not the tsc command you are looking for \n"
)
def fake_exec(cmd, **kwargs):
result = MagicMock()
result.exit_code = 1
result.stdout = npx_banner
return result
with patch.object(fops, "_exec", side_effect=fake_exec), \
patch.object(fops, "_has_command", return_value=True):
lint = fops._check_lint(str(ts_file))
assert lint.skipped is True, (
f"expected skipped (so LSP runs); got success={lint.success}, "
f"output={lint.output!r}"
)
assert "not usable" in (lint.message or "")
def test_check_lint_returns_error_for_real_ts_type_errors(tmp_path):
"""Sanity: real TypeScript errors still go through the error path."""
from tools.environments.local import LocalEnvironment
from tools.file_operations import ShellFileOperations
ts_file = tmp_path / "bad.ts"
ts_file.write_text("const x: string = 42;\n")
env = LocalEnvironment()
fops = ShellFileOperations(env)
real_tsc_error = (
"bad.ts:1:7 - error TS2322: Type 'number' is not assignable to type 'string'.\n"
"1 const x: string = 42;\n"
" ~\n"
"Found 1 error.\n"
)
def fake_exec(cmd, **kwargs):
result = MagicMock()
result.exit_code = 1
result.stdout = real_tsc_error
return result
with patch.object(fops, "_exec", side_effect=fake_exec), \
patch.object(fops, "_has_command", return_value=True):
lint = fops._check_lint(str(ts_file))
assert lint.skipped is False
assert lint.success is False
assert "TS2322" in lint.output
if __name__ == "__main__": # pragma: no cover
pytest.main([__file__, "-v"])
-144
View File
@@ -1,144 +0,0 @@
"""Tests for service-singleton lifecycle: atexit handler, idempotent shutdown.
These cover the exit-cleanup behavior added to plug the language-server
process leak without the atexit hook, ``hermes chat`` exits while
pyright/gopls/etc. are still alive on the host.
"""
from __future__ import annotations
import atexit
from unittest.mock import MagicMock, patch
import pytest
from agent import lsp as lsp_module
@pytest.fixture(autouse=True)
def _reset_singleton():
"""Force a clean module state before each test.
Tests in this file share process-global state (the lazy
singleton + atexit registration flag); reset both before and
after every test so order doesn't matter.
"""
lsp_module._service = None
lsp_module._atexit_registered = False
yield
lsp_module._service = None
lsp_module._atexit_registered = False
def test_get_service_registers_atexit_handler_once(monkeypatch):
"""First call to ``get_service`` must register an atexit handler;
subsequent calls must NOT register another one (Python's ``atexit``
runs every registered callable, so a duplicate would shutdown
twice harmless but wasteful)."""
fake_svc = MagicMock()
fake_svc.is_active.return_value = True
monkeypatch.setattr(
lsp_module.LSPService, "create_from_config", classmethod(lambda cls: fake_svc)
)
registrations = []
def fake_register(fn):
registrations.append(fn)
monkeypatch.setattr(atexit, "register", fake_register)
a = lsp_module.get_service()
b = lsp_module.get_service()
c = lsp_module.get_service()
assert a is fake_svc
assert b is fake_svc
assert c is fake_svc
assert len(registrations) == 1
# The registered callable must be our internal shutdown wrapper.
assert registrations[0] is lsp_module._atexit_shutdown
def test_atexit_shutdown_calls_shutdown_service(monkeypatch):
"""The atexit-registered wrapper invokes ``shutdown_service`` and
swallows any exception by the time atexit fires, the user has
already seen the response and a noisy traceback would be clutter."""
called = []
monkeypatch.setattr(
lsp_module, "shutdown_service", lambda: called.append("shutdown")
)
lsp_module._atexit_shutdown()
assert called == ["shutdown"]
def test_atexit_shutdown_swallows_exceptions(monkeypatch):
def boom():
raise RuntimeError("server already dead")
monkeypatch.setattr(lsp_module, "shutdown_service", boom)
# Must not raise.
lsp_module._atexit_shutdown()
def test_shutdown_service_idempotent(monkeypatch):
"""Calling shutdown twice must be safe — first call cleans up,
second call no-ops (nothing to shut down)."""
fake_svc = MagicMock()
fake_svc.is_active.return_value = True
fake_svc.shutdown = MagicMock()
monkeypatch.setattr(
lsp_module.LSPService, "create_from_config", classmethod(lambda cls: fake_svc)
)
monkeypatch.setattr(atexit, "register", lambda fn: None)
lsp_module.get_service()
lsp_module.shutdown_service()
lsp_module.shutdown_service() # must not raise
assert fake_svc.shutdown.call_count == 1
def test_shutdown_service_no_op_when_never_started():
"""Calling shutdown without ever creating the service is safe."""
lsp_module.shutdown_service() # must not raise
def test_shutdown_service_swallows_exception(monkeypatch):
"""An exception during ``svc.shutdown()`` must not propagate —
the caller (often atexit) has nothing useful to do with it."""
fake_svc = MagicMock()
fake_svc.is_active.return_value = True
fake_svc.shutdown = MagicMock(side_effect=RuntimeError("kill -9 already"))
monkeypatch.setattr(
lsp_module.LSPService, "create_from_config", classmethod(lambda cls: fake_svc)
)
monkeypatch.setattr(atexit, "register", lambda fn: None)
lsp_module.get_service()
lsp_module.shutdown_service() # must not raise
def test_get_service_returns_none_for_inactive_service(monkeypatch):
"""A service whose ``is_active()`` returns False is treated as
not running callers see ``None`` and fall back."""
fake_svc = MagicMock()
fake_svc.is_active.return_value = False
monkeypatch.setattr(
lsp_module.LSPService, "create_from_config", classmethod(lambda cls: fake_svc)
)
monkeypatch.setattr(atexit, "register", lambda fn: None)
assert lsp_module.get_service() is None
# Subsequent call returns None too — but the inactive instance is
# cached so we don't re-build it on every check.
assert lsp_module.get_service() is None
def test_get_service_returns_none_when_create_fails(monkeypatch):
"""Service factory returning ``None`` (no config, etc.) propagates."""
monkeypatch.setattr(
lsp_module.LSPService, "create_from_config", classmethod(lambda cls: None)
)
monkeypatch.setattr(atexit, "register", lambda fn: None)
assert lsp_module.get_service() is None
-1
View File
@@ -660,7 +660,6 @@ class TestAuxiliaryPoolAwareness:
with (
patch("agent.auxiliary_client.load_pool", return_value=_Pool()),
patch("agent.auxiliary_client.OpenAI") as mock_openai,
patch("hermes_cli.models.get_nous_recommended_aux_model", return_value=None),
):
from agent.auxiliary_client import _try_nous
-234
View File
@@ -473,240 +473,6 @@ class TestCodexOAuthContextLength:
assert ctx == 1_000_000, "Non-codex 1M cache entries must be respected"
# =========================================================================
# Nous Portal context-window resolution (provider="nous")
# =========================================================================
class TestNousPortalContextResolution:
"""Nous Portal /v1/models is authoritative for what Nous infra enforces
and may diverge from the OpenRouter catalog.
Invariants this class pins down:
1. Portal value wins over the OR fallback.
2. Portal-derived values are persisted to disk.
3. OR-fallback values are NEVER persisted otherwise a single portal
blip would freeze the wrong value in via step-1 cache short-circuit.
4. Pre-fix persistent-cache entries (seeded from the OR catalog) are
bypassed at step 1 and overwritten once the portal responds.
5. Pre-fix persistent-cache entries SURVIVE on disk when the portal
is unreachable no opportunistic invalidation that loses the only
value we have.
"""
def setup_method(self):
import agent.model_metadata as mm
mm._endpoint_model_metadata_cache.clear()
mm._endpoint_model_metadata_cache_time.clear()
@patch("agent.model_metadata.fetch_endpoint_model_metadata")
@patch("agent.model_metadata.fetch_model_metadata")
def test_portal_value_wins_over_openrouter_catalog(
self, mock_or, mock_portal, tmp_path, monkeypatch
):
"""The motivating case: OR catalog says 1M for qwen3.6-plus, but
the Nous portal correctly enforces 262144. Portal must win."""
import agent.model_metadata as mm
cache_file = tmp_path / "context_length_cache.yaml"
monkeypatch.setattr(mm, "_get_context_cache_path", lambda: cache_file)
mock_portal.return_value = {
"qwen3.6-plus": {"context_length": 262_144},
}
mock_or.return_value = {
"qwen/qwen3.6-plus": {"context_length": 1_000_000},
}
ctx = mm.get_model_context_length(
model="qwen3.6-plus",
base_url="https://inference-api.nousresearch.com/v1",
api_key="fake-token",
provider="nous",
)
assert ctx == 262_144, (
f"Portal must override OR catalog; got {ctx} (OR leak?)"
)
@patch("agent.model_metadata.fetch_endpoint_model_metadata")
@patch("agent.model_metadata.fetch_model_metadata")
def test_portal_value_is_persisted_to_disk(
self, mock_or, mock_portal, tmp_path, monkeypatch
):
"""Portal-derived value should land in the persistent cache so
cross-process callers (e.g. child agents) see the same value."""
import agent.model_metadata as mm
cache_file = tmp_path / "context_length_cache.yaml"
monkeypatch.setattr(mm, "_get_context_cache_path", lambda: cache_file)
mock_portal.return_value = {
"qwen3.6-plus": {"context_length": 262_144},
}
mock_or.return_value = {}
base_url = "https://inference-api.nousresearch.com/v1"
ctx = mm.get_model_context_length(
model="qwen3.6-plus",
base_url=base_url,
api_key="fake",
provider="nous",
)
assert ctx == 262_144
persisted = yaml.safe_load(cache_file.read_text()).get("context_lengths", {})
assert persisted.get(f"qwen3.6-plus@{base_url}") == 262_144, (
"Portal-derived value should be persisted to disk"
)
@patch("agent.model_metadata.fetch_endpoint_model_metadata")
@patch("agent.model_metadata.fetch_model_metadata")
def test_openrouter_fallback_is_not_persisted(
self, mock_or, mock_portal, tmp_path, monkeypatch
):
"""When the portal can't resolve a model (network blip, auth glitch,
model not yet listed) we fall back to the OR catalog so the agent
keeps working but we must NOT write the OR value to disk. Once
cached on disk, step-1 short-circuits forever and the user is stuck
with the wrong number until they manually clear the cache."""
import agent.model_metadata as mm
cache_file = tmp_path / "context_length_cache.yaml"
monkeypatch.setattr(mm, "_get_context_cache_path", lambda: cache_file)
mock_portal.return_value = {} # portal unreachable / model unknown
mock_or.return_value = {
"qwen/qwen3.6-plus": {"context_length": 1_000_000},
}
base_url = "https://inference-api.nousresearch.com/v1"
ctx = mm.get_model_context_length(
model="qwen3.6-plus",
base_url=base_url,
api_key="fake",
provider="nous",
)
assert ctx == 1_000_000, "OR fallback should still serve the request"
assert not cache_file.exists() or not yaml.safe_load(
cache_file.read_text()
).get("context_lengths", {}), (
"OR-fallback values must NOT be persisted — a single portal blip "
"would otherwise freeze the wrong value in via step-1 cache hit"
)
@patch("agent.model_metadata.fetch_endpoint_model_metadata")
@patch("agent.model_metadata.fetch_model_metadata")
def test_stale_cache_is_bypassed_and_overwritten_by_portal(
self, mock_or, mock_portal, tmp_path, monkeypatch
):
"""Users upgrading from pre-fix builds have ``qwen3.6-plus@…nous… =
1000000`` (OR-derived) sitting in their cache file. Step 1 must
NOT short-circuit on that entry step 5b reconciles against the
portal and overwrites the persistent value with 262144."""
import agent.model_metadata as mm
cache_file = tmp_path / "context_length_cache.yaml"
monkeypatch.setattr(mm, "_get_context_cache_path", lambda: cache_file)
base_url = "https://inference-api.nousresearch.com/v1"
stale_key = f"qwen3.6-plus@{base_url}"
other_key = "other-model@https://api.openai.com/v1"
cache_file.write_text(yaml.dump({"context_lengths": {
stale_key: 1_000_000, # pre-fix OR-derived value
other_key: 128_000, # unrelated, must survive
}}))
mock_portal.return_value = {
"qwen3.6-plus": {"context_length": 262_144},
}
mock_or.return_value = {}
ctx = mm.get_model_context_length(
model="qwen3.6-plus",
base_url=base_url,
api_key="fake",
provider="nous",
)
assert ctx == 262_144, (
f"Stale OR-derived cache entry should not have leaked through; got {ctx}"
)
remaining = yaml.safe_load(cache_file.read_text()).get("context_lengths", {})
assert remaining.get(stale_key) == 262_144, (
"Portal value should have overwritten the stale entry on disk"
)
assert remaining.get(other_key) == 128_000, (
"Unrelated cache entries must not be touched"
)
@patch("agent.model_metadata.fetch_endpoint_model_metadata")
@patch("agent.model_metadata.fetch_model_metadata")
def test_stale_cache_survives_when_portal_unreachable(
self, mock_or, mock_portal, tmp_path, monkeypatch
):
"""When the portal is unreachable AND we have a (potentially stale)
on-disk cache entry, the entry must survive untouched we don't
want a transient outage to delete the only value we have. The
request itself still gets served via OR fallback for this call."""
import agent.model_metadata as mm
cache_file = tmp_path / "context_length_cache.yaml"
monkeypatch.setattr(mm, "_get_context_cache_path", lambda: cache_file)
base_url = "https://inference-api.nousresearch.com/v1"
existing_key = f"qwen3.6-plus@{base_url}"
cache_file.write_text(yaml.dump({"context_lengths": {
existing_key: 1_000_000,
}}))
mock_portal.return_value = {} # portal unreachable
mock_or.return_value = {
"qwen/qwen3.6-plus": {"context_length": 1_000_000},
}
mm.get_model_context_length(
model="qwen3.6-plus",
base_url=base_url,
api_key="fake",
provider="nous",
)
remaining = yaml.safe_load(cache_file.read_text()).get("context_lengths", {})
assert remaining.get(existing_key) == 1_000_000, (
"Persistent cache entry must survive a transient portal outage"
)
@patch("agent.model_metadata.fetch_endpoint_model_metadata")
@patch("agent.model_metadata.fetch_model_metadata")
def test_bypass_keyed_on_url_not_provider_string(
self, mock_or, mock_portal, tmp_path, monkeypatch
):
"""Some call sites pass ``provider=""`` or ``provider="openrouter"``
when the user is really on Nous Portal (e.g. cred-pool fallback).
The Nous-URL bypass must trigger off the URL host, not the provider
string, so the portal-first resolver still runs in that case."""
import agent.model_metadata as mm
cache_file = tmp_path / "context_length_cache.yaml"
monkeypatch.setattr(mm, "_get_context_cache_path", lambda: cache_file)
base_url = "https://inference-api.nousresearch.com/v1"
cache_file.write_text(yaml.dump({"context_lengths": {
f"qwen3.6-plus@{base_url}": 1_000_000, # stale
}}))
mock_portal.return_value = {
"qwen3.6-plus": {"context_length": 262_144},
}
mock_or.return_value = {}
for provider_arg in ("", "openrouter", "custom"):
mm._endpoint_model_metadata_cache.clear()
mm._endpoint_model_metadata_cache_time.clear()
ctx = mm.get_model_context_length(
model="qwen3.6-plus",
base_url=base_url,
api_key="fake",
provider=provider_arg,
)
assert ctx == 262_144, (
f"URL-based Nous detection must fire for provider={provider_arg!r}; "
f"got {ctx}"
)
# =========================================================================
# get_model_context_length — resolution order
# =========================================================================
-34
View File
@@ -190,37 +190,3 @@ def test_custom_endpoint_models_api_pricing_is_supported(monkeypatch):
assert float(entry.input_cost_per_million) == 0.5
assert float(entry.output_cost_per_million) == 2.0
def test_deepseek_v4_pro_pricing_entry_exists():
"""Regression test: deepseek-v4-pro must have a pricing entry.
Before this fix, deepseek-v4-pro sessions showed as unknown cost
in hermes insights because the _OFFICIAL_DOCS_PRICING table had no
entry for that model. See #24218.
"""
entry = get_pricing_entry(
"deepseek-v4-pro",
provider="deepseek",
)
assert entry is not None
assert entry.input_cost_per_million is not None
assert entry.output_cost_per_million is not None
assert float(entry.input_cost_per_million) == 1.74
assert float(entry.output_cost_per_million) == 3.48
assert float(entry.cache_read_cost_per_million) == 0.0145
def test_deepseek_v4_pro_estimate_usage_cost():
"""Ensure deepseek-v4-pro sessions get a dollar estimate, not unknown."""
result = estimate_usage_cost(
"deepseek-v4-pro",
CanonicalUsage(input_tokens=1000000, output_tokens=500000),
provider="deepseek",
)
assert result.status == "estimated"
assert result.amount_usd is not None
# 1M input × $1.74/M + 500K output × $3.48/M = $1.74 + $1.74 = $3.48
assert float(result.amount_usd) == 3.48
-43
View File
@@ -1,43 +0,0 @@
from unittest.mock import MagicMock, patch
from cli import HermesCLI
class _InsightsEngineStub:
calls = []
def __init__(self, db):
self.db = db
def generate(self, *, days=30, source=None):
self.calls.append({"days": days, "source": source})
return {"days": days, "source": source}
def format_terminal(self, report):
return f"days={report['days']} source={report['source']}"
def _run_show_insights(command: str):
cli_obj = HermesCLI.__new__(HermesCLI)
db = MagicMock()
_InsightsEngineStub.calls = []
with patch("hermes_state.SessionDB", return_value=db), \
patch("agent.insights.InsightsEngine", _InsightsEngineStub):
cli_obj._show_insights(command)
return _InsightsEngineStub.calls, db
def test_cli_insights_accepts_positional_days(capsys):
calls, db = _run_show_insights("/insights 7")
assert calls == [{"days": 7, "source": None}]
db.close.assert_called_once()
assert "days=7 source=None" in capsys.readouterr().out
def test_cli_insights_keeps_days_flag_and_source(capsys):
calls, db = _run_show_insights("/insights --days 14 --source discord")
assert calls == [{"days": 14, "source": "discord"}]
db.close.assert_called_once()
assert "days=14 source=discord" in capsys.readouterr().out
-3
View File
@@ -222,9 +222,6 @@ def make_runner(platform: Platform, session_entry: SessionEntry = None) -> "Gate
runner._capture_gateway_honcho_if_configured = lambda *a, **kw: None
runner._emit_gateway_run_progress = AsyncMock()
# Disable destructive slash confirm gate so /new executes immediately
runner._read_user_config = lambda: {"approvals": {"destructive_slash_confirm": False}}
runner.pairing_store = MagicMock()
runner.pairing_store._is_rate_limited = MagicMock(return_value=False)
runner.pairing_store.generate_code = MagicMock(return_value="ABC123")
+1 -96
View File
@@ -681,56 +681,6 @@ class TestChatCompletionsEndpoint:
assert "[DONE]" in body
assert "Hello!" in body
@pytest.mark.asyncio
async def test_stream_task_done_callback_enqueues_eos_for_chat_completions(self, adapter):
"""Regression guard for #24451: completion callback must signal SSE EOS."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
class _FakeTask:
def __init__(self):
self.callbacks = []
def add_done_callback(self, cb):
self.callbacks.append(cb)
fake_task = _FakeTask()
def _fake_ensure_future(coro):
# We short-circuit task scheduling in this unit test.
coro.close()
return fake_task
with (
patch.object(
adapter,
"_run_agent",
new=AsyncMock(
return_value=(
{"final_response": "ok", "messages": [], "api_calls": 1},
{"input_tokens": 1, "output_tokens": 1, "total_tokens": 2},
)
),
),
patch("gateway.platforms.api_server.asyncio.ensure_future", side_effect=_fake_ensure_future),
patch.object(adapter, "_write_sse_chat_completion", new_callable=AsyncMock) as mock_write_sse,
):
mock_write_sse.return_value = web.Response(status=200, text="ok")
resp = await cli.post(
"/v1/chat/completions",
json={
"model": "test",
"messages": [{"role": "user", "content": "hi"}],
"stream": True,
},
)
assert resp.status == 200
assert len(fake_task.callbacks) == 1
stream_q = mock_write_sse.call_args.args[4]
assert stream_q.empty()
fake_task.callbacks[0](fake_task)
assert stream_q.get_nowait() is None
@pytest.mark.asyncio
async def test_stream_sends_keepalive_during_quiet_tool_gap(self, adapter):
"""Idle SSE streams should send keepalive comments while tools run silently."""
@@ -1726,52 +1676,6 @@ class TestResponsesStreaming:
assert "Hello" in body
assert " world" in body
@pytest.mark.asyncio
async def test_stream_task_done_callback_enqueues_eos_for_responses(self, adapter):
"""Regression guard for #24451 on /v1/responses streaming path."""
app = _create_app(adapter)
async with TestClient(TestServer(app)) as cli:
class _FakeTask:
def __init__(self):
self.callbacks = []
def add_done_callback(self, cb):
self.callbacks.append(cb)
fake_task = _FakeTask()
def _fake_ensure_future(coro):
# We short-circuit task scheduling in this unit test.
coro.close()
return fake_task
with (
patch.object(
adapter,
"_run_agent",
new=AsyncMock(
return_value=(
{"final_response": "ok", "messages": [], "api_calls": 1},
{"input_tokens": 1, "output_tokens": 1, "total_tokens": 2},
)
),
),
patch("gateway.platforms.api_server.asyncio.ensure_future", side_effect=_fake_ensure_future),
patch.object(adapter, "_write_sse_responses", new_callable=AsyncMock) as mock_write_sse,
):
mock_write_sse.return_value = web.Response(status=200, text="ok")
resp = await cli.post(
"/v1/responses",
json={"model": "hermes-agent", "input": "hi", "stream": True},
)
assert resp.status == 200
assert len(fake_task.callbacks) == 1
stream_q = mock_write_sse.call_args.kwargs["stream_q"]
assert stream_q.empty()
fake_task.callbacks[0](fake_task)
assert stream_q.get_nowait() is None
@pytest.mark.asyncio
async def test_stream_emits_function_call_and_output_items(self, adapter):
app = _create_app(adapter)
@@ -3157,3 +3061,4 @@ class TestSessionKeyHeader:
assert resp.status == 200
data = await resp.json()
assert data["features"]["session_key_header"] == "X-Hermes-Session-Key"
+2 -2
View File
@@ -176,8 +176,8 @@ class TestStreamingConfig:
"fresh_final_after_seconds": "oops",
}
)
assert restored.edit_interval == 0.8
assert restored.buffer_threshold == 24
assert restored.edit_interval == 1.0
assert restored.buffer_threshold == 40
assert restored.fresh_final_after_seconds == 60.0
-130
View File
@@ -444,93 +444,6 @@ class TestScopedLocks:
assert acquired is False
assert existing["pid"] == 99999
def test_acquire_scoped_lock_replaces_pid_reused_by_unrelated_process(self, tmp_path, monkeypatch):
"""macOS regression: PID reused by an unrelated process with start_time=None.
On macOS /proc is unavailable, so both the lock record and the live
process report start_time=None. The live PID is alive (os.kill
succeeds) but belongs to a completely different program. The lock
must be treated as stale.
"""
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text(json.dumps({
"pid": 873,
"start_time": None,
"kind": "hermes-gateway",
"argv": ["/Users/user/.hermes/hermes-agent/hermes_cli/main.py", "gateway", "run", "--replace"],
}))
# Post-#21561 the liveness probe routes through
# ``gateway.status._pid_exists`` (psutil-first, safe on Windows),
# not ``os.kill``.
monkeypatch.setattr(status, "_pid_exists", lambda pid: True)
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: None)
monkeypatch.setattr(status, "_looks_like_gateway_process", lambda pid: False)
# On macOS ``ps`` is available, so _read_process_cmdline returns the
# unrelated process's name. This confirms the PID was reused.
monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: "/usr/libexec/bluetoothuserd")
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
assert acquired is True
payload = json.loads(lock_path.read_text())
assert payload["pid"] == os.getpid()
assert payload["metadata"]["platform"] == "telegram"
def test_acquire_scoped_lock_keeps_lock_when_cmdline_unreadable_but_record_is_gateway(self, tmp_path, monkeypatch):
"""Windows regression: ps unavailable so cmdline cannot be read.
When start_time is None on both sides and _looks_like_gateway_process
returns False because ps is missing (not because the PID belongs to an
unrelated process), the stale check must not delete a valid gateway
lock. Fall back to the lock record's own argv — written by the
gateway at startup before declaring the lock stale.
"""
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text(json.dumps({
"pid": 99999,
"start_time": None,
"kind": "hermes-gateway",
"argv": ["hermes_cli/main.py", "gateway", "run"],
}))
monkeypatch.setattr(status, "_pid_exists", lambda pid: True)
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: None)
# Windows: ps not available, so _read_process_cmdline returns None
# and _looks_like_gateway_process returns False for every process.
monkeypatch.setattr(status, "_looks_like_gateway_process", lambda pid: False)
monkeypatch.setattr(status, "_read_process_cmdline", lambda pid: None)
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
assert acquired is False
assert existing["pid"] == 99999
def test_acquire_scoped_lock_keeps_lock_when_pid_reused_by_gateway(self, tmp_path, monkeypatch):
"""When start_time is None but the live PID still looks like a gateway, keep the lock."""
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
lock_path.parent.mkdir(parents=True, exist_ok=True)
lock_path.write_text(json.dumps({
"pid": 99999,
"start_time": None,
"kind": "hermes-gateway",
"argv": ["/Users/user/.hermes/hermes-agent/hermes_cli/main.py", "gateway", "run", "--replace"],
}))
monkeypatch.setattr(status, "_pid_exists", lambda pid: True)
monkeypatch.setattr(status, "_get_process_start_time", lambda pid: None)
monkeypatch.setattr(status, "_looks_like_gateway_process", lambda pid: True)
acquired, existing = status.acquire_scoped_lock("telegram-bot-token", "secret", metadata={"platform": "telegram"})
assert acquired is False
assert existing["pid"] == 99999
def test_acquire_scoped_lock_replaces_stale_record(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_GATEWAY_LOCK_DIR", str(tmp_path / "locks"))
lock_path = tmp_path / "locks" / "telegram-bot-token-2bb80d537b1da3e3.lock"
@@ -898,46 +811,3 @@ class TestPlannedStopMarker:
ok = status.write_planned_stop_marker(target_pid=12345)
assert ok is False
class TestReadProcessCmdlinePsFallback:
"""Tests for _read_process_cmdline falling back to ps on non-Linux."""
def test_ps_fallback_when_proc_unavailable(self, monkeypatch):
monkeypatch.setattr(status.Path, "read_bytes", lambda self: (_ for _ in ()).throw(FileNotFoundError))
monkeypatch.setattr(
status.subprocess, "run",
lambda args, **kwargs: SimpleNamespace(returncode=0, stdout="/usr/libexec/bluetoothuserd\n"),
)
result = status._read_process_cmdline(873)
assert result == "/usr/libexec/bluetoothuserd"
def test_ps_fallback_returns_none_on_failure(self, monkeypatch):
monkeypatch.setattr(status.Path, "read_bytes", lambda self: (_ for _ in ()).throw(FileNotFoundError))
monkeypatch.setattr(
status.subprocess, "run",
lambda args, **kwargs: SimpleNamespace(returncode=1, stdout=""),
)
result = status._read_process_cmdline(99999)
assert result is None
def test_proc_cmdline_takes_priority_over_ps(self, monkeypatch):
calls = []
def fake_read_bytes(self):
calls.append("proc")
return b"python\x00hermes_cli/main.py\x00gateway\x00"
monkeypatch.setattr(status.Path, "read_bytes", fake_read_bytes)
result = status._read_process_cmdline(12345)
assert "hermes_cli/main.py" in result
assert calls == ["proc"]
def test_ps_fallback_used_when_proc_returns_empty(self, monkeypatch):
monkeypatch.setattr(status.Path, "read_bytes", lambda self: b"")
monkeypatch.setattr(
status.subprocess, "run",
lambda args, **kwargs: SimpleNamespace(returncode=0, stdout="python hermes_cli/main.py gateway run\n"),
)
result = status._read_process_cmdline(12345)
assert "hermes_cli/main.py" in result
@@ -1,451 +0,0 @@
"""Tests for Telegram inline keyboard clarify buttons.
Mirrors test_telegram_approval_buttons.py for the new ``send_clarify`` and
``cl:`` callback dispatch added in feat/clarify-gateway-buttons.
"""
import asyncio
import os
import sys
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Ensure the repo root is importable
# ---------------------------------------------------------------------------
_repo = str(Path(__file__).resolve().parents[2])
if _repo not in sys.path:
sys.path.insert(0, _repo)
# ---------------------------------------------------------------------------
# Minimal Telegram mock so TelegramAdapter can be imported (mirrors
# test_telegram_approval_buttons.py)
# ---------------------------------------------------------------------------
def _ensure_telegram_mock():
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
mod = MagicMock()
mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
mod.constants.ParseMode.MARKDOWN = "Markdown"
mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
mod.constants.ParseMode.HTML = "HTML"
mod.constants.ChatType.PRIVATE = "private"
mod.constants.ChatType.GROUP = "group"
mod.constants.ChatType.SUPERGROUP = "supergroup"
mod.constants.ChatType.CHANNEL = "channel"
mod.error.NetworkError = type("NetworkError", (OSError,), {})
mod.error.TimedOut = type("TimedOut", (OSError,), {})
mod.error.BadRequest = type("BadRequest", (Exception,), {})
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, mod)
sys.modules.setdefault("telegram.error", mod.error)
_ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter
from gateway.config import Platform, PlatformConfig
def _make_adapter(extra=None):
config = PlatformConfig(enabled=True, token="test-token", extra=extra or {})
adapter = TelegramAdapter(config)
adapter._bot = AsyncMock()
adapter._app = MagicMock()
return adapter
def _clear_clarify_state():
from tools import clarify_gateway as cm
with cm._lock:
cm._entries.clear()
cm._session_index.clear()
cm._notify_cbs.clear()
# ===========================================================================
# send_clarify — render
# ===========================================================================
class TestTelegramSendClarify:
"""Verify the rendered prompt has buttons or none, and stores state."""
def setup_method(self):
_clear_clarify_state()
@pytest.mark.asyncio
async def test_multi_choice_renders_buttons_and_other(self):
adapter = _make_adapter()
mock_msg = MagicMock()
mock_msg.message_id = 100
adapter._bot.send_message = AsyncMock(return_value=mock_msg)
result = await adapter.send_clarify(
chat_id="12345",
question="Which option?",
choices=["alpha", "beta", "gamma"],
clarify_id="cid1",
session_key="sk1",
)
assert result.success is True
assert result.message_id == "100"
kwargs = adapter._bot.send_message.call_args[1]
assert kwargs["chat_id"] == 12345
assert "Which option?" in kwargs["text"]
# InlineKeyboardMarkup with N+1 buttons (3 choices + Other)
markup = kwargs["reply_markup"]
assert markup is not None
# Mocked InlineKeyboardMarkup — just verify it was constructed
# with rows. We check state instead of poking the mock structure.
assert "cid1" in adapter._clarify_state
assert adapter._clarify_state["cid1"] == "sk1"
@pytest.mark.asyncio
async def test_open_ended_no_keyboard(self):
adapter = _make_adapter()
mock_msg = MagicMock()
mock_msg.message_id = 101
adapter._bot.send_message = AsyncMock(return_value=mock_msg)
result = await adapter.send_clarify(
chat_id="12345",
question="What is your name?",
choices=None,
clarify_id="cid2",
session_key="sk2",
)
assert result.success is True
kwargs = adapter._bot.send_message.call_args[1]
# No reply_markup means no buttons — open-ended path
assert "reply_markup" not in kwargs
assert "What is your name?" in kwargs["text"]
assert adapter._clarify_state["cid2"] == "sk2"
@pytest.mark.asyncio
async def test_not_connected(self):
adapter = _make_adapter()
adapter._bot = None
result = await adapter.send_clarify(
chat_id="12345",
question="?",
choices=["a"],
clarify_id="cid3",
session_key="sk3",
)
assert result.success is False
@pytest.mark.asyncio
async def test_truncates_long_choice_label(self):
adapter = _make_adapter()
mock_msg = MagicMock()
mock_msg.message_id = 102
adapter._bot.send_message = AsyncMock(return_value=mock_msg)
long_choice = "x" * 200 # > 60 char cap
result = await adapter.send_clarify(
chat_id="12345",
question="?",
choices=[long_choice],
clarify_id="cid4",
session_key="sk4",
)
assert result.success is True
# The truncation logic replaces with "..." past 57 chars; we don't
# inspect the mock's button labels directly (auto-MagicMock), but
# we can verify the call didn't raise on absurdly long input.
@pytest.mark.asyncio
async def test_html_escapes_question(self):
adapter = _make_adapter()
mock_msg = MagicMock()
mock_msg.message_id = 103
adapter._bot.send_message = AsyncMock(return_value=mock_msg)
await adapter.send_clarify(
chat_id="12345",
question="<script>alert(1)</script>",
choices=["x"],
clarify_id="cid5",
session_key="sk5",
)
kwargs = adapter._bot.send_message.call_args[1]
# Must NOT contain raw <script> — html.escape should have neutralized
assert "<script>" not in kwargs["text"]
assert "&lt;script&gt;" in kwargs["text"]
# ===========================================================================
# Callback dispatch — _handle_callback_query routing for cl:* prefixes
# ===========================================================================
class TestTelegramClarifyCallback:
"""Verify clicking a button resolves the clarify primitive."""
def setup_method(self):
_clear_clarify_state()
@pytest.mark.asyncio
async def test_numeric_choice_resolves_with_choice_text(self):
from tools import clarify_gateway as cm
adapter = _make_adapter()
# Pre-register a clarify entry so the callback can look up the choice text
cm.register("cidA", "sk-cb", "Pick", ["red", "green", "blue"])
adapter._clarify_state["cidA"] = "sk-cb"
query = AsyncMock()
query.data = "cl:cidA:1" # green
query.message = MagicMock()
query.message.chat_id = 12345
query.message.text = "Pick"
query.from_user = MagicMock()
query.from_user.id = "777"
query.from_user.first_name = "Tester"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
await adapter._handle_callback_query(update, context)
# State popped
assert "cidA" not in adapter._clarify_state
# Wait shouldn't be needed — resolve_gateway_clarify is sync.
# The entry's response should be set.
# We test by reading the entry's response directly.
with cm._lock:
entry = cm._entries.get("cidA")
# Entry might be popped by wait_for_response, but here we never
# called wait — so it's still in _entries with response set.
assert entry is not None
assert entry.response == "green"
assert entry.event.is_set()
query.answer.assert_called_once()
query.edit_message_text.assert_called_once()
@pytest.mark.asyncio
async def test_other_button_flips_to_text_mode(self):
from tools import clarify_gateway as cm
adapter = _make_adapter()
cm.register("cidB", "sk-cb-other", "Pick", ["x", "y"])
adapter._clarify_state["cidB"] = "sk-cb-other"
query = AsyncMock()
query.data = "cl:cidB:other"
query.message = MagicMock()
query.message.chat_id = 12345
query.message.text = "Pick"
query.from_user = MagicMock()
query.from_user.id = "777"
query.from_user.first_name = "Tester"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
await adapter._handle_callback_query(update, context)
# Entry should now be in text-capture mode
pending = cm.get_pending_for_session("sk-cb-other")
assert pending is not None
assert pending.clarify_id == "cidB"
assert pending.awaiting_text is True
# State NOT popped — the user still needs to type their answer
assert "cidB" in adapter._clarify_state
# Entry NOT yet resolved
with cm._lock:
entry = cm._entries.get("cidB")
assert entry is not None
assert not entry.event.is_set()
@pytest.mark.asyncio
async def test_already_resolved(self):
adapter = _make_adapter()
# No state for cidGone
query = AsyncMock()
query.data = "cl:cidGone:0"
query.message = MagicMock()
query.message.chat_id = 12345
query.from_user = MagicMock()
query.from_user.id = "777"
query.from_user.first_name = "Tester"
query.answer = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
await adapter._handle_callback_query(update, context)
query.answer.assert_called_once()
# Should NOT resolve anything
assert "already" in query.answer.call_args[1]["text"].lower()
@pytest.mark.asyncio
async def test_unauthorized_user_rejected(self):
from tools import clarify_gateway as cm
adapter = _make_adapter()
cm.register("cidC", "sk-auth", "Pick", ["a", "b"])
adapter._clarify_state["cidC"] = "sk-auth"
# Hook up a runner that says NOT authorized
class _DenyRunner:
async def _handle_message(self, event):
return None
def _is_user_authorized(self, source):
return False
adapter._message_handler = _DenyRunner()._handle_message
query = AsyncMock()
query.data = "cl:cidC:0"
query.message = MagicMock()
query.message.chat_id = 12345
query.message.chat.type = "private"
query.message.text = "Pick"
query.from_user = MagicMock()
query.from_user.id = "999"
query.from_user.first_name = "Mallory"
query.answer = AsyncMock()
query.edit_message_text = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
await adapter._handle_callback_query(update, context)
# Must not resolve, must answer with not-authorized message
with cm._lock:
entry = cm._entries.get("cidC")
assert entry is not None
assert not entry.event.is_set()
query.answer.assert_called_once()
assert "not authorized" in query.answer.call_args[1]["text"].lower()
# State preserved
assert adapter._clarify_state["cidC"] == "sk-auth"
@pytest.mark.asyncio
async def test_invalid_choice_token(self):
from tools import clarify_gateway as cm
adapter = _make_adapter()
cm.register("cidD", "sk-inv", "Q?", ["a"])
adapter._clarify_state["cidD"] = "sk-inv"
query = AsyncMock()
query.data = "cl:cidD:not-a-number"
query.message = MagicMock()
query.message.chat_id = 12345
query.message.text = "Q?"
query.from_user = MagicMock()
query.from_user.id = "777"
query.from_user.first_name = "Tester"
query.answer = AsyncMock()
update = MagicMock()
update.callback_query = query
context = MagicMock()
with patch.dict(os.environ, {"TELEGRAM_ALLOWED_USERS": "*"}, clear=False):
await adapter._handle_callback_query(update, context)
with cm._lock:
entry = cm._entries.get("cidD")
assert entry is not None
assert not entry.event.is_set()
query.answer.assert_called_once()
assert "invalid" in query.answer.call_args[1]["text"].lower()
# ===========================================================================
# Base adapter fallback render — text numbered list
# ===========================================================================
class TestBaseAdapterClarifyFallback:
"""Adapters without button overrides should render numbered text."""
@pytest.mark.asyncio
async def test_numbered_text_fallback(self):
from gateway.platforms.base import BasePlatformAdapter, SendResult
# Subclass just enough to instantiate
class _Stub(BasePlatformAdapter):
name = "stub"
def __init__(self):
# Skip base __init__ — we're not exercising it
self.sent: list = []
async def connect(self): pass
async def disconnect(self): pass
async def send(self, chat_id, content, **kw):
self.sent.append({"chat_id": chat_id, "content": content})
return SendResult(success=True, message_id="1")
async def edit(self, *a, **k): return SendResult(success=False)
async def get_history(self, *a, **k): return []
async def get_chat_info(self, *a, **k): return {}
adapter = _Stub()
result = await adapter.send_clarify(
chat_id="c",
question="Pick a fruit",
choices=["apple", "banana"],
clarify_id="x",
session_key="s",
)
assert result.success is True
assert len(adapter.sent) == 1
text = adapter.sent[0]["content"]
assert "Pick a fruit" in text
assert "1." in text and "apple" in text
assert "2." in text and "banana" in text
@pytest.mark.asyncio
async def test_open_ended_fallback_renders_question_only(self):
from gateway.platforms.base import BasePlatformAdapter, SendResult
class _Stub(BasePlatformAdapter):
name = "stub"
def __init__(self):
self.sent: list = []
async def connect(self): pass
async def disconnect(self): pass
async def send(self, chat_id, content, **kw):
self.sent.append(content)
return SendResult(success=True, message_id="1")
async def edit(self, *a, **k): return SendResult(success=False)
async def get_history(self, *a, **k): return []
async def get_chat_info(self, *a, **k): return {}
adapter = _Stub()
await adapter.send_clarify(
chat_id="c",
question="Free form?",
choices=None,
clarify_id="x",
session_key="s",
)
assert "Free form?" in adapter.sent[0]
# No numbered list — choices were empty
assert "1." not in adapter.sent[0]
+2 -47
View File
@@ -218,62 +218,17 @@ async def test_on_processing_complete_skipped_when_disabled(monkeypatch):
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_clears_reaction(monkeypatch):
"""Cancelled processing should clear the in-progress reaction.
Without this clear, the 👀 reaction lingers on the user's message
indefinitely (until another agent run swaps it for 👍/👎). On a
``/stop`` that ends a session, that reaction never gets cleaned up.
"""
async def test_on_processing_complete_cancelled_keeps_existing_reaction(monkeypatch):
"""Expected cancellation should not replace the in-progress reaction."""
monkeypatch.setenv("TELEGRAM_REACTIONS", "true")
adapter = _make_adapter()
event = _make_event()
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
# set_message_reaction with reaction=None clears all reactions on the
# message (Bot API documented semantics; equivalent to Bot API 10.0's
# deleteMessageReaction but works on PTB 22.6 already).
adapter._bot.set_message_reaction.assert_awaited_once_with(
chat_id=123,
message_id=456,
reaction=None,
)
@pytest.mark.asyncio
async def test_on_processing_complete_cancelled_skipped_when_disabled(monkeypatch):
"""Cancelled processing should not call the API when reactions are off."""
monkeypatch.delenv("TELEGRAM_REACTIONS", raising=False)
adapter = _make_adapter()
event = _make_event()
await adapter.on_processing_complete(event, ProcessingOutcome.CANCELLED)
adapter._bot.set_message_reaction.assert_not_awaited()
@pytest.mark.asyncio
async def test_clear_reactions_handles_api_error_gracefully(monkeypatch):
"""API errors during clear should not propagate."""
monkeypatch.setenv("TELEGRAM_REACTIONS", "true")
adapter = _make_adapter()
adapter._bot.set_message_reaction = AsyncMock(side_effect=RuntimeError("no perms"))
result = await adapter._clear_reactions("123", "456")
assert result is False
@pytest.mark.asyncio
async def test_clear_reactions_returns_false_without_bot(monkeypatch):
"""_clear_reactions should return False when bot is not available."""
adapter = _make_adapter()
adapter._bot = None
result = await adapter._clear_reactions("123", "456")
assert result is False
# ── config.py bridging ───────────────────────────────────────────────
+4 -14
View File
@@ -8,7 +8,7 @@ only renders as a voice bubble when explicitly flagged) and via
"""
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock
import pytest
@@ -106,16 +106,6 @@ async def test_base_adapter_routes_voice_tagged_telegram_ogg_media_tag_to_voice_
adapter.send_document.assert_not_awaited()
def _fake_runner(thread_meta):
"""Build a fake GatewayRunner-like object with the helper methods needed by
_deliver_media_from_response."""
runner = SimpleNamespace(
_thread_metadata_for_source=lambda source, anchor=None: thread_meta,
_reply_anchor_for_event=lambda event: None,
)
return runner
@pytest.mark.asyncio
async def test_streaming_delivery_routes_telegram_flac_media_tag_to_document_sender():
event = _event(thread_id="topic-1")
@@ -131,7 +121,7 @@ async def test_streaming_delivery_routes_telegram_flac_media_tag_to_document_sen
)
await GatewayRunner._deliver_media_from_response(
_fake_runner({"thread_id": "topic-1"}),
object(),
"MEDIA:/tmp/speech.flac",
event,
adapter,
@@ -160,7 +150,7 @@ async def test_streaming_delivery_routes_non_voice_telegram_ogg_media_tag_to_doc
)
await GatewayRunner._deliver_media_from_response(
_fake_runner({"thread_id": "topic-1"}),
object(),
"MEDIA:/tmp/speech.ogg",
event,
adapter,
@@ -191,7 +181,7 @@ async def test_streaming_delivery_routes_telegram_mp3_media_tag_to_voice_sender(
)
await GatewayRunner._deliver_media_from_response(
_fake_runner({"thread_id": "topic-1"}),
object(),
"MEDIA:/tmp/speech.mp3",
event,
adapter,
-3
View File
@@ -45,9 +45,6 @@ def _make_runner(hermes_home=None):
runner._pending_messages = {}
runner._pending_approvals = {}
runner._failed_platforms = {}
# config is accessed by _check_slash_access and quick_commands lookup;
# None makes policy_for_source return a disabled (allow-all) policy.
runner.config = None
# Bypass the destructive-slash confirm gate — this test exercises
# update-prompt interception, not the confirm prompt.
runner._read_user_config = lambda: {
+7 -7
View File
@@ -129,7 +129,7 @@ class TestVerboseCommand:
@pytest.mark.asyncio
async def test_defaults_to_all_when_no_tool_progress_set(self, tmp_path, monkeypatch):
"""When tool_progress is not in config, defaults to platform default then cycles."""
"""When tool_progress is not in config, defaults to 'all' then cycles to verbose."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
@@ -143,17 +143,17 @@ class TestVerboseCommand:
runner = _make_runner()
result = await runner._handle_verbose_command(_make_event())
# Telegram platform default is "new" → cycles to "all"
assert "ALL" in result
# Telegram default is "all" (high tier) → cycles to verbose
assert "VERBOSE" in result
saved = yaml.safe_load(config_path.read_text(encoding="utf-8"))
assert saved["display"]["platforms"]["telegram"]["tool_progress"] == "all"
assert saved["display"]["platforms"]["telegram"]["tool_progress"] == "verbose"
@pytest.mark.asyncio
async def test_per_platform_isolation(self, tmp_path, monkeypatch):
"""Cycling /verbose on Telegram doesn't change Slack's setting.
Without a global tool_progress, each platform uses its built-in
default: Telegram = 'new' (overridden high tier), Slack = 'off' (quiet Slack default).
default: Telegram = 'all' (high tier), Slack = 'off' (quiet Slack default).
"""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
@@ -178,8 +178,8 @@ class TestVerboseCommand:
saved = yaml.safe_load(config_path.read_text(encoding="utf-8"))
platforms = saved["display"]["platforms"]
# Telegram: new -> all (platform default = new)
assert platforms["telegram"]["tool_progress"] == "all"
# Telegram: all -> verbose (high tier default = all)
assert platforms["telegram"]["tool_progress"] == "verbose"
# Slack: off -> new (first /verbose cycle from quiet default)
assert platforms["slack"]["tool_progress"] == "new"
+5 -7
View File
@@ -242,14 +242,12 @@ class TestTelegramBotCommands:
tg_name = cmd.name.replace("-", "_")
assert tg_name not in names
def test_includes_builtin_commands_with_required_args(self):
"""Built-in arg-taking commands (e.g. /queue, /steer, /background)
are now included because their handlers return usage text when
invoked without arguments issue #24312."""
def test_excludes_commands_with_required_args(self):
names = {name for name, _ in telegram_bot_commands()}
assert "background" in names
assert "queue" in names
assert "steer" in names
assert "background" not in names
assert "queue" not in names
assert "steer" not in names
assert "background" in GATEWAY_KNOWN_COMMANDS
class TestSlackSubcommandMap:
@@ -2,11 +2,10 @@
from pathlib import Path
def test_profiles_nav_label_uses_short_copy():
def test_profiles_nav_label_uses_short_multi_agents_copy():
en_i18n = Path(__file__).resolve().parents[2] / "web" / "src" / "i18n" / "en.ts"
content = en_i18n.read_text(encoding="utf-8")
# Nav label should be the clean short form, not the old verbose string
assert 'profiles: "Profiles"' in content
assert "profiles : multi agents" not in content
assert 'profiles: "profiles : multi agents"' in content
assert "Profiles: Running Multiple Agents" not in content
@@ -1,61 +0,0 @@
"""Host-specific gating in ``hermes_cli.gateway._all_platforms()``.
Some messaging platforms can't function on every host. The gate lives
in one place ``_all_platforms()`` so the setup wizard, the curses
gateway-config menu, and any future picker all see the same filtered
list.
Currently:
- Matrix is hidden on Windows. The ``[matrix]`` extra pulls
``mautrix[encryption]`` -> ``python-olm``, which has no Windows wheel
and needs ``make`` + libolm to build from sdist. There's no native
Windows path that works.
"""
import sys
class TestMatrixHiddenOnWindows:
def test_matrix_present_on_linux(self, monkeypatch):
"""Sanity: matrix is still in the picker on Linux/macOS."""
import hermes_cli.gateway as gateway_mod
monkeypatch.setattr(gateway_mod.sys, "platform", "linux")
platforms = gateway_mod._all_platforms()
keys = {p["key"] for p in platforms}
assert "matrix" in keys, "matrix must be available on Linux"
def test_matrix_present_on_macos(self, monkeypatch):
import hermes_cli.gateway as gateway_mod
monkeypatch.setattr(gateway_mod.sys, "platform", "darwin")
platforms = gateway_mod._all_platforms()
keys = {p["key"] for p in platforms}
assert "matrix" in keys, "matrix must be available on macOS"
def test_matrix_hidden_on_windows(self, monkeypatch):
"""The actual gate: matrix must NOT appear on Windows."""
import hermes_cli.gateway as gateway_mod
monkeypatch.setattr(gateway_mod.sys, "platform", "win32")
platforms = gateway_mod._all_platforms()
keys = {p["key"] for p in platforms}
assert "matrix" not in keys, (
"matrix must be hidden on Windows — python-olm has no "
"Windows wheel and no native build path"
)
def test_other_platforms_unaffected_on_windows(self, monkeypatch):
"""Gating must only drop matrix, not collateral damage."""
import hermes_cli.gateway as gateway_mod
monkeypatch.setattr(gateway_mod.sys, "platform", "win32")
platforms = gateway_mod._all_platforms()
keys = {p["key"] for p in platforms}
# A representative sample of platforms that have no Windows
# blockers — picker should still surface them.
for must_have in ("telegram", "discord", "slack", "mattermost"):
assert must_have in keys, (
f"{must_have} disappeared from Windows picker — gate is "
"over-filtering"
)
-142
View File
@@ -7,7 +7,6 @@ from hermes_cli.models import (
is_nous_free_tier, partition_nous_models_by_tier,
check_nous_free_tier, _FREE_TIER_CACHE_TTL,
union_with_portal_free_recommendations,
union_with_portal_paid_recommendations,
)
import hermes_cli.models as _models_mod
@@ -507,147 +506,6 @@ class TestUnionWithPortalFreeRecommendations:
assert p["qwen/qwen3.6-plus"] == self._FREE
class TestUnionWithPortalPaidRecommendations:
"""Tests for union_with_portal_paid_recommendations.
Mirror of TestUnionWithPortalFreeRecommendations: the Portal's
paidRecommendedModels endpoint is the source of truth for what's a
blessed paid model *right now*. The in-repo curated list and
docs-hosted manifest can lag this helper guarantees newly-launched
paid models surface in the picker for paid-tier users without a CLI
release.
"""
_PAID = {"prompt": "0.000003", "completion": "0.000015"}
_FREE = {"prompt": "0", "completion": "0"}
def _payload(self, paid_models: list[str]) -> dict:
return {
"paidRecommendedModels": [
{"modelName": mid, "displayName": mid} for mid in paid_models
],
}
def test_adds_portal_paid_model_missing_from_curated(self):
"""A Portal-advertised paid model not in curated is prepended."""
curated = ["anthropic/claude-opus-4.6"]
pricing = {"anthropic/claude-opus-4.6": self._PAID}
with patch(
"hermes_cli.models.fetch_nous_recommended_models",
return_value=self._payload(["openai/gpt-5.4"]),
):
ids, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids[0] == "openai/gpt-5.4" # prepended
assert "anthropic/claude-opus-4.6" in ids
# Existing pricing untouched
assert p["anthropic/claude-opus-4.6"] == self._PAID
def test_does_not_synthesize_pricing_for_paid_models(self):
"""Paid recommendations missing from live pricing get no synthetic entry.
Synthesizing zero pricing (like the free helper does) would mislead
:func:`partition_nous_models_by_tier` into treating them as free;
synthesizing a non-zero placeholder would lie to the user. The
right thing is to leave pricing absent so the picker shows a blank
column until the live pricing endpoint catches up.
"""
curated = ["anthropic/claude-opus-4.6"]
pricing = {"anthropic/claude-opus-4.6": self._PAID}
with patch(
"hermes_cli.models.fetch_nous_recommended_models",
return_value=self._payload(["openai/gpt-5.4"]),
):
_, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert "openai/gpt-5.4" not in p
assert p["anthropic/claude-opus-4.6"] == self._PAID
def test_does_not_duplicate_curated_entries(self):
"""A Portal paid model already in curated is not duplicated."""
curated = ["openai/gpt-5.4", "anthropic/claude-opus-4.6"]
pricing = {
"openai/gpt-5.4": self._PAID,
"anthropic/claude-opus-4.6": self._PAID,
}
with patch(
"hermes_cli.models.fetch_nous_recommended_models",
return_value=self._payload(["openai/gpt-5.4"]),
):
ids, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids == curated
assert p == pricing
def test_empty_payload_returns_inputs_unchanged(self):
"""Empty Portal response leaves curated + pricing untouched."""
curated = ["a", "b"]
pricing = {"a": self._PAID}
with patch("hermes_cli.models.fetch_nous_recommended_models", return_value={}):
ids, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids == curated
assert p == pricing
def test_missing_paidRecommendedModels_key(self):
"""Portal payload without paidRecommendedModels degrades gracefully."""
curated = ["a"]
pricing = {"a": self._PAID}
with patch(
"hermes_cli.models.fetch_nous_recommended_models",
return_value={"freeRecommendedModels": [{"modelName": "x"}]},
):
ids, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids == curated
assert p == pricing
def test_fetch_failure_returns_inputs(self):
"""Network failures don't blow up the picker."""
curated = ["a"]
pricing = {"a": self._PAID}
with patch(
"hermes_cli.models.fetch_nous_recommended_models",
side_effect=RuntimeError("network down"),
):
ids, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids == curated
assert p == pricing
def test_invalid_entries_skipped(self):
"""Non-dict / missing-modelName entries are filtered out."""
curated = ["a"]
pricing = {"a": self._PAID}
with patch(
"hermes_cli.models.fetch_nous_recommended_models",
return_value={
"paidRecommendedModels": [
"not-a-dict",
{"displayName": "no-modelName"},
{"modelName": ""},
{"modelName": "openai/gpt-5.4"},
]
},
):
ids, p = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids == ["openai/gpt-5.4", "a"]
# No synthetic entry — pricing is untouched.
assert "openai/gpt-5.4" not in p
def test_preserves_relative_order_of_new_paid_models(self):
"""Multiple new paid models are prepended in payload order."""
curated = ["anthropic/claude-opus-4.6"]
pricing = {"anthropic/claude-opus-4.6": self._PAID}
with patch(
"hermes_cli.models.fetch_nous_recommended_models",
return_value=self._payload(["openai/gpt-5.4", "openai/gpt-5.5"]),
):
ids, _ = union_with_portal_paid_recommendations(curated, pricing, "")
assert ids == [
"openai/gpt-5.4",
"openai/gpt-5.5",
"anthropic/claude-opus-4.6",
]
class TestCheckNousFreeTierCache:
"""Tests for the TTL cache on check_nous_free_tier()."""
@@ -6,7 +6,6 @@ rather than leaving zombie processes or telling users to manually restart
when launchd will auto-respawn.
"""
import os
import subprocess
from types import SimpleNamespace
from unittest.mock import patch, MagicMock
@@ -1069,18 +1068,13 @@ class TestFindGatewayPidsExclude:
def test_excludes_specified_pids(self, monkeypatch):
monkeypatch.setattr(gateway_cli, "is_windows", lambda: False)
# Bypass /proc scan so the subprocess (ps) fallback is used
_real_isdir = os.path.isdir
monkeypatch.setattr("os.path.isdir", lambda p: False if p == "/proc" else _real_isdir(p))
monkeypatch.setattr(gateway_cli, "_get_service_pids", lambda: set())
monkeypatch.setattr(gateway_cli, "_get_ancestor_pids", lambda: {999})
def fake_run(cmd, **kwargs):
return subprocess.CompletedProcess(
cmd, 0,
stdout=(
"100 python gateway/run.py\n"
"200 python gateway/run.py\n"
"user 100 0.0 0.0 0 0 ? S 00:00 0:00 python gateway/run.py\n"
"user 200 0.0 0.0 0 0 ? S 00:00 0:00 python gateway/run.py\n"
),
stderr="",
)
@@ -1088,24 +1082,19 @@ class TestFindGatewayPidsExclude:
monkeypatch.setattr(gateway_cli.subprocess, "run", fake_run)
monkeypatch.setattr("os.getpid", lambda: 999)
pids = gateway_cli.find_gateway_pids(exclude_pids={100}, all_profiles=True)
pids = gateway_cli.find_gateway_pids(exclude_pids={100})
assert 100 not in pids
assert 200 in pids
def test_no_exclude_returns_all(self, monkeypatch):
monkeypatch.setattr(gateway_cli, "is_windows", lambda: False)
# Bypass /proc scan so the subprocess (ps) fallback is used
_real_isdir = os.path.isdir
monkeypatch.setattr("os.path.isdir", lambda p: False if p == "/proc" else _real_isdir(p))
monkeypatch.setattr(gateway_cli, "_get_service_pids", lambda: set())
monkeypatch.setattr(gateway_cli, "_get_ancestor_pids", lambda: {999})
def fake_run(cmd, **kwargs):
return subprocess.CompletedProcess(
cmd, 0,
stdout=(
"100 python gateway/run.py\n"
"200 python gateway/run.py\n"
"user 100 0.0 0.0 0 0 ? S 00:00 0:00 python gateway/run.py\n"
"user 200 0.0 0.0 0 0 ? S 00:00 0:00 python gateway/run.py\n"
),
stderr="",
)
@@ -1113,7 +1102,7 @@ class TestFindGatewayPidsExclude:
monkeypatch.setattr(gateway_cli.subprocess, "run", fake_run)
monkeypatch.setattr("os.getpid", lambda: 999)
pids = gateway_cli.find_gateway_pids(all_profiles=True)
pids = gateway_cli.find_gateway_pids()
assert 100 in pids
assert 200 in pids
@@ -1122,10 +1111,6 @@ class TestFindGatewayPidsExclude:
profile_dir.mkdir(parents=True)
monkeypatch.setattr(gateway_cli, "is_windows", lambda: False)
monkeypatch.setattr(gateway_cli, "get_hermes_home", lambda: profile_dir)
# Bypass /proc scan so the subprocess (ps) fallback is used
_real_isdir = os.path.isdir
monkeypatch.setattr("os.path.isdir", lambda p: False if p == "/proc" else _real_isdir(p))
monkeypatch.setattr(gateway_cli, "_get_ancestor_pids", lambda: {999})
def fake_run(cmd, **kwargs):
return subprocess.CompletedProcess(
@@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""A minimal in-process LSP server used by tests.
Speaks just enough LSP to drive :class:`agent.lsp.client.LSPClient`
Speaks just enough LSP to drive :class:`plugins.lsp.client.LSPClient`
through a full lifecycle: ``initialize``, ``initialized``,
``textDocument/didOpen``, ``textDocument/didChange``, then a
``textDocument/publishDiagnostics`` notification followed by
+154
View File
@@ -0,0 +1,154 @@
"""Integration test: LSP plugin skips non-local paths.
The host-side LSP server can't see files inside Docker/Modal/SSH
sandboxes. The plugin's ``_pre_tool_call`` uses ``os.path.exists``
on the parent directory as a heuristic local-only gate. These tests
verify the plugin hooks skip when the path clearly doesn't exist on
the host filesystem.
"""
from __future__ import annotations
import os
import sys
from unittest.mock import patch
import pytest
@pytest.fixture(autouse=True)
def _isolate_plugin_state():
"""Reset plugin module state between tests."""
# Import the plugin and clear any service state
from plugins.lsp import _baselines
_baselines.clear()
yield
_baselines.clear()
def test_pre_tool_call_skips_nonexistent_parent_dir():
"""pre_tool_call returns early when the path's parent dir doesn't exist (Docker/SSH heuristic)."""
from plugins import lsp as lsp_plugin
# Simulate a path that doesn't exist on host (e.g., inside Docker)
fake_path = "/nonexistent-docker-container-fs/app/main.py"
# Mock _ensure_service to return a mock service
mock_service = type("MockService", (), {
"is_active": lambda self: True,
"enabled_for": lambda self, p: True,
"snapshot_baseline": lambda self, p: None,
})()
with patch.object(lsp_plugin, "_service", mock_service):
lsp_plugin._pre_tool_call(
tool_name="write_file",
args={"path": fake_path},
session_id="test-session",
tool_call_id="call-1",
)
# Baseline should NOT be captured because parent dir doesn't exist
assert ("test-session", os.path.normpath(fake_path)) not in lsp_plugin._baselines
def test_pre_tool_call_proceeds_for_local_path(tmp_path):
"""pre_tool_call captures baseline when path exists locally."""
from plugins import lsp as lsp_plugin
# Create a real file so the parent-dir check passes
test_file = tmp_path / "test.py"
test_file.write_text("x = 1\n")
mock_service = type("MockService", (), {
"is_active": lambda self: True,
"enabled_for": lambda self, p: True,
"snapshot_baseline": lambda self, p: None,
})()
with patch.object(lsp_plugin, "_service", mock_service):
lsp_plugin._pre_tool_call(
tool_name="write_file",
args={"path": str(test_file)},
session_id="test-session",
tool_call_id="call-2",
)
# Baseline SHOULD be captured because the local path exists
assert ("test-session", str(test_file)) in lsp_plugin._baselines
def test_pre_tool_call_skips_non_write_tools():
"""pre_tool_call is a no-op for tools other than write_file/patch."""
from plugins import lsp as lsp_plugin
lsp_plugin._pre_tool_call(
tool_name="terminal",
args={"command": "ls"},
session_id="test-session",
tool_call_id="call-3",
)
assert len(lsp_plugin._baselines) == 0
def test_pre_tool_call_skips_v4a_patch():
"""pre_tool_call skips V4A multi-file patches (has 'patch' key, no 'path' key)."""
from plugins import lsp as lsp_plugin
mock_service = type("MockService", (), {
"is_active": lambda self: True,
"enabled_for": lambda self, p: True,
"snapshot_baseline": lambda self, p: None,
})()
with patch.object(lsp_plugin, "_service", mock_service):
lsp_plugin._pre_tool_call(
tool_name="patch",
args={"patch": "*** Begin Patch\n*** Update File: foo.py\n..."},
session_id="test-session",
tool_call_id="call-4",
)
assert len(lsp_plugin._baselines) == 0
def test_transform_tool_result_injects_diagnostics(tmp_path):
"""transform_tool_result appends lsp_diagnostics field to JSON result."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "test.py"
abs_path = str(test_file)
# Pre-populate a baseline entry (simulating pre_tool_call ran)
lsp_plugin._baselines.add(("test-session", abs_path))
# Mock service that returns a diagnostic
mock_service = type("MockService", (), {
"is_active": lambda self: True,
"enabled_for": lambda self, p: True,
"get_diagnostics_sync": lambda self, p, delta=True, timeout=3.0: [
{
"severity": 1,
"range": {"start": {"line": 1, "character": 4}},
"message": "Type error: str is not int",
"code": "reportReturnType",
"source": "Pyright",
}
],
})()
with patch.object(lsp_plugin, "_service", mock_service):
result = lsp_plugin._transform_tool_result(
tool_name="write_file",
args={"path": abs_path},
result='{"bytes_written": 42, "dirs_created": false}',
session_id="test-session",
tool_call_id="call-5",
)
assert result is not None
import json
data = json.loads(result)
assert "lsp_diagnostics" in data
assert "reportReturnType" in data["lsp_diagnostics"]
assert "bytes_written" in data # Original fields preserved
@@ -14,7 +14,13 @@ from pathlib import Path
import pytest
from agent.lsp.client import LSPClient
from plugins.lsp.client import LSPClient
# These tests spawn a real subprocess (mock LSP server) and terminate it
# via SIGTERM on shutdown. The conftest live-system guard blocks os.kill
# for PIDs outside the test process subtree; bypass it here because this
# is intentional subprocess lifecycle management.
pytestmark = pytest.mark.live_system_guard_bypass
MOCK_SERVER = str(Path(__file__).parent / "_mock_lsp_server.py")
@@ -11,7 +11,7 @@ import logging
import pytest
from agent.lsp import eventlog
from plugins.lsp import eventlog
@pytest.fixture(autouse=True)
+203
View File
@@ -0,0 +1,203 @@
"""Integration test: full hook flow pre_tool_call → write → transform_tool_result.
Verifies that the plugin hook wiring correctly:
1. Captures a baseline in pre_tool_call
2. Passes through a write (no interference)
3. Injects diagnostics in transform_tool_result
Uses a mocked LSP service to avoid requiring pyright/gopls in CI.
"""
from __future__ import annotations
import json
import os
from unittest.mock import patch
import pytest
@pytest.fixture(autouse=True)
def _isolate():
"""Clear plugin state between tests."""
from plugins import lsp as lsp_plugin
lsp_plugin._baselines.clear()
old_service = lsp_plugin._service
yield
lsp_plugin._baselines.clear()
lsp_plugin._service = old_service
class FakeLSPService:
"""Minimal LSP service mock that returns canned diagnostics."""
def __init__(self, diagnostics=None):
self._diagnostics = diagnostics or []
def is_active(self):
return True
def enabled_for(self, path):
return path.endswith(".py") or path.endswith(".ts")
def snapshot_baseline(self, path):
pass # no-op, just marks that we visited
def get_diagnostics_sync(self, path, delta=True, timeout=3.0):
return self._diagnostics
def shutdown(self):
pass
def test_full_hook_flow_produces_diagnostics(tmp_path):
"""Exercise pre_tool_call → (write) → transform_tool_result end-to-end."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "broken.py"
test_file.write_text("x: int = 'oops'\n")
abs_path = str(test_file)
fake_service = FakeLSPService(diagnostics=[
{
"severity": 1,
"range": {"start": {"line": 0, "character": 9}},
"message": 'Expression of type "str" is incompatible with declared type "int"',
"code": "reportAssignmentType",
"source": "Pyright",
}
])
with patch.object(lsp_plugin, "_service", fake_service):
# Step 1: pre_tool_call captures baseline
lsp_plugin._pre_tool_call(
tool_name="write_file",
args={"path": abs_path, "content": "x: int = 'oops'\n"},
session_id="test-session",
tool_call_id="call-001",
)
assert ("test-session", abs_path) in lsp_plugin._baselines
# Step 2: simulate the write completing (tool output)
tool_result = json.dumps({
"bytes_written": 16,
"dirs_created": False,
"lint": None,
})
# Step 3: transform_tool_result injects diagnostics
transformed = lsp_plugin._transform_tool_result(
tool_name="write_file",
args={"path": abs_path, "content": "x: int = 'oops'\n"},
result=tool_result,
session_id="test-session",
tool_call_id="call-001",
)
# Verify: result is valid JSON with lsp_diagnostics field
assert transformed is not None
data = json.loads(transformed)
assert "lsp_diagnostics" in data
assert "reportAssignmentType" in data["lsp_diagnostics"]
assert "Pyright" in data["lsp_diagnostics"]
# Original fields preserved
assert data["bytes_written"] == 16
assert data["dirs_created"] is False
# Baseline consumed (removed after use)
assert ("test-session", abs_path) not in lsp_plugin._baselines
def test_hook_flow_returns_none_when_no_diagnostics(tmp_path):
"""transform_tool_result returns None (no modification) when LSP is clean."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "clean.py"
test_file.write_text("x: int = 42\n")
abs_path = str(test_file)
fake_service = FakeLSPService(diagnostics=[]) # Clean — no errors
with patch.object(lsp_plugin, "_service", fake_service):
lsp_plugin._pre_tool_call(
tool_name="write_file",
args={"path": abs_path, "content": "x: int = 42\n"},
session_id="test-session",
tool_call_id="call-002",
)
transformed = lsp_plugin._transform_tool_result(
tool_name="write_file",
args={"path": abs_path, "content": "x: int = 42\n"},
result='{"bytes_written": 12}',
session_id="test-session",
tool_call_id="call-002",
)
# No diagnostics → return None → result unchanged
assert transformed is None
def test_hook_flow_no_baseline_means_no_injection(tmp_path):
"""transform_tool_result does nothing if pre_tool_call didn't fire."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "no_baseline.py"
abs_path = str(test_file)
fake_service = FakeLSPService(diagnostics=[
{"severity": 1, "range": {"start": {"line": 0, "character": 0}},
"message": "error", "code": "E1", "source": "test"}
])
with patch.object(lsp_plugin, "_service", fake_service):
# Skip pre_tool_call — simulate a case where it didn't fire
transformed = lsp_plugin._transform_tool_result(
tool_name="write_file",
args={"path": abs_path},
result='{"bytes_written": 5}',
session_id="test-session",
tool_call_id="call-003",
)
# No baseline was captured, so no injection
assert transformed is None
def test_hook_flow_patch_tool(tmp_path):
"""Hook flow works for patch tool (single-path mode)."""
from plugins import lsp as lsp_plugin
test_file = tmp_path / "patched.py"
test_file.write_text("def f() -> int:\n return 'wrong'\n")
abs_path = str(test_file)
fake_service = FakeLSPService(diagnostics=[
{
"severity": 1,
"range": {"start": {"line": 1, "character": 11}},
"message": 'Cannot return "str" from function with return type "int"',
"code": "reportReturnType",
"source": "Pyright",
}
])
with patch.object(lsp_plugin, "_service", fake_service):
lsp_plugin._pre_tool_call(
tool_name="patch",
args={"path": abs_path, "old_string": "return 42", "new_string": "return 'wrong'"},
session_id="test-session",
tool_call_id="call-004",
)
transformed = lsp_plugin._transform_tool_result(
tool_name="patch",
args={"path": abs_path, "old_string": "return 42", "new_string": "return 'wrong'"},
result='{"success": true, "diff": "..."}',
session_id="test-session",
tool_call_id="call-004",
)
assert transformed is not None
data = json.loads(transformed)
assert "lsp_diagnostics" in data
assert "reportReturnType" in data["lsp_diagnostics"]
@@ -15,7 +15,7 @@ import asyncio
import json
import pytest
from agent.lsp.protocol import (
from plugins.lsp.protocol import (
ERROR_CONTENT_MODIFIED,
ERROR_METHOD_NOT_FOUND,
LSPProtocolError,
@@ -1,7 +1,7 @@
"""Tests for the diagnostic reporter (formatting layer)."""
from __future__ import annotations
from agent.lsp.reporter import (
from plugins.lsp.reporter import (
DEFAULT_SEVERITIES,
MAX_PER_FILE,
format_diagnostic,
@@ -13,8 +13,8 @@ from pathlib import Path
import pytest
from agent.lsp.manager import LSPService
from agent.lsp.servers import (
from plugins.lsp.manager import LSPService
from plugins.lsp.servers import (
SERVERS,
ServerContext,
ServerDef,
@@ -6,7 +6,7 @@ from pathlib import Path
import pytest
from agent.lsp.workspace import (
from plugins.lsp.workspace import (
clear_cache,
find_git_worktree,
is_inside_workspace,
@@ -372,36 +372,29 @@ class TestSupportsLongLivedAnthropicCache:
)
assert agent._supports_long_lived_anthropic_cache() is True
def test_nous_portal_qwen_NOT_long_lived(self):
# Portal Qwen still gets cache_control markers via the standard
# system_and_3 5m layout (see _anthropic_prompt_cache_policy
# tests above), but it must NOT ride the prefix_and_2 1h layout.
# Alibaba DashScope (the upstream for every Qwen route, incl.
# Portal -> OpenRouter -> Alibaba) only supports a single
# ``ephemeral`` TTL of 5 minutes; ttl="1h" markers are silently
# ignored, so the high-value tools[-1] + system-prefix
# breakpoints don't land. Stay on system_and_3 instead.
def test_nous_portal_qwen_supported(self):
# Portal Qwen rides the same OpenRouter-equivalent transport as
# Portal Claude; long-lived (1h cross-session) cache_control
# markers apply identically.
agent = _make_agent(
provider="nous",
base_url="https://inference-api.nousresearch.com/v1",
api_mode="chat_completions",
model="qwen3.6-plus",
)
assert agent._supports_long_lived_anthropic_cache() is False
assert agent._supports_long_lived_anthropic_cache() is True
def test_nous_portal_qwen_vendored_slug_NOT_long_lived(self):
def test_nous_portal_qwen_vendored_slug_supported(self):
agent = _make_agent(
provider="nous",
base_url="https://inference-api.nousresearch.com/v1",
api_mode="chat_completions",
model="qwen/qwen3.6-plus",
)
assert agent._supports_long_lived_anthropic_cache() is False
assert agent._supports_long_lived_anthropic_cache() is True
def test_nous_portal_non_claude_rejected(self):
# Portal long-lived cache scope is now Claude-only. Qwen
# rejection is covered by the dedicated tests above; this
# covers everything else (gpt, etc.).
def test_nous_portal_non_claude_non_qwen_rejected(self):
# Portal long-lived cache scope mirrors policy: Claude or Qwen only.
agent = _make_agent(
provider="nous",
base_url="https://inference-api.nousresearch.com/v1",
@@ -182,7 +182,7 @@ class TestClientCacheBoundedGrowth:
_get_cached_client,
)
key = ("test_replace", True, "", "", "", (), False, "")
key = ("test_replace", True, "", "", "", (), False)
# Simulate a stale entry from a closed loop
old_loop = asyncio.new_event_loop()
@@ -1,308 +0,0 @@
"""Tests for the per-turn file-mutation verifier footer.
Covers the three moving pieces:
1. ``_extract_file_mutation_targets`` pulls file paths from write_file /
patch (replace + V4A) tool-call argument dicts.
2. ``AIAgent._record_file_mutation_result`` builds the per-turn state
dict, removing entries when a later success supersedes an earlier
failure for the same path.
3. ``AIAgent._format_file_mutation_failure_footer`` renders the dict
as a user-visible advisory.
Regression target: the "Ben Eng llm-wiki" session where grok-4.1-fast
batched parallel patches, half failed, and the model summarised the
turn claiming every file was edited. This verifier makes over-claiming
structurally impossible past the model: the user always sees the real
list of files that did NOT change.
"""
from __future__ import annotations
import json
import pytest
from run_agent import (
AIAgent,
_FILE_MUTATING_TOOLS,
_extract_error_preview,
_extract_file_mutation_targets,
)
# ---------------------------------------------------------------------------
# _extract_file_mutation_targets
# ---------------------------------------------------------------------------
class TestExtractFileMutationTargets:
def test_non_mutating_tool_returns_empty(self):
assert _extract_file_mutation_targets("read_file", {"path": "/x"}) == []
assert _extract_file_mutation_targets("terminal", {"command": "ls"}) == []
def test_write_file_returns_single_path(self):
out = _extract_file_mutation_targets("write_file", {"path": "/tmp/a.md", "content": "x"})
assert out == ["/tmp/a.md"]
def test_write_file_missing_path_returns_empty(self):
assert _extract_file_mutation_targets("write_file", {"content": "x"}) == []
def test_patch_replace_mode_returns_path(self):
args = {"mode": "replace", "path": "/tmp/a.md", "old_string": "x", "new_string": "y"}
assert _extract_file_mutation_targets("patch", args) == ["/tmp/a.md"]
def test_patch_default_mode_is_replace(self):
# Mode omitted — schema default is ``replace``.
args = {"path": "/tmp/a.md", "old_string": "x", "new_string": "y"}
assert _extract_file_mutation_targets("patch", args) == ["/tmp/a.md"]
def test_patch_v4a_single_file(self):
body = (
"*** Begin Patch\n"
"*** Update File: /tmp/a.md\n"
"@@ ctx @@\n"
" line1\n"
"-bad\n"
"+good\n"
"*** End Patch\n"
)
args = {"mode": "patch", "patch": body}
assert _extract_file_mutation_targets("patch", args) == ["/tmp/a.md"]
def test_patch_v4a_multi_file(self):
body = (
"*** Begin Patch\n"
"*** Update File: /tmp/a.md\n"
"@@ @@\n-a\n+b\n"
"*** Add File: /tmp/new.md\n"
"+fresh\n"
"*** Delete File: /tmp/old.md\n"
"*** End Patch\n"
)
args = {"mode": "patch", "patch": body}
paths = _extract_file_mutation_targets("patch", args)
assert paths == ["/tmp/a.md", "/tmp/new.md", "/tmp/old.md"]
def test_patch_v4a_missing_body_returns_empty(self):
assert _extract_file_mutation_targets("patch", {"mode": "patch"}) == []
assert _extract_file_mutation_targets("patch", {"mode": "patch", "patch": ""}) == []
# ---------------------------------------------------------------------------
# _extract_error_preview
# ---------------------------------------------------------------------------
class TestExtractErrorPreview:
def test_json_error_field_preferred(self):
raw = json.dumps({"success": False, "error": "Could not find old_string in /tmp/x"})
assert _extract_error_preview(raw) == "Could not find old_string in /tmp/x"
def test_plain_string_falls_through(self):
assert _extract_error_preview("Error executing tool: boom") == "Error executing tool: boom"
def test_long_preview_truncated(self):
long = "x" * 500
out = _extract_error_preview(long, max_len=50)
assert len(out) <= 50
assert out.endswith("")
def test_none_returns_empty(self):
assert _extract_error_preview(None) == ""
# ---------------------------------------------------------------------------
# _record_file_mutation_result — state transitions
# ---------------------------------------------------------------------------
def _bare_agent() -> AIAgent:
"""Skip __init__ and only attach the per-turn state dict.
AIAgent.__init__ takes ~60 parameters and touches network, auth, and
the filesystem. For these tests we only need the two methods
``_record_file_mutation_result`` and ``_format_file_mutation_failure_footer``.
Using ``object.__new__`` mirrors the gateway-test pattern documented in
the agent pitfalls list.
"""
agent = object.__new__(AIAgent)
agent._turn_failed_file_mutations = {}
return agent
class TestRecordFileMutationResult:
def test_non_mutating_tool_ignored(self):
agent = _bare_agent()
agent._record_file_mutation_result(
"read_file", {"path": "/tmp/x"}, "{}", is_error=True,
)
assert agent._turn_failed_file_mutations == {}
def test_failure_recorded(self):
agent = _bare_agent()
result = json.dumps({"success": False, "error": "Could not find old_string"})
agent._record_file_mutation_result(
"patch", {"mode": "replace", "path": "/tmp/a.md", "old_string": "x", "new_string": "y"},
result, is_error=True,
)
state = agent._turn_failed_file_mutations
assert "/tmp/a.md" in state
assert state["/tmp/a.md"]["tool"] == "patch"
assert "Could not find old_string" in state["/tmp/a.md"]["error_preview"]
def test_success_removes_prior_failure(self):
agent = _bare_agent()
# First attempt fails
agent._record_file_mutation_result(
"patch", {"mode": "replace", "path": "/tmp/a.md", "old_string": "x", "new_string": "y"},
json.dumps({"error": "not found"}), is_error=True,
)
assert "/tmp/a.md" in agent._turn_failed_file_mutations
# Second attempt with corrected old_string succeeds
agent._record_file_mutation_result(
"patch", {"mode": "replace", "path": "/tmp/a.md", "old_string": "real", "new_string": "fixed"},
json.dumps({"success": True, "diff": "..."}), is_error=False,
)
assert agent._turn_failed_file_mutations == {}
def test_repeated_failure_keeps_first_error(self):
agent = _bare_agent()
agent._record_file_mutation_result(
"patch", {"mode": "replace", "path": "/tmp/a.md", "old_string": "v1", "new_string": "y"},
json.dumps({"error": "first error"}), is_error=True,
)
agent._record_file_mutation_result(
"patch", {"mode": "replace", "path": "/tmp/a.md", "old_string": "v2", "new_string": "y"},
json.dumps({"error": "second error"}), is_error=True,
)
# Keep the original error — swapping to the latest would obscure
# the initial root cause.
assert "first error" in agent._turn_failed_file_mutations["/tmp/a.md"]["error_preview"]
def test_v4a_multi_file_all_tracked(self):
agent = _bare_agent()
body = (
"*** Begin Patch\n"
"*** Update File: /tmp/a.md\n@@ @@\n-a\n+b\n"
"*** Update File: /tmp/b.md\n@@ @@\n-a\n+b\n"
"*** End Patch\n"
)
agent._record_file_mutation_result(
"patch", {"mode": "patch", "patch": body},
json.dumps({"error": "parse failure"}), is_error=True,
)
assert set(agent._turn_failed_file_mutations) == {"/tmp/a.md", "/tmp/b.md"}
def test_no_state_dict_silent_noop(self):
"""When called outside run_conversation the state dict is absent.
The record helper must never raise a tool dispatched from, say,
a direct ``chat()`` call should not blow up the call site just
because the verifier state hasn't been initialised.
"""
agent = object.__new__(AIAgent) # no state attached
# Should not raise
agent._record_file_mutation_result(
"patch", {"mode": "replace", "path": "/tmp/a.md"},
json.dumps({"error": "x"}), is_error=True,
)
def test_missing_path_arg_recorded_nowhere(self):
agent = _bare_agent()
agent._record_file_mutation_result(
"patch", {"mode": "replace"}, # no path
json.dumps({"error": "path required"}), is_error=True,
)
# No path → nothing to key on, state stays empty. The per-turn
# state is about file paths, not individual tool-call IDs.
assert agent._turn_failed_file_mutations == {}
# ---------------------------------------------------------------------------
# _format_file_mutation_failure_footer
# ---------------------------------------------------------------------------
class TestFormatFooter:
def test_empty_returns_empty_string(self):
assert AIAgent._format_file_mutation_failure_footer({}) == ""
def test_single_failure(self):
out = AIAgent._format_file_mutation_failure_footer(
{"/tmp/a.md": {"tool": "patch", "error_preview": "Could not find old_string"}},
)
assert "1 file(s) were NOT modified" in out
assert "/tmp/a.md" in out
assert "Could not find old_string" in out
assert "git status" in out # user-actionable hint
def test_truncation_at_10_entries(self):
failed = {
f"/tmp/f{i}.md": {"tool": "patch", "error_preview": "err"}
for i in range(15)
}
out = AIAgent._format_file_mutation_failure_footer(failed)
assert "15 file(s) were NOT modified" in out
assert "… and 5 more" in out
# Ten file bullets + header + "and X more" line
lines = out.split("\n")
bullet_lines = [ln for ln in lines if ln.lstrip().startswith("")]
assert len(bullet_lines) == 11 # 10 shown + 1 summary
# ---------------------------------------------------------------------------
# _file_mutation_verifier_enabled — env + config precedence
# ---------------------------------------------------------------------------
class TestVerifierEnabled:
def test_default_is_enabled(self, monkeypatch):
monkeypatch.delenv("HERMES_FILE_MUTATION_VERIFIER", raising=False)
agent = _bare_agent()
# With no env and no config present, safe default is True.
# load_config may surface a user config.yaml in some envs — stub it.
import hermes_cli.config as _cfg_mod
monkeypatch.setattr(_cfg_mod, "load_config", lambda: {})
assert agent._file_mutation_verifier_enabled() is True
@pytest.mark.parametrize("value", ["0", "false", "FALSE", "no", "off"])
def test_env_disables(self, monkeypatch, value):
monkeypatch.setenv("HERMES_FILE_MUTATION_VERIFIER", value)
agent = _bare_agent()
assert agent._file_mutation_verifier_enabled() is False
def test_env_enables_over_config(self, monkeypatch):
monkeypatch.setenv("HERMES_FILE_MUTATION_VERIFIER", "1")
import hermes_cli.config as _cfg_mod
monkeypatch.setattr(
_cfg_mod, "load_config",
lambda: {"display": {"file_mutation_verifier": False}},
)
agent = _bare_agent()
assert agent._file_mutation_verifier_enabled() is True
def test_config_disables_when_no_env(self, monkeypatch):
monkeypatch.delenv("HERMES_FILE_MUTATION_VERIFIER", raising=False)
import hermes_cli.config as _cfg_mod
monkeypatch.setattr(
_cfg_mod, "load_config",
lambda: {"display": {"file_mutation_verifier": False}},
)
agent = _bare_agent()
assert agent._file_mutation_verifier_enabled() is False
# ---------------------------------------------------------------------------
# Module-level invariants
# ---------------------------------------------------------------------------
def test_file_mutating_tools_set_shape():
"""write_file + patch are the only tools the verifier tracks.
Guard rail: if someone adds a third file-mutating tool (e.g. a new
``append_file``), they should also audit whether the verifier should
track it. This test fails loudly on unilateral additions.
"""
assert _FILE_MUTATING_TOOLS == frozenset({"write_file", "patch"})
+1 -2
View File
@@ -945,8 +945,7 @@ class TestAuxiliaryClientProviderPriority:
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
from agent.auxiliary_client import get_text_auxiliary_client
with patch("agent.auxiliary_client._read_nous_auth", return_value={"access_token": "nous-tok"}), \
patch("agent.auxiliary_client.OpenAI") as mock, \
patch("hermes_cli.models.get_nous_recommended_aux_model", return_value=None):
patch("agent.auxiliary_client.OpenAI") as mock:
client, model = get_text_auxiliary_client()
assert model == "google/gemini-3-flash-preview"
-1
View File
@@ -169,7 +169,6 @@ class TestEphemeralMaxOutputTokens:
agent.reasoning_config = None
agent._is_anthropic_oauth = False
agent._ephemeral_max_output_tokens = None
agent._use_long_lived_prefix_cache = False
compressor = MagicMock()
compressor.context_length = 200_000
+12 -63
View File
@@ -11,73 +11,22 @@ def _load_optional_dependencies():
return project["optional-dependencies"]
def test_matrix_extra_not_in_all():
"""The [matrix] extra pulls `mautrix[encryption]` -> `python-olm`,
which has Linux-only wheels and no native build path on Windows or
modern macOS (archived libolm, C++ errors with Clang 21+).
With matrix in [all], `uv sync --locked` on Windows tried to build
python-olm from sdist and failed on `make`. As of 2026-05-12 the
[matrix] extra is excluded from [all] entirely and routed through
`tools/lazy_deps.py` (LAZY_DEPS["platform.matrix"]) installs at
first use, where the user is expected to have a toolchain.
"""
def test_matrix_extra_linux_only_in_all():
"""mautrix[encryption] depends on python-olm which is upstream-broken on
modern macOS (archived libolm, C++ errors with Clang 21+). The [matrix]
extra is included in [all] but gated to Linux via a platform marker so
that ``hermes update`` doesn't fail on macOS."""
optional_dependencies = _load_optional_dependencies()
assert "matrix" in optional_dependencies, "[matrix] extra must still exist for explicit `pip install hermes-agent[matrix]`"
# Must NOT appear in [all] in any form — neither unconditional nor
# platform-gated. Lazy-install handles it.
matrix_in_all = [
assert "matrix" in optional_dependencies
# Must NOT be unconditional — python-olm has no macOS wheels.
assert "hermes-agent[matrix]" not in optional_dependencies["all"]
# Must be present with a Linux platform marker.
linux_gated = [
dep for dep in optional_dependencies["all"]
if "matrix" in dep
if "matrix" in dep and "linux" in dep
]
assert not matrix_in_all, (
"matrix must not appear in [all] — it's lazy-installed via "
"tools/lazy_deps.py LAZY_DEPS['platform.matrix']. Found: "
f"{matrix_in_all}"
)
def test_lazy_installable_extras_excluded_from_all():
"""Policy (2026-05-12): every extra that has a `LAZY_DEPS` entry
in `tools/lazy_deps.py` must be excluded from [all].
The lazy-install system exists so one quarantined PyPI release
(e.g. mistralai 2.4.6) can't break every fresh install. Putting a
backend in BOTH [all] and LAZY_DEPS defeats that fresh installs
eager-install it and inherit whatever's broken upstream.
If you're tempted to add an opt-in backend to [all] for "convenience,"
add it to `LAZY_DEPS` instead so it installs at first use.
"""
optional_dependencies = _load_optional_dependencies()
# Hard-coded mirror of the extras that are in LAZY_DEPS as of
# 2026-05-12. This list intentionally duplicates rather than
# imports tools/lazy_deps.py so the test stays a contract — if
# someone adds a new lazy-install backend, they have to update
# this list AND verify [all] doesn't contain it.
lazy_covered_extras = {
"anthropic", "bedrock",
"exa", "firecrawl", "parallel-web",
"fal",
"edge-tts", "tts-premium",
"voice", # faster-whisper / sounddevice / numpy
"modal", "daytona", "vercel",
"messaging", "slack", "matrix", "dingtalk", "feishu",
"honcho", "hindsight",
}
all_extra_specs = optional_dependencies["all"]
for extra in lazy_covered_extras:
offending = [
spec for spec in all_extra_specs
if f"hermes-agent[{extra}]" in spec
]
assert not offending, (
f"[{extra}] is in [all] but also in LAZY_DEPS. "
f"Remove it from [all] in pyproject.toml — it lazy-installs "
f"at first use. Found in [all]: {offending}"
)
assert linux_gated, "expected hermes-agent[matrix] with sys_platform=='linux' marker in [all]"
def test_messaging_extra_includes_qrcode_for_weixin_setup():
@@ -193,118 +193,6 @@ class TestManagedPersistenceMode:
assert tab_requests[0]["userId"] == tab_requests[1]["userId"]
class TestConfiguredCamofoxIdentity:
"""Externally managed Camofox sessions can provide their own identity."""
def test_env_identity_overrides_default_identity(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
monkeypatch.setenv("CAMOFOX_USER_ID", "shared-camofox")
monkeypatch.setenv("CAMOFOX_SESSION_KEY", "visible-tab")
monkeypatch.setenv("CAMOFOX_ADOPT_EXISTING_TAB", "true")
with patch("tools.browser_camofox._get", return_value={"tabs": []}) as mock_get:
session = _get_session("task-1")
assert session["user_id"] == "shared-camofox"
assert session["session_key"] == "visible-tab"
assert session["managed"] is True
assert session["adopt_existing_tab"] is True
mock_get.assert_called_once_with(
"/tabs",
params={"userId": "shared-camofox"},
timeout=5,
)
def test_config_identity_is_used_when_env_is_absent(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
config = {
"browser": {
"camofox": {
"user_id": "config-user",
"session_key": "config-session",
"adopt_existing_tab": False,
}
}
}
with patch("tools.browser_camofox.load_config", return_value=config):
session = _get_session("task-1")
assert session["user_id"] == "config-user"
assert session["session_key"] == "config-session"
assert session["adopt_existing_tab"] is False
def test_env_identity_takes_precedence_over_config(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
monkeypatch.setenv("CAMOFOX_USER_ID", "env-user")
monkeypatch.setenv("CAMOFOX_SESSION_KEY", "env-session")
monkeypatch.setenv("CAMOFOX_ADOPT_EXISTING_TAB", "false")
config = {
"browser": {
"camofox": {
"user_id": "config-user",
"session_key": "config-session",
"adopt_existing_tab": True,
}
}
}
with patch("tools.browser_camofox.load_config", return_value=config):
session = _get_session("task-1")
assert session["user_id"] == "env-user"
assert session["session_key"] == "env-session"
assert session["adopt_existing_tab"] is False
def test_adopts_existing_tab_matching_session_key(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
monkeypatch.setenv("CAMOFOX_USER_ID", "shared-camofox")
monkeypatch.setenv("CAMOFOX_SESSION_KEY", "visible-tab")
monkeypatch.setenv("CAMOFOX_ADOPT_EXISTING_TAB", "true")
tabs = {
"tabs": [
{"tabId": "tab-other", "listItemId": "other"},
{"tabId": "tab-visible", "listItemId": "visible-tab"},
]
}
with patch("tools.browser_camofox._get", return_value=tabs):
session = _get_session("task-1")
assert session["tab_id"] == "tab-visible"
def test_managed_persistence_can_opt_into_tab_adoption(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
config = {"browser": {"camofox": {"managed_persistence": True, "adopt_existing_tab": True}}}
with (
patch("tools.browser_camofox.load_config", return_value=config),
patch("tools.browser_camofox._get", return_value={"tabs": [{"tabId": "tab-1"}]}),
):
session = _get_session("task-1")
assert session["tab_id"] == "tab-1"
def test_soft_cleanup_preserves_externally_managed_session(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
monkeypatch.setenv("CAMOFOX_URL", "http://localhost:9377")
monkeypatch.setenv("CAMOFOX_USER_ID", "shared-camofox")
with patch("tools.browser_camofox._get", return_value={"tabs": []}):
_get_session("task-1")
result = camofox_soft_cleanup("task-1")
assert result is True
import tools.browser_camofox as mod
with mod._sessions_lock:
assert "task-1" not in mod._sessions
class TestVncUrlDiscovery:
"""VNC URL is derived from the Camofox health endpoint."""
+1 -4
View File
@@ -53,11 +53,8 @@ class TestCamofoxIdentity:
class TestCamofoxConfigDefaults:
def test_default_config_includes_camofox_controls(self):
def test_default_config_includes_managed_persistence_toggle(self):
from hermes_cli.config import DEFAULT_CONFIG
browser_cfg = DEFAULT_CONFIG["browser"]
assert browser_cfg["camofox"]["managed_persistence"] is False
assert browser_cfg["camofox"]["user_id"] == ""
assert browser_cfg["camofox"]["session_key"] == ""
assert browser_cfg["camofox"]["adopt_existing_tab"] is False
-207
View File
@@ -1,207 +0,0 @@
"""Tests for the gateway-side clarify primitive (tools/clarify_gateway.py).
The clarify tool needs to ask the user a question and block the agent
thread until they respond. These tests cover the module-level state
machine: register, wait, resolve via button, resolve via text-fallback,
"Other"-button text-capture flip, timeout, session boundary cleanup.
"""
from __future__ import annotations
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import pytest
def _clear_clarify_state():
"""Reset module-level state between tests."""
from tools import clarify_gateway as cm
with cm._lock:
cm._entries.clear()
cm._session_index.clear()
cm._notify_cbs.clear()
class TestClarifyPrimitive:
"""Core register/wait/resolve mechanics."""
def setup_method(self):
_clear_clarify_state()
def test_button_choice_resolves_wait(self):
"""resolve_gateway_clarify unblocks wait_for_response with the chosen string."""
from tools import clarify_gateway as cm
cm.register("id1", "sk1", "Pick one", ["A", "B", "C"])
def resolver():
time.sleep(0.05)
cm.resolve_gateway_clarify("id1", "B")
threading.Thread(target=resolver).start()
result = cm.wait_for_response("id1", timeout=2.0)
assert result == "B"
def test_open_ended_auto_awaits_text(self):
"""Clarify with no choices is in text-capture mode immediately."""
from tools import clarify_gateway as cm
entry = cm.register("id2", "sk2", "Free form?", None)
assert entry.awaiting_text is True
# get_pending_for_session returns the entry so the gateway
# text-intercept can find it.
pending = cm.get_pending_for_session("sk2")
assert pending is not None
assert pending.clarify_id == "id2"
def test_button_choice_does_not_auto_await(self):
"""Multi-choice clarify should NOT be in text-capture mode initially."""
from tools import clarify_gateway as cm
entry = cm.register("id3", "sk3", "Pick", ["X", "Y"])
assert entry.awaiting_text is False
assert cm.get_pending_for_session("sk3") is None
def test_other_button_flips_to_text_mode(self):
"""mark_awaiting_text makes get_pending_for_session find the entry."""
from tools import clarify_gateway as cm
cm.register("id4", "sk4", "Pick", ["X", "Y"])
assert cm.get_pending_for_session("sk4") is None
flipped = cm.mark_awaiting_text("id4")
assert flipped is True
pending = cm.get_pending_for_session("sk4")
assert pending is not None
assert pending.clarify_id == "id4"
def test_mark_awaiting_text_unknown_id(self):
"""mark_awaiting_text on a non-existent id returns False."""
from tools import clarify_gateway as cm
assert cm.mark_awaiting_text("nope") is False
def test_timeout_returns_none(self):
"""wait_for_response returns None when no resolve fires within the timeout."""
from tools import clarify_gateway as cm
cm.register("id5", "sk5", "Q?", ["A"])
result = cm.wait_for_response("id5", timeout=0.2)
assert result is None
def test_resolve_unknown_id_returns_false(self):
"""resolve_gateway_clarify is idempotent on unknown ids."""
from tools import clarify_gateway as cm
assert cm.resolve_gateway_clarify("nope", "anything") is False
def test_resolve_after_wait_completes_is_noop(self):
"""A late resolve on a finished entry doesn't blow up."""
from tools import clarify_gateway as cm
cm.register("id6", "sk6", "Q?", ["A"])
# Time out, entry gets cleaned up
cm.wait_for_response("id6", timeout=0.1)
# Late button click — should not raise
result = cm.resolve_gateway_clarify("id6", "A")
assert result is False
def test_clear_session_cancels_pending_entries(self):
"""clear_session unblocks blocked threads with empty response."""
from tools import clarify_gateway as cm
cm.register("id7", "sk7", "Q?", ["A"])
def waiter():
return cm.wait_for_response("id7", timeout=10.0)
with ThreadPoolExecutor(1) as pool:
fut = pool.submit(waiter)
time.sleep(0.05)
cancelled = cm.clear_session("sk7")
assert cancelled == 1
result = fut.result(timeout=2.0)
# clear_session sets response="" then the wait returns it
assert result == ""
def test_has_pending(self):
from tools import clarify_gateway as cm
cm.register("id8", "sk8", "Q?", ["A"])
assert cm.has_pending("sk8") is True
assert cm.has_pending("nonexistent") is False
def test_notify_register_unregister_clears_pending(self):
"""unregister_notify cancels any pending clarify so threads unwind."""
from tools import clarify_gateway as cm
cm.register("id9", "sk9", "Q?", ["A"])
def waiter():
return cm.wait_for_response("id9", timeout=10.0)
with ThreadPoolExecutor(1) as pool:
fut = pool.submit(waiter)
time.sleep(0.05)
cm.register_notify("sk9", lambda entry: None)
cm.unregister_notify("sk9")
# unregister_notify calls clear_session; thread unwinds
result = fut.result(timeout=2.0)
assert result == ""
def test_session_index_isolation(self):
"""Entries from different sessions don't leak across get_pending lookups."""
from tools import clarify_gateway as cm
cm.register("idA", "alpha", "Q?", None) # auto-await text
cm.register("idB", "beta", "Q?", None) # auto-await text
a = cm.get_pending_for_session("alpha")
b = cm.get_pending_for_session("beta")
assert a is not None and a.clarify_id == "idA"
assert b is not None and b.clarify_id == "idB"
def test_clarify_timeout_config_default(self):
"""get_clarify_timeout returns 600 by default."""
from tools import clarify_gateway as cm
timeout = cm.get_clarify_timeout()
# Default 600s OR whatever is in the user's loaded config.
# Floor check: must be a positive int, not crashed.
assert isinstance(timeout, int)
assert timeout > 0
class TestGatewayTextIntercept:
"""The gateway's _handle_message intercepts text replies to pending clarifies."""
def setup_method(self):
_clear_clarify_state()
def test_get_pending_for_session_returns_oldest_text_awaiting(self):
"""When two clarifies are pending, get_pending_for_session returns the
first that is awaiting_text (the older one if both)."""
from tools import clarify_gateway as cm
# Older multi-choice (not awaiting text)
cm.register("first", "sk", "Q1?", ["A"])
# Newer open-ended (awaiting text)
cm.register("second", "sk", "Q2?", None)
pending = cm.get_pending_for_session("sk")
# The newer one is awaiting text; the older isn't.
assert pending is not None
assert pending.clarify_id == "second"
# Now flip the first to text mode too. Both are awaiting text,
# FIFO returns the older one.
cm.mark_awaiting_text("first")
pending2 = cm.get_pending_for_session("sk")
assert pending2 is not None
assert pending2.clarify_id == "first"
+4 -4
View File
@@ -91,7 +91,7 @@ def make_env(daytona_sdk, monkeypatch):
if list_return is not None:
mock_client.list.return_value = list_return
else:
mock_client.list.return_value = iter([])
mock_client.list.return_value = SimpleNamespace(items=[])
daytona_sdk.Daytona = MagicMock(return_value=mock_client)
@@ -156,13 +156,13 @@ class TestPersistence:
legacy.process.exec.return_value = _make_exec_response(result="/root")
env = make_env(
get_side_effect=daytona_sdk.DaytonaError("not found"),
list_return=iter([legacy]),
list_return=SimpleNamespace(items=[legacy]),
persistent=True,
task_id="mytask",
)
legacy.start.assert_called_once()
env._mock_client.list.assert_called_once_with(
labels={"hermes_task_id": "mytask"}, limit=1)
labels={"hermes_task_id": "mytask"}, page=1, limit=1)
env._mock_client.create.assert_not_called()
def test_persistent_creates_new_when_none_found(self, make_env, daytona_sdk):
@@ -176,7 +176,7 @@ class TestPersistence:
# by checking get() was called with the right sandbox name
env._mock_client.get.assert_called_with("hermes-mytask")
env._mock_client.list.assert_called_with(
labels={"hermes_task_id": "mytask"}, limit=1)
labels={"hermes_task_id": "mytask"}, page=1, limit=1)
def test_non_persistent_skips_lookup(self, make_env):
env = make_env(persistent=False)
+2 -8
View File
@@ -157,14 +157,8 @@ class TestHandleVisionAnalyzeFastPath:
from agent.auxiliary_client import set_runtime_main, clear_runtime_main
set_runtime_main("openrouter", "anthropic/claude-opus-4.6")
try:
# Mock decide_image_input_mode to always return "native" so the
# fast path fires regardless of model-catalog state in CI.
with patch(
"agent.image_routing.decide_image_input_mode",
return_value="native",
):
coro = _handle_vision_analyze({"image_url": str(img), "question": "?"})
result = asyncio.get_event_loop().run_until_complete(coro)
coro = _handle_vision_analyze({"image_url": str(img), "question": "?"})
result = asyncio.get_event_loop().run_until_complete(coro)
finally:
clear_runtime_main()
+9 -105
View File
@@ -98,16 +98,6 @@ def get_vnc_url() -> Optional[str]:
return _vnc_url
def _get_camofox_config() -> Dict[str, Any]:
"""Return the ``browser.camofox`` config block, or an empty dict."""
try:
camofox_cfg = load_config().get("browser", {}).get("camofox", {})
except Exception as exc:
logger.warning("camofox config check failed, defaulting to disabled: %s", exc)
return {}
return camofox_cfg if isinstance(camofox_cfg, dict) else {}
def _managed_persistence_enabled() -> bool:
"""Return whether Hermes-managed persistence is enabled for Camofox.
@@ -117,46 +107,12 @@ def _managed_persistence_enabled() -> bool:
Controlled by ``browser.camofox.managed_persistence`` in config.yaml.
"""
return bool(_get_camofox_config().get("managed_persistence"))
def _camofox_identity_override(task_id: Optional[str], camofox_cfg: Dict[str, Any]) -> Optional[Dict[str, str]]:
"""Return an externally configured Camofox identity, if one is set.
Integrations that own the visible Camofox browser can set a shared user ID
so Hermes operates in the same browser profile instead of creating a
separate private session.
"""
user_id = os.getenv("CAMOFOX_USER_ID", "").strip() or str(camofox_cfg.get("user_id") or "").strip()
if not user_id:
return None
session_key = (
os.getenv("CAMOFOX_SESSION_KEY", "").strip()
or str(camofox_cfg.get("session_key") or "").strip()
or f"task_{(task_id or 'default')[:16]}"
)
return {"user_id": user_id, "session_key": session_key}
def _env_flag(name: str) -> Optional[bool]:
raw = os.getenv(name, "").strip().lower()
if not raw:
return None
if raw in {"1", "true", "yes", "on"}:
return True
if raw in {"0", "false", "no", "off"}:
try:
camofox_cfg = load_config().get("browser", {}).get("camofox", {})
except Exception as exc:
logger.warning("managed_persistence check failed, defaulting to disabled: %s", exc)
return False
logger.debug("Ignoring invalid boolean env %s=%r", name, raw)
return None
def _adopt_existing_tab_enabled(camofox_cfg: Dict[str, Any]) -> bool:
"""Return whether Hermes should recover an existing Camofox tab ID."""
env_value = _env_flag("CAMOFOX_ADOPT_EXISTING_TAB")
if env_value is not None:
return env_value
return bool(camofox_cfg.get("adopt_existing_tab"))
return bool(camofox_cfg.get("managed_persistence"))
# ---------------------------------------------------------------------------
@@ -167,44 +123,6 @@ _sessions: Dict[str, Dict[str, Any]] = {}
_sessions_lock = threading.Lock()
def _adopt_existing_tab(session: Dict[str, Any]) -> Dict[str, Any]:
"""Attach process-local state to an already-open managed Camofox tab.
Some integrations own the visible Camofox tab outside Hermes. Gateway
restarts can leave this module's in-memory session cache empty even though
Camofox still has that tab, so rehydrate tab_id before creating a new tab.
"""
if session.get("tab_id") or not session.get("adopt_existing_tab"):
return session
if not get_camofox_url():
return session
try:
tabs = _get("/tabs", params={"userId": session["user_id"]}, timeout=5).get("tabs", [])
except Exception as exc:
logger.debug("Camofox tab adoption failed for %s: %s", session.get("user_id"), exc)
return session
if not isinstance(tabs, list) or not tabs:
return session
session_key = session.get("session_key")
matching_tabs = [
tab
for tab in tabs
if isinstance(tab, dict) and tab.get("listItemId") == session_key
]
candidates = matching_tabs or [tab for tab in tabs if isinstance(tab, dict)]
latest = candidates[-1] if candidates else None
tab_id = latest.get("tabId") if isinstance(latest, dict) else None
if isinstance(tab_id, str) and tab_id:
session["tab_id"] = tab_id
logger.debug("Adopted existing Camofox tab %s for %s", tab_id, session.get("user_id"))
return session
def _get_session(task_id: Optional[str]) -> Dict[str, Any]:
"""Get or create a camofox session for the given task.
@@ -215,26 +133,14 @@ def _get_session(task_id: Optional[str]) -> Dict[str, Any]:
task_id = task_id or "default"
with _sessions_lock:
if task_id in _sessions:
return _adopt_existing_tab(_sessions[task_id])
camofox_cfg = _get_camofox_config()
identity_override = _camofox_identity_override(task_id, camofox_cfg)
if identity_override:
session = {
"user_id": identity_override["user_id"],
"tab_id": None,
"session_key": identity_override["session_key"],
"managed": True,
"adopt_existing_tab": _adopt_existing_tab_enabled(camofox_cfg),
}
elif bool(camofox_cfg.get("managed_persistence")):
return _sessions[task_id]
if _managed_persistence_enabled():
identity = get_camofox_identity(task_id)
session = {
"user_id": identity["user_id"],
"tab_id": None,
"session_key": identity["session_key"],
"managed": True,
"adopt_existing_tab": _adopt_existing_tab_enabled(camofox_cfg),
}
else:
session = {
@@ -242,10 +148,9 @@ def _get_session(task_id: Optional[str]) -> Dict[str, Any]:
"tab_id": None,
"session_key": f"task_{task_id[:16]}",
"managed": False,
"adopt_existing_tab": False,
}
_sessions[task_id] = session
return _adopt_existing_tab(session)
return session
def _ensure_tab(task_id: Optional[str], url: str = "about:blank") -> Dict[str, Any]:
@@ -285,8 +190,7 @@ def camofox_soft_cleanup(task_id: Optional[str] = None) -> bool:
does nothing and returns ``False`` so the caller can fall back to
:func:`camofox_close`.
"""
camofox_cfg = _get_camofox_config()
if bool(camofox_cfg.get("managed_persistence")) or _camofox_identity_override(task_id, camofox_cfg):
if _managed_persistence_enabled():
_drop_session(task_id)
logger.debug("Camofox soft cleanup for task %s (managed persistence)", task_id)
return True
-278
View File
@@ -1,278 +0,0 @@
"""Gateway-side clarify primitive (blocking event-based queue).
The ``clarify`` tool needs to ask the user a question and block the agent
thread until they respond. In CLI mode this is trivial ``input()`` is
synchronous. In gateway mode the agent runs on a worker thread while the
event loop handles the user's reply, so we need a thread-safe primitive
that:
* stores a pending clarify request (with a generated ``clarify_id``),
* blocks the agent thread on an ``Event``,
* resolves the wait when the gateway's button-callback or text-intercept
fires ``resolve_gateway_clarify(clarify_id, response)``,
* supports timeouts so a user who never responds does NOT hang the agent
thread forever (which would also pin the gateway's running-agent guard).
State is module-level (same shape as ``tools.approval``) so platform
adapters can call ``resolve_gateway_clarify`` without holding a back-
reference to the ``GatewayRunner`` instance.
Two delivery paths from the adapter:
1. **Button UI** adapters override ``send_clarify`` to render inline
buttons (e.g. Telegram ``InlineKeyboardMarkup``). The button
callback resolves with the chosen string. A final "Other (type
answer)" button enters text-capture mode for free-form responses.
2. **Text fallback** adapters without rich UI render a numbered list.
The user replies with a number ("2") or with free text; the gateway's
``_handle_message`` intercepts the reply and resolves directly.
"""
from __future__ import annotations
import logging
import threading
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Module-level state
# =========================================================================
@dataclass
class _ClarifyEntry:
"""One pending clarify request inside a gateway session."""
clarify_id: str
session_key: str
question: str
choices: Optional[List[str]]
event: threading.Event = field(default_factory=threading.Event)
response: Optional[str] = None
awaiting_text: bool = False # set when user picked "Other" or clarify is open-ended
def signature(self) -> Dict[str, object]:
return {
"clarify_id": self.clarify_id,
"session_key": self.session_key,
"question": self.question,
"choices": list(self.choices) if self.choices else None,
}
_lock = threading.RLock()
# clarify_id → _ClarifyEntry (primary lookup for button callbacks)
_entries: Dict[str, _ClarifyEntry] = {}
# session_key → list[clarify_id] (FIFO; for text-fallback intercept and session cleanup)
_session_index: Dict[str, List[str]] = {}
# =========================================================================
# Public API — agent-thread side
# =========================================================================
def register(
clarify_id: str,
session_key: str,
question: str,
choices: Optional[List[str]],
) -> _ClarifyEntry:
"""Register a pending clarify request and return the entry.
The caller (gateway clarify_callback) will then send the prompt to the
user and block on ``wait_for_response(clarify_id, timeout)``.
"""
entry = _ClarifyEntry(
clarify_id=clarify_id,
session_key=session_key,
question=question,
choices=list(choices) if choices else None,
# Open-ended (no choices) → next message IS the response, no buttons needed.
awaiting_text=not bool(choices),
)
with _lock:
_entries[clarify_id] = entry
_session_index.setdefault(session_key, []).append(clarify_id)
return entry
def wait_for_response(clarify_id: str, timeout: float) -> Optional[str]:
"""Block on the entry's event until resolved or timeout fires.
Polls in 1-second slices so the agent's inactivity heartbeat keeps
firing without this, ``Event.wait(timeout=600)`` blocks the thread
for 10 minutes with zero activity touches and the gateway's inactivity
watchdog kills the agent while the user is still typing.
Returns the resolved response string, or ``None`` on timeout.
"""
with _lock:
entry = _entries.get(clarify_id)
if entry is None:
return None
try:
from tools.environments.base import touch_activity_if_due
except Exception: # pragma: no cover - optional
touch_activity_if_due = None
deadline = time.monotonic() + max(timeout, 0.0)
activity_state = {"last_touch": time.monotonic(), "start": time.monotonic()}
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
break
if entry.event.wait(timeout=min(1.0, remaining)):
break
if touch_activity_if_due is not None:
touch_activity_if_due(activity_state, "waiting for user clarify response")
with _lock:
# Remove from indices regardless of resolution outcome.
_entries.pop(clarify_id, None)
ids = _session_index.get(entry.session_key)
if ids and clarify_id in ids:
ids.remove(clarify_id)
if not ids:
_session_index.pop(entry.session_key, None)
return entry.response
# =========================================================================
# Public API — gateway / adapter side
# =========================================================================
def resolve_gateway_clarify(clarify_id: str, response: str) -> bool:
"""Unblock the agent thread waiting on ``clarify_id``.
Returns True if an entry was found and resolved, False otherwise
(already resolved, expired, or never existed).
"""
with _lock:
entry = _entries.get(clarify_id)
if entry is None:
return False
entry.response = str(response) if response is not None else ""
entry.event.set()
return True
def get_pending_for_session(session_key: str) -> Optional[_ClarifyEntry]:
"""Return the OLDEST pending clarify entry for a session, or None.
Used by the text-fallback intercept in ``_handle_message`` when a
clarify is awaiting a free-form text response, the next user message
in that session is captured as the answer.
"""
with _lock:
ids = _session_index.get(session_key) or []
for cid in ids:
entry = _entries.get(cid)
if entry is None:
continue
if entry.awaiting_text:
return entry
return None
def mark_awaiting_text(clarify_id: str) -> bool:
"""Flip an entry into text-capture mode (user picked the 'Other' button).
Returns True if the entry exists and was flipped, False otherwise.
"""
with _lock:
entry = _entries.get(clarify_id)
if entry is None:
return False
entry.awaiting_text = True
return True
def has_pending(session_key: str) -> bool:
"""Return True when this session has at least one pending clarify entry."""
with _lock:
ids = _session_index.get(session_key) or []
return any(_entries.get(cid) is not None for cid in ids)
def clear_session(session_key: str) -> int:
"""Resolve and drop every pending clarify for a session.
Used by session-boundary cleanup (e.g. ``/new``, gateway shutdown,
cached-agent eviction) so blocked agent threads don't hang past the
end of their session. Returns the number of entries cancelled.
"""
with _lock:
ids = list(_session_index.pop(session_key, []) or [])
entries = [_entries.pop(cid, None) for cid in ids]
cancelled = 0
for entry in entries:
if entry is None:
continue
# Empty string sentinel — agent code can distinguish from a real
# response by inspecting the wait_for_response return value
# alongside its own timeout deadline. Most callers just treat any
# falsy result as "user did not respond".
entry.response = ""
entry.event.set()
cancelled += 1
return cancelled
# =========================================================================
# Config
# =========================================================================
def get_clarify_timeout() -> int:
"""Read the clarify response timeout (seconds) from config.
Defaults to 600 (10 minutes) long enough for the user to type a
thoughtful response, short enough that an abandoned prompt eventually
unblocks the agent thread instead of pinning the running-agent guard
forever.
Reads ``agent.clarify_timeout`` from config.yaml.
"""
try:
from hermes_cli.config import load_config
cfg = load_config() or {}
agent_cfg = cfg.get("agent", {}) or {}
return int(agent_cfg.get("clarify_timeout", 600))
except Exception:
return 600
# =========================================================================
# Per-session notify hook (gateway → adapter bridge)
# =========================================================================
# Mirrors tools.approval's _gateway_notify_cbs: the gateway registers a
# per-session callback that sends the clarify prompt to the user. The
# callback bridges sync→async (runs on the agent thread; schedules the
# adapter ``send_clarify`` call on the event loop).
_notify_cbs: Dict[str, Callable[[_ClarifyEntry], None]] = {}
def register_notify(session_key: str, cb: Callable[[_ClarifyEntry], None]) -> None:
"""Register a per-session notify callback used by ``clarify_callback``."""
with _lock:
_notify_cbs[session_key] = cb
def unregister_notify(session_key: str) -> None:
"""Drop the per-session notify callback and cancel any pending clarify entries."""
with _lock:
_notify_cbs.pop(session_key, None)
# Cancel any pending entries so blocked threads unwind when the run
# ends (interrupt, completion, gateway shutdown).
clear_session(session_key)
def get_notify(session_key: str) -> Optional[Callable[[_ClarifyEntry], None]]:
with _lock:
return _notify_cbs.get(session_key)
+3 -7
View File
@@ -101,13 +101,9 @@ class DaytonaEnvironment(BaseEnvironment):
if self._sandbox is None:
try:
# Daytona SDK >=0.108.0 uses cursor-based pagination and
# list() returns an iterator. Offset-based pagination
# (page=1) is removed on June 10, 2026.
results = self._daytona.list(labels=labels, limit=1)
legacy = next(iter(results), None)
if legacy is not None:
self._sandbox = legacy
page = self._daytona.list(labels=labels, page=1, limit=1)
if page.items:
self._sandbox = page.items[0]
self._sandbox.start()
logger.info("Daytona: resumed legacy sandbox %s for task %s",
self._sandbox.id, task_id)
+17 -209
View File
@@ -120,13 +120,6 @@ class WriteResult:
bytes_written: int = 0
dirs_created: bool = False
lint: Optional[Dict[str, Any]] = None
# Semantic diagnostics from the LSP layer, when applicable. Kept in
# its own field (not folded into ``lint``) so the model and any
# downstream parsers can read syntax errors and semantic errors as
# separate signals. ``None`` when LSP is disabled, when the file
# isn't in a git workspace, or when no diagnostics were introduced
# by this edit.
lsp_diagnostics: Optional[str] = None
error: Optional[str] = None
warning: Optional[str] = None
@@ -143,8 +136,6 @@ class PatchResult:
files_created: List[str] = field(default_factory=list)
files_deleted: List[str] = field(default_factory=list)
lint: Optional[Dict[str, Any]] = None
# See :class:`WriteResult.lsp_diagnostics`.
lsp_diagnostics: Optional[str] = None
error: Optional[str] = None
def to_dict(self) -> dict:
@@ -159,8 +150,6 @@ class PatchResult:
result["files_deleted"] = self.files_deleted
if self.lint:
result["lint"] = self.lint
if self.lsp_diagnostics:
result["lsp_diagnostics"] = self.lsp_diagnostics
if self.error:
result["error"] = self.error
return result
@@ -327,55 +316,6 @@ LINTERS = {
}
# Patterns that indicate the linter base command exists on PATH but
# couldn't actually run — e.g. ``npx tsc`` when tsc isn't installed in
# node_modules, or rustfmt complaining there's no Cargo project. When
# any of these substrings appears in the linter output, ``_check_lint``
# returns ``skipped`` instead of ``error`` so:
#
# 1. The write isn't flagged for a tooling problem the agent can't fix.
# 2. The LSP semantic tier still runs (it gates on success/skipped).
#
# Patterns are matched case-insensitively against linter stdout.
_LINTER_UNUSABLE_PATTERNS = {
'npx': (
# npx prints this banner when the package isn't installed locally
# AND it can't auto-install (no internet, registry off, etc.) or
# when the binary it tried to run is the wrong one.
'this is not the tsc command you are looking for',
# npx with --no-install resolution failures
'could not determine executable to run',
'not found in npm registry',
),
'rustfmt': (
# rustfmt outside a Cargo project
'no input filename given',
'error: not a workspace',
),
'go': (
# ``go vet`` on a file outside a module / GOPATH
'cannot find package',
'go: cannot find main module',
),
}
def _looks_like_linter_unusable(base_cmd: str, output: str) -> bool:
"""Return True iff ``output`` from ``base_cmd`` indicates the linter
itself couldn't run (a tooling gap), as opposed to a real lint error
in the file being checked.
``base_cmd`` is the first word of the linter command line (``npx``,
``rustfmt``, ``go``, ...). ``output`` is the stdout/stderr captured
from running it.
"""
patterns = _LINTER_UNUSABLE_PATTERNS.get(base_cmd)
if not patterns:
return False
lower = output.lower()
return any(p in lower for p in patterns)
def _lint_json_inproc(content: str) -> tuple[bool, str]:
"""In-process JSON syntax check. Returns (ok, error_message)."""
import json as _json
@@ -927,13 +867,6 @@ class ShellFileOperations(FileOperations):
if read_result.exit_code == 0 and read_result.stdout:
pre_content = read_result.stdout
# Snapshot LSP diagnostics for this file (best-effort) so the
# post-write LSP layer can return only diagnostics introduced
# by this specific edit. Mirrors claude-code's
# ``beforeFileEdited`` pattern but wired to the local LSP
# rather than an external IDE.
self._snapshot_lsp_baseline(path)
# Create parent directories
parent = os.path.dirname(path)
dirs_created = False
@@ -964,21 +897,10 @@ class ShellFileOperations(FileOperations):
# Post-write lint with delta refinement.
lint_result = self._check_lint_delta(path, pre_content=pre_content, post_content=content)
# Semantic diagnostics from the LSP layer — separate channel.
# Only fired when the syntax tier reported clean (no point asking
# an LSP for a file that won't even parse). Best-effort:
# ``""`` is returned for any failure path.
lsp_diagnostics: Optional[str] = None
if lint_result.success or lint_result.skipped:
block = self._maybe_lsp_diagnostics(path)
if block:
lsp_diagnostics = block
return WriteResult(
bytes_written=bytes_written,
dirs_created=dirs_created,
lint=lint_result.to_dict() if lint_result else None,
lsp_diagnostics=lsp_diagnostics,
)
# =========================================================================
@@ -1074,14 +996,7 @@ class ShellFileOperations(FileOperations):
success=True,
diff=diff,
files_modified=[path],
lint=lint_result.to_dict() if lint_result else None,
# Propagate the LSP diagnostics already captured by the
# internal ``write_file`` call. Its baseline was the
# pre-patch content (taken at the start of write_file via
# ``_snapshot_lsp_baseline``) so the delta is correct for
# the patch as a whole. Keep the field separate from the
# syntax-check ``lint`` so the agent can read both signals.
lsp_diagnostics=write_result.lsp_diagnostics,
lint=lint_result.to_dict() if lint_result else None
)
def patch_v4a(self, patch_content: str) -> PatchResult:
@@ -1166,24 +1081,6 @@ class ShellFileOperations(FileOperations):
cmd = linter_cmd.replace("{file}", self._escape_shell_arg(path))
result = self._exec(cmd, timeout=30)
if result.exit_code != 0 and _looks_like_linter_unusable(base_cmd, result.stdout):
# The linter command exists on PATH but couldn't actually run
# (e.g. ``npx tsc`` when tsc isn't in node_modules; ``rustfmt
# --check`` without a Cargo project). This is a tooling gap,
# not a real lint failure — surface it as ``skipped`` so the
# write doesn't get flagged AND so the LSP tier still runs.
from tools.ansi_strip import strip_ansi
cleaned = strip_ansi(result.stdout).strip()
# Collapse to a single line — the npx banner is multi-line ASCII.
first_line = next(
(ln.strip() for ln in cleaned.splitlines() if ln.strip()),
cleaned[:120],
)
return LintResult(
skipped=True,
message=f"{base_cmd} not usable: {first_line[:200]}",
)
return LintResult(
success=result.exit_code == 0,
output=result.stdout.strip() if result.stdout.strip() else ""
@@ -1192,25 +1089,21 @@ class ShellFileOperations(FileOperations):
def _check_lint_delta(self, path: str, pre_content: Optional[str],
post_content: Optional[str] = None) -> LintResult:
"""
Run post-write syntax lint with pre-write baseline comparison.
Run post-write lint with pre-write baseline comparison.
Two-tier strategy:
Strategy (post-first, pre-lazy):
1. Lint the post-write state. If clean return clean immediately.
This is the hot path and matches _check_lint() in cost.
2. If post-lint found errors AND we have pre-write content, lint
that too. If the pre-write file was already broken, return only
the *new* errors introduced by this edit errors that existed
before aren't the agent's problem to chase right now.
3. If pre_content is None (new file or unavailable), skip the delta
step and return all post-write errors.
1. **Syntax check** (in-process or shell-based, microseconds).
Catches the bug class that motivated this layer: corrupt
writes, mashed quotes, truncated output. Hot path.
2. **Delta refinement against pre-write content** when the
syntax tier reports errors. Filter out errors that already
existed pre-edit so the agent isn't distracted by inherited
state.
Semantic diagnostics from the LSP layer are fetched separately
via :meth:`_maybe_lsp_diagnostics` and surfaced in the
``lsp_diagnostics`` field on :class:`WriteResult` /
:class:`PatchResult`. Keeping the two channels separate lets
the agent (and any downstream parsers) read syntax errors and
semantic errors as independent signals.
This mirrors Cline's and OpenCode's post-edit LSP pattern: surface
only the errors this specific edit introduced, so the agent doesn't
get distracted by pre-existing problems.
Args:
path: File path (for linter selection).
@@ -1229,12 +1122,12 @@ class ShellFileOperations(FileOperations):
"""
post = self._check_lint(path, content=post_content)
# Hot path: clean post-write syntactically.
# Hot path: clean post-write, no pre-lint needed.
if post.success or post.skipped:
return post
# Post-write has syntax errors. If we have pre-content, run the
# delta refinement to filter out pre-existing errors.
# Post-write has errors. If we have pre-content, run the delta
# refinement to filter out pre-existing errors.
if pre_content is None:
return post
@@ -1273,91 +1166,6 @@ class ShellFileOperations(FileOperations):
"(pre-existing errors filtered out):\n" + "\n".join(post_lines)
)
)
def _lsp_local_only(self) -> bool:
"""Return True iff this FileOperations is wired to a local backend.
LSP servers run on the host process they need access to the
files they're linting. Remote/sandboxed backends (Docker,
Modal, SSH, Daytona) keep files inside the sandbox where the
host-side LSP server can't reach them, so we skip the LSP
path for those entirely.
"""
env = getattr(self, "env", None)
if env is None:
# Defensive: some tests construct ShellFileOperations via
# ``__new__`` without going through ``__init__``, so
# ``self.env`` may be missing. No env = no LSP path.
return False
try:
from tools.environments.local import LocalEnvironment
except Exception: # noqa: BLE001
return False
return isinstance(env, LocalEnvironment)
def _snapshot_lsp_baseline(self, path: str) -> None:
"""Capture pre-edit LSP diagnostics so the post-write delta is correct.
Best-effort. Silent on every failure path LSP is an
enrichment layer and must never break a write.
Skipped entirely on non-local backends (Docker, Modal, SSH,
etc.) the server can't see files inside the sandbox.
"""
if not self._lsp_local_only():
return
try:
from agent.lsp import get_service
svc = get_service()
except Exception: # noqa: BLE001
return
if svc is None:
return
try:
svc.snapshot_baseline(path)
except Exception: # noqa: BLE001
pass
def _maybe_lsp_diagnostics(self, path: str) -> str:
"""Best-effort LSP semantic diagnostics for ``path``.
Returns a formatted ``<diagnostics>`` block, or empty string
when LSP is unavailable / disabled / produced no errors.
Wraps everything in a try/except so a misbehaving LSP server
can't break a write. This intentionally swallows all errors
the calling tier already returned a clean syntax result, so
``""`` here just means "no extra info to add".
Skipped entirely on non-local backends (Docker, Modal, SSH,
etc.) same reasoning as ``_snapshot_lsp_baseline``.
"""
if not self._lsp_local_only():
return ""
try:
from agent.lsp import get_service
except Exception: # noqa: BLE001
return ""
try:
svc = get_service()
except Exception: # noqa: BLE001
return ""
if svc is None or not svc.enabled_for(path):
return ""
try:
diagnostics = svc.get_diagnostics_sync(path, delta=True)
except Exception: # noqa: BLE001
return ""
if not diagnostics:
return ""
try:
from agent.lsp.reporter import report_for_file, truncate
block = report_for_file(path, diagnostics)
if not block:
return ""
return truncate("LSP diagnostics introduced by this edit:\n" + block)
except Exception: # noqa: BLE001
return ""
# =========================================================================
# SEARCH Implementation
+1 -1
View File
@@ -585,7 +585,7 @@ class ProcessRegistry:
try:
if not _IS_WINDOWS:
try:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL) # windows-footgun: ok — guarded by _IS_WINDOWS check above
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except (ProcessLookupError, PermissionError, OSError):
proc.kill()
else:
-3
View File
@@ -355,9 +355,6 @@ def _parse_target_ref(platform_name: str, target_ref: str):
# Matrix room IDs (start with !) and user IDs (start with @) are explicit
if platform_name == "matrix" and (target_ref.startswith("!") or target_ref.startswith("@")):
return target_ref, None, True
# XMPP JIDs (user@server or room@conference.server) are explicit
if platform_name == "xmpp" and "@" in target_ref:
return target_ref, None, True
return None, None, False
+1 -3
View File
@@ -130,9 +130,7 @@ def detect_audio_environment() -> dict:
try:
devices = sd.query_devices()
if not devices:
if os.environ.get('PULSE_SERVER'):
notices.append("No PortAudio devices detected but PULSE_SERVER is set -- continuing")
elif termux_capture:
if termux_capture:
notices.append("No PortAudio devices detected, but Termux:API microphone capture is available")
else:
warnings.append("No audio input/output devices detected")
+1 -3
View File
@@ -429,9 +429,7 @@ def _tavily_request(endpoint: str, payload: dict) -> dict:
payload["api_key"] = api_key
url = f"{_TAVILY_BASE_URL}/{endpoint.lstrip('/')}"
logger.info("Tavily %s request to %s", endpoint, url)
# Tavily /crawl requires Bearer auth in header (body-only auth returns 401)
headers = {"Authorization": f"Bearer {api_key}"} if endpoint.strip("/") == "crawl" else {}
response = httpx.post(url, json=payload, headers=headers, timeout=60)
response = httpx.post(url, json=payload, timeout=60)
response.raise_for_status()
return response.json()

Some files were not shown because too many files have changed in this diff Show More