Compare commits

..

8 Commits

Author SHA1 Message Date
rob-maron f16808fac1 Merge remote-tracking branch 'origin/main' into switch-managed-browser-to-browser-use 2026-04-07 08:12:45 -04:00
Ben 4d65666527 fix(browser-use): port missing improvements from PR #5605
- CDP URL normalization: resolve HTTP discovery URLs to websocket after
  cloud provider create_session() (prevents agent-browser failures)
- Managed session payload: send timeout=5 and proxyCountryCode=us for
  gateway-backed sessions (prevents billing overruns)
- Update prompt builder, browser_close schema, and module docstring to
  replace remaining Browserbase references with Browser Use
- Dynamic /browser status detection via _get_cloud_provider() instead
  of hardcoded env var checks (future-proof for new providers)
- Rename post_setup key from 'browserbase' to 'agent_browser'
- Update setup hint to mention Browser Use alongside Browserbase
- Add tests: CDP normalization, browserbase direct-only guard,
  managed browser-use gateway, direct browserbase fallback
2026-04-07 22:00:15 +10:00
Ben 9d431b23e2 fix(nous_subscription): browserbase explicit provider is direct-only
Since managed Nous gateway now routes through Browser Use, the
browserbase explicit provider path should not check managed_browser_available
(which resolves against the browser-use gateway). Simplified to direct-only
with managed=False.
2026-04-07 20:43:14 +10:00
Ben 04aa3ac44f fix(browser-use): use X-Browser-Use-API-Key header for managed mode
The managed gateway expects X-Browser-Use-API-Key, not X-BB-API-Key
(which is a Browserbase-specific header). Using the wrong header caused
a 401 AUTH_ERROR on every managed-mode browser session create.

Simplified _headers() to always use X-Browser-Use-API-Key regardless
of direct vs managed mode.
2026-04-07 18:27:14 +10:00
Ben 3718a8de7c Merge branch 'main' into switch-managed-browser-to-browser-use 2026-04-07 18:00:13 +10:00
Ben 7c33338a7a fix: upgrade Browser Use provider to v3 API
- Base URL: api/v2 -> api/v3 (v2 is legacy)
- Unified all endpoints to use native Browser Use paths:
  - POST /browsers (create session, returns cdpUrl)
  - PATCH /browsers/{id} with {action: stop} (close session)
- Removed managed-mode branching that used Browserbase-style
  /v1/sessions paths — v3 gateway now supports /browsers directly
- Removed unused managed_mode variable in close_session
2026-04-07 16:41:32 +10:00
Ben 3e3a1e7624 chore: remove redundant Browser Use hint from system prompt 2026-04-07 16:14:12 +10:00
Ben 6fb7ea1e39 feat: switch managed browser provider from Browserbase to Browser Use
The Nous subscription tool gateway now routes browser automation through
Browser Use instead of Browserbase. This commit:

- Adds managed Nous gateway support to BrowserUseProvider (idempotency
  keys, X-BB-API-Key auth header, external_call_id persistence)
- Removes managed gateway support from BrowserbaseProvider (now
  direct-only via BROWSERBASE_API_KEY/BROWSERBASE_PROJECT_ID)
- Updates browser_tool.py fallback: prefers Browser Use over Browserbase
- Updates nous_subscription.py: gateway vendor 'browser-use', auto-config
  sets cloud_provider='browser-use' for new subscribers
- Updates tools_config.py: Nous Subscription entry now uses Browser Use
- Updates setup.py, cli.py, status.py, prompt_builder.py display strings
- Updates all affected tests to match new behavior

Browserbase remains fully functional for users with direct API credentials.
The change only affects the managed/subscription path.
2026-04-07 16:09:24 +10:00
267 changed files with 1265 additions and 3774 deletions
-3
View File
@@ -19,9 +19,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Install system dependencies
run: sudo apt-get update && sudo apt-get install -y ripgrep
- name: Install uv
uses: astral-sh/setup-uv@v5
+1
View File
@@ -15,6 +15,7 @@ Usage::
import asyncio
import logging
import os
import sys
from pathlib import Path
from hermes_constants import get_hermes_home
+2
View File
@@ -262,6 +262,8 @@ class SessionManager:
if self._db_instance is not None:
return self._db_instance
try:
import os
from pathlib import Path
from hermes_state import SessionDB
hermes_home = get_hermes_home()
self._db_instance = SessionDB(db_path=hermes_home / "state.db")
+88 -2
View File
@@ -188,7 +188,9 @@ def _requires_bearer_auth(base_url: str | None) -> bool:
if not base_url:
return False
normalized = base_url.rstrip("/").lower()
return normalized.startswith(("https://api.minimax.io/anthropic", "https://api.minimaxi.com/anthropic"))
return normalized.startswith("https://api.minimax.io/anthropic") or normalized.startswith(
"https://api.minimaxi.com/anthropic"
)
def build_anthropic_client(api_key: str, base_url: str = None):
@@ -706,6 +708,29 @@ def run_hermes_oauth_login_pure() -> Optional[Dict[str, Any]]:
}
def run_hermes_oauth_login() -> Optional[str]:
    """Run Hermes-native OAuth PKCE flow for Claude Pro/Max subscription.

    Opens a browser to claude.ai for authorization, prompts for the code,
    exchanges it for tokens, and stores them in ~/.hermes/.anthropic_oauth.json.

    Returns:
        The access token on success, None on failure.
    """
    result = run_hermes_oauth_login_pure()
    if not result:
        return None

    access_token = result["access_token"]
    refresh_token = result["refresh_token"]
    expires_at_ms = result["expires_at_ms"]

    # The same token triple is handed to both credential writers.
    _save_hermes_oauth_credentials(access_token, refresh_token, expires_at_ms)
    _write_claude_code_credentials(access_token, refresh_token, expires_at_ms)

    print("Authentication successful!")
    return access_token
def _save_hermes_oauth_credentials(access_token: str, refresh_token: str, expires_at_ms: int) -> None:
"""Save OAuth credentials to ~/.hermes/.anthropic_oauth.json."""
data = {
@@ -733,6 +758,38 @@ def read_hermes_oauth_credentials() -> Optional[Dict[str, Any]]:
return None
def refresh_hermes_oauth_token() -> Optional[str]:
    """Refresh the Hermes-managed OAuth token using the stored refresh token.

    Returns:
        The new access token, or None if no refresh token is stored or the
        refresh fails for any reason (the failure is logged at debug level).
    """
    creds = read_hermes_oauth_credentials()
    if not creds or not creds.get("refreshToken"):
        return None

    try:
        refreshed = refresh_anthropic_oauth_pure(
            creds["refreshToken"],
            use_json=True,
        )
        # The refreshed token triple is handed to both credential writers.
        _save_hermes_oauth_credentials(
            refreshed["access_token"],
            refreshed["refresh_token"],
            refreshed["expires_at_ms"],
        )
        _write_claude_code_credentials(
            refreshed["access_token"],
            refreshed["refresh_token"],
            refreshed["expires_at_ms"],
        )
        logger.debug("Successfully refreshed Hermes OAuth token")
        return refreshed["access_token"]
    except Exception as e:
        # Best-effort: callers only see None, so the reason is logged here.
        logger.debug("Failed to refresh Hermes OAuth token: %s", e)
        return None
# ---------------------------------------------------------------------------
# Message / tool / response format conversion
# ---------------------------------------------------------------------------
@@ -790,7 +847,7 @@ def _convert_openai_image_part_to_anthropic(part: Dict[str, Any]) -> Optional[Di
},
}
if url.startswith(("http://", "https://")):
if url.startswith("http://") or url.startswith("https://"):
return {
"type": "image",
"source": {
@@ -802,6 +859,35 @@ def _convert_openai_image_part_to_anthropic(part: Dict[str, Any]) -> Optional[Di
return None
def _convert_user_content_part_to_anthropic(part: Any) -> Optional[Dict[str, Any]]:
if isinstance(part, dict):
ptype = part.get("type")
if ptype == "text":
block = {"type": "text", "text": part.get("text", "")}
if isinstance(part.get("cache_control"), dict):
block["cache_control"] = dict(part["cache_control"])
return block
if ptype == "image_url":
return _convert_openai_image_part_to_anthropic(part)
if ptype == "image" and part.get("source"):
return dict(part)
if ptype == "image" and part.get("data"):
media_type = part.get("mimeType") or part.get("media_type") or "image/png"
return {
"type": "image",
"source": {
"type": "base64",
"media_type": media_type,
"data": part.get("data", ""),
},
}
if ptype == "tool_result":
return dict(part)
elif part is not None:
return {"type": "text", "text": str(part)}
return None
def convert_tools_to_anthropic(tools: List[Dict]) -> List[Dict]:
"""Convert OpenAI tool definitions to Anthropic format."""
if not tools:
+2 -14
View File
@@ -91,7 +91,6 @@ auxiliary_is_nous: bool = False
# Default auxiliary models per provider
_OPENROUTER_MODEL = "google/gemini-3-flash-preview"
_NOUS_MODEL = "google/gemini-3-flash-preview"
_NOUS_FREE_TIER_VISION_MODEL = "xiaomi/mimo-v2-omni"
_NOUS_DEFAULT_BASE_URL = "https://inference-api.nousresearch.com/v1"
_ANTHROPIC_DEFAULT_BASE_URL = "https://api.anthropic.com"
_AUTH_JSON_PATH = get_hermes_home() / "auth.json"
@@ -209,6 +208,7 @@ class _CodexCompletionsAdapter:
def create(self, **kwargs) -> Any:
messages = kwargs.get("messages", [])
model = kwargs.get("model", self._model)
temperature = kwargs.get("temperature")
# Separate system/instructions from conversation messages.
# Convert chat.completions multimodal content blocks to Responses
@@ -720,19 +720,7 @@ def _try_nous() -> Tuple[Optional[OpenAI], Optional[str]]:
global auxiliary_is_nous
auxiliary_is_nous = True
logger.debug("Auxiliary client: Nous Portal")
if nous.get("source") == "pool":
model = "gemini-3-flash"
else:
model = _NOUS_MODEL
# Free-tier users can't use paid auxiliary models — use the free
# multimodal model instead so vision/browser-vision still works.
try:
from hermes_cli.models import check_nous_free_tier
if check_nous_free_tier():
model = _NOUS_FREE_TIER_VISION_MODEL
logger.debug("Free-tier Nous account — using %s for auxiliary/vision", model)
except Exception:
pass
model = "gemini-3-flash" if nous.get("source") == "pool" else _NOUS_MODEL
return (
OpenAI(
api_key=_nous_api_key(nous),
+2 -3
View File
@@ -13,10 +13,9 @@ from __future__ import annotations
import json
import logging
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -93,7 +92,7 @@ class BuiltinMemoryProvider(MemoryProvider):
def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str:
"""Not used — the memory tool is intercepted in run_agent.py."""
return tool_error("Built-in memory tool is handled by the agent loop")
return json.dumps({"error": "Built-in memory tool is handled by the agent loop"})
def shutdown(self) -> None:
"""No cleanup needed — files are saved on every write."""
+3 -2
View File
@@ -343,9 +343,10 @@ def _resolve_path(cwd: Path, target: str, *, allowed_root: Path | None = None) -
def _ensure_reference_path_allowed(path: Path) -> None:
from hermes_constants import get_hermes_home
home = Path(os.path.expanduser("~")).resolve()
hermes_home = get_hermes_home().resolve()
hermes_home = Path(
os.getenv("HERMES_HOME", str(home / ".hermes"))
).expanduser().resolve()
blocked_exact = {home / rel for rel in _SENSITIVE_HOME_FILES}
blocked_exact.add(hermes_home / ".env")
+4 -1
View File
@@ -10,18 +10,21 @@ import uuid
import os
import re
from dataclasses import dataclass, fields, replace
from datetime import datetime
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple
from hermes_constants import OPENROUTER_BASE_URL
import hermes_cli.auth as auth_mod
from hermes_cli.auth import (
ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
PROVIDER_REGISTRY,
_agent_key_is_usable,
_codex_access_token_is_expiring,
_decode_jwt_claims,
_import_codex_cli_tokens,
_is_expiring,
_load_auth_store,
_load_provider_state,
_resolve_zai_base_url,
+18
View File
@@ -986,6 +986,24 @@ def _osc8_link(url: str, text: str) -> str:
return f"\033]8;;{url}\033\\{text}\033]8;;\033\\"
def honcho_session_line(workspace: str, session_name: str) -> str:
    """One-line session indicator: `Honcho session: <clickable name>`.

    The session name is colored and wrapped in an OSC 8 hyperlink pointing at
    the URL returned by ``honcho_session_url``; the label is dimmed.
    """
    url = honcho_session_url(workspace, session_name)
    linked_name = _osc8_link(url, f"{_SKY_BLUE}{session_name}{_ANSI_RESET}")
    return f"{_DIM}Honcho session:{_ANSI_RESET} {linked_name}"
def write_tty(text: str) -> None:
    """Write directly to /dev/tty, bypassing stdout capture.

    Falls back to ``sys.stdout`` (write + flush) when the terminal device
    cannot be opened or written to.
    """
    try:
        fd = os.open("/dev/tty", os.O_WRONLY)
        try:
            os.write(fd, text.encode("utf-8"))
        finally:
            # Close even when the write fails so the descriptor is not leaked.
            os.close(fd)
    except OSError:
        sys.stdout.write(text)
        sys.stdout.flush()
# =========================================================================
# Context pressure display (CLI user-facing warnings)
# =========================================================================
+2 -3
View File
@@ -34,7 +34,6 @@ import re
from typing import Any, Dict, List, Optional
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -250,7 +249,7 @@ class MemoryManager:
"""
provider = self._tool_to_provider.get(tool_name)
if provider is None:
return tool_error(f"No memory provider handles tool '{tool_name}'")
return json.dumps({"error": f"No memory provider handles tool '{tool_name}'"})
try:
return provider.handle_tool_call(tool_name, args, **kwargs)
except Exception as e:
@@ -258,7 +257,7 @@ class MemoryManager:
"Memory provider '%s' handle_tool_call(%s) failed: %s",
provider.name, tool_name, e,
)
return tool_error(f"Memory tool '{tool_name}' failed: {e}")
return json.dumps({"error": f"Memory tool '{tool_name}' failed: {e}"})
# -- Lifecycle hooks -----------------------------------------------------
+1 -1
View File
@@ -34,7 +34,7 @@ from __future__ import annotations
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
+2 -2
View File
@@ -510,8 +510,8 @@ def fetch_endpoint_model_metadata(
def _get_context_cache_path() -> Path:
"""Return path to the persistent context length cache file."""
from hermes_constants import get_hermes_home
return get_hermes_home() / "context_length_cache.yaml"
hermes_home = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
return hermes_home / "context_length_cache.yaml"
def _load_context_cache() -> Dict[str, int]:
+6 -5
View File
@@ -23,9 +23,9 @@ import json
import logging
import os
import time
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Union
from utils import atomic_json_write
@@ -185,8 +185,9 @@ def _get_reverse_mapping() -> Dict[str, str]:
def _get_cache_path() -> Path:
"""Return path to disk cache file."""
from hermes_constants import get_hermes_home
return get_hermes_home() / "models_dev_cache.json"
env_val = os.environ.get("HERMES_HOME", "")
hermes_home = Path(env_val) if env_val else Path.home() / ".hermes"
return hermes_home / "models_dev_cache.json"
def _load_disk_cache() -> Dict[str, Any]:
@@ -230,7 +231,7 @@ def fetch_models_dev(force_refresh: bool = False) -> Dict[str, Any]:
response = requests.get(MODELS_DEV_URL, timeout=15)
response.raise_for_status()
data = response.json()
if isinstance(data, dict) and data:
if isinstance(data, dict) and len(data) > 0:
_models_dev_cache = data
_models_dev_cache_time = time.time()
_save_disk_cache(data)
+1 -1
View File
@@ -10,7 +10,7 @@ import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Set, Tuple
from typing import Any, Dict, List, Optional, Set, Tuple
from hermes_constants import get_hermes_home
+1
View File
@@ -15,6 +15,7 @@ Inspired by Block/goose's SubdirectoryHintTracker.
import logging
import os
import re
import shlex
from pathlib import Path
from typing import Dict, Any, Optional, Set
+1 -3
View File
@@ -31,8 +31,6 @@ from multiprocessing import Pool, Lock
import traceback
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeRemainingColumn, MofNCompleteColumn
from rich.console import Console
logger = logging.getLogger(__name__)
import fire
from run_agent import AIAgent
@@ -1018,7 +1016,7 @@ class BatchRunner:
tool_stats = data.get('tool_stats', {})
# Check for invalid tool names (model hallucinations)
invalid_tools = [k for k in tool_stats if k not in VALID_TOOLS]
invalid_tools = [k for k in tool_stats.keys() if k not in VALID_TOOLS]
if invalid_tools:
filtered_entries += 1
+19 -2
View File
@@ -70,7 +70,7 @@ _COMMAND_SPINNER_FRAMES = ("⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
# User-managed env files should override stale shell exports on restart.
from hermes_constants import get_hermes_home, display_hermes_home
from hermes_constants import get_hermes_home, display_hermes_home, OPENROUTER_BASE_URL
from hermes_cli.env_loader import load_hermes_dotenv
_hermes_home = get_hermes_home()
@@ -3536,6 +3536,13 @@ class HermesCLI:
_cprint(f" Original session: {parent_session_id}")
_cprint(f" Branch session: {new_session_id}")
def reset_conversation(self):
    """Reset the conversation by starting a new session.

    When an agent is attached, its memory provider is shut down with the
    current conversation history before the new session is started.
    """
    # Shut down memory provider before resetting — actual session boundary
    if hasattr(self, 'agent') and self.agent:
        self.agent.shutdown_memory_provider(self.conversation_history)
    self.new_session()
def save_conversation(self):
"""Save the current conversation to a file."""
if not self.conversation_history:
@@ -4239,6 +4246,7 @@ class HermesCLI:
try:
config = load_gateway_config()
connected = config.get_connected_platforms()
print(" Messaging Platform Configuration:")
print(" " + "-" * 55)
@@ -6000,7 +6008,7 @@ class HermesCLI:
timeout = CLI_CONFIG.get("clarify", {}).get("timeout", 120)
response_queue = queue.Queue()
is_open_ended = not choices
is_open_ended = not choices or len(choices) == 0
self._clarify_state = {
"question": question,
@@ -6283,6 +6291,14 @@ class HermesCLI:
except Exception:
pass
def _clear_current_input(self) -> None:
if getattr(self, "_app", None):
try:
self._app.current_buffer.text = ""
except Exception:
pass
def chat(self, message, images: list = None) -> Optional[str]:
"""
Send a message to the agent and get a response.
@@ -7823,6 +7839,7 @@ class HermesCLI:
title = '🔐 Sudo Password Required'
body = 'Enter password below (hidden), or press Enter to skip'
box_width = _panel_box_width(title, [body])
inner = max(0, box_width - 2)
lines = []
lines.append(('class:sudo-border', '╭─ '))
lines.append(('class:sudo-title', title))
+12 -59
View File
@@ -25,6 +25,7 @@ except ImportError:
import msvcrt
except ImportError:
msvcrt = None
import time
from pathlib import Path
from typing import Optional
@@ -158,44 +159,6 @@ def _resolve_delivery_target(job: dict) -> Optional[dict]:
}
# Media extension sets — keep in sync with gateway/platforms/base.py:_process_message_background
_AUDIO_EXTS = frozenset({'.ogg', '.opus', '.mp3', '.wav', '.m4a'})
_VIDEO_EXTS = frozenset({'.mp4', '.mov', '.avi', '.mkv', '.webm', '.3gp'})
_IMAGE_EXTS = frozenset({'.jpg', '.jpeg', '.png', '.webp', '.gif'})
def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata: dict | None, loop, job: dict) -> None:
    """Send extracted MEDIA files as native platform attachments via a live adapter.

    Routes each file to the appropriate adapter method (send_voice, send_image_file,
    send_video, send_document) based on file extension — mirroring the routing logic
    in ``BasePlatformAdapter._process_message_background``.

    Args:
        adapter: Live platform adapter exposing the async send_* methods.
        chat_id: Destination chat identifier.
        media_files: Iterable of ``(media_path, is_voice)`` pairs; the second
            element is unpacked but not used for routing.
        metadata: Optional metadata forwarded to every send call.
        loop: Event loop the adapter coroutines are scheduled on.
        job: Job dict; only ``job.get("id")`` is read, for log messages.
    """
    from pathlib import Path

    for media_path, _is_voice in media_files:
        # Each file is attempted independently: a failure is logged and the
        # remaining files are still sent.
        try:
            ext = Path(media_path).suffix.lower()
            if ext in _AUDIO_EXTS:
                coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata)
            elif ext in _VIDEO_EXTS:
                coro = adapter.send_video(chat_id=chat_id, video_path=media_path, metadata=metadata)
            elif ext in _IMAGE_EXTS:
                coro = adapter.send_image_file(chat_id=chat_id, image_path=media_path, metadata=metadata)
            else:
                # Unknown extensions go out as generic documents.
                coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata)

            # Schedule the coroutine on the adapter's loop and block this
            # thread for up to 30 seconds waiting for the result.
            future = asyncio.run_coroutine_threadsafe(coro, loop)
            result = future.result(timeout=30)
            if result and not getattr(result, "success", True):
                logger.warning(
                    "Job '%s': media send failed for %s: %s",
                    job.get("id", "?"), media_path, getattr(result, "error", "unknown"),
                )
        except Exception as e:
            logger.warning("Job '%s': failed to send media %s: %s", job.get("id", "?"), media_path, e)
def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
"""
Deliver job output to the configured target (origin chat, specific platform, etc.).
@@ -284,28 +247,18 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> None:
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
send_metadata = {"thread_id": thread_id} if thread_id else None
try:
# Send cleaned text (MEDIA tags stripped) — not the raw content
text_to_send = cleaned_delivery_content.strip()
adapter_ok = True
if text_to_send:
future = asyncio.run_coroutine_threadsafe(
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
loop,
future = asyncio.run_coroutine_threadsafe(
runtime_adapter.send(chat_id, delivery_content, metadata=send_metadata),
loop,
)
send_result = future.result(timeout=60)
if send_result and not getattr(send_result, "success", True):
err = getattr(send_result, "error", "unknown")
logger.warning(
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
job["id"], platform_name, chat_id, err,
)
send_result = future.result(timeout=60)
if send_result and not getattr(send_result, "success", True):
err = getattr(send_result, "error", "unknown")
logger.warning(
"Job '%s': live adapter send to %s:%s failed (%s), falling back to standalone",
job["id"], platform_name, chat_id, err,
)
adapter_ok = False # fall through to standalone path
# Send extracted media files as native attachments via the live adapter
if adapter_ok and media_files:
_send_media_via_adapter(runtime_adapter, chat_id, media_files, send_metadata, loop, job)
if adapter_ok:
else:
logger.info("Job '%s': delivered to %s:%s via live adapter", job["id"], platform_name, chat_id)
return
except Exception as e:
+1 -2
View File
@@ -24,8 +24,7 @@ from pathlib import Path
logger = logging.getLogger("hooks.boot-md")
from hermes_constants import get_hermes_home
HERMES_HOME = get_hermes_home()
HERMES_HOME = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
BOOT_FILE = HERMES_HOME / "BOOT.md"
+1
View File
@@ -124,6 +124,7 @@ def _build_discord(adapter) -> List[Dict[str, str]]:
def _build_slack(adapter) -> List[Dict[str, str]]:
"""List Slack channels the bot has joined."""
channels = []
# Slack adapter may expose a web client
client = getattr(adapter, "_app", None) or getattr(adapter, "_client", None)
if not client:
+35 -1
View File
@@ -314,4 +314,38 @@ def parse_deliver_spec(
return deliver
def build_delivery_context_for_tool(
    config: GatewayConfig,
    origin: Optional[SessionSource] = None
) -> Dict[str, Any]:
    """
    Build context for the unified cronjob tool to understand delivery options.

    This is passed to the tool so it can validate and explain delivery targets.

    Args:
        config: Gateway configuration; its connected platforms, their home
            channels, and ``always_log_local`` are read.
        origin: Source the job was created from, if any.

    Returns:
        A dict with the serialized ``origin`` (or None), an ``options``
        mapping keyed by ``"origin"``, ``"local"`` and each connected
        platform's value, and the ``always_log_local`` flag.
    """
    connected = config.get_connected_platforms()

    options = {
        "origin": {
            "description": "Back to where this job was created",
            "available": origin is not None,
        },
        "local": {
            "description": "Save to local files only",
            "available": True,
        }
    }

    # One entry per connected platform, pointing at its home channel if set.
    for platform in connected:
        home = config.get_home_channel(platform)
        options[platform.value] = {
            "description": f"{platform.value.title()} home channel",
            "available": True,
            "home_channel": home.to_dict() if home else None,
        }

    return {
        "origin": origin.to_dict() if origin else None,
        "options": options,
        "always_log_local": config.always_log_local,
    }
+3 -24
View File
@@ -27,6 +27,7 @@ sys.path.insert(0, str(_Path(__file__).resolve().parents[2]))
from gateway.config import Platform, PlatformConfig
from gateway.session import SessionSource, build_session_key
from hermes_cli.config import get_hermes_home
from hermes_constants import get_hermes_dir
@@ -484,9 +485,6 @@ class BasePlatformAdapter(ABC):
self._background_tasks: set[asyncio.Task] = set()
# Chats where auto-TTS on voice input is disabled (set by /voice off)
self._auto_tts_disabled_chats: set = set()
# Chats where typing indicator is paused (e.g. during approval waits).
# _keep_typing skips send_typing when the chat_id is in this set.
self._typing_paused: set = set()
@property
def has_fatal_error(self) -> bool:
@@ -946,16 +944,10 @@ class BasePlatformAdapter(ABC):
Telegram/Discord typing status expires after ~5 seconds, so we refresh every 2
to recover quickly after progress messages interrupt it.
Skips send_typing when the chat is in ``_typing_paused`` (e.g. while
the agent is waiting for dangerous-command approval). This is critical
for Slack's Assistant API where ``assistant_threads_setStatus`` disables
the compose box — pausing lets the user type ``/approve`` or ``/deny``.
"""
try:
while True:
if chat_id not in self._typing_paused:
await self.send_typing(chat_id, metadata=metadata)
await self.send_typing(chat_id, metadata=metadata)
await asyncio.sleep(interval)
except asyncio.CancelledError:
pass # Normal cancellation when handler completes
@@ -969,20 +961,7 @@ class BasePlatformAdapter(ABC):
await self.stop_typing(chat_id)
except Exception:
pass
self._typing_paused.discard(chat_id)
def pause_typing_for_chat(self, chat_id: str) -> None:
    """Pause typing indicator for a chat (e.g. during approval waits).

    Thread-safe (CPython GIL) — can be called from the sync agent thread
    while ``_keep_typing`` runs on the async event loop.
    """
    self._typing_paused.add(chat_id)
def resume_typing_for_chat(self, chat_id: str) -> None:
    """Resume typing indicator for a chat after approval resolves.

    Removing a chat that is not currently paused is a no-op.
    """
    self._typing_paused.discard(chat_id)
# ── Processing lifecycle hooks ──────────────────────────────────────────
# Subclasses override these to react to message processing events
# (e.g. Discord adds 👀/✅/❌ reactions).
+5
View File
@@ -60,6 +60,7 @@ try:
CreateMessageRequestBody,
GetChatRequest,
GetMessageRequest,
GetImageRequest,
GetMessageResourceRequest,
P2ImMessageMessageReadV1,
ReplyMessageRequest,
@@ -387,6 +388,10 @@ def _coerce_required_int(value: Any, default: int, min_value: int = 0) -> int:
return default if parsed is None else parsed
def _is_loop_ready(loop: Optional[asyncio.AbstractEventLoop]) -> bool:
return loop is not None and not bool(getattr(loop, "is_closed", lambda: False)())
# ---------------------------------------------------------------------------
# Post payload builders and parsers
# ---------------------------------------------------------------------------
+1 -1
View File
@@ -1057,7 +1057,7 @@ class MatrixAdapter(BasePlatformAdapter):
# Message type.
msg_type = MessageType.TEXT
if body.startswith(("!", "/")):
if body.startswith("!") or body.startswith("/"):
msg_type = MessageType.COMMAND
source = self.build_source(
+1
View File
@@ -430,6 +430,7 @@ class MattermostAdapter(BasePlatformAdapter):
ct = resp.content_type or "application/octet-stream"
break
except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
last_exc = exc
if attempt < 2:
await asyncio.sleep(1.5 * (attempt + 1))
continue
+53 -329
View File
@@ -84,17 +84,6 @@ class SlackAdapter(BasePlatformAdapter):
self._seen_messages: Dict[str, float] = {}
self._SEEN_TTL = 300 # 5 minutes
self._SEEN_MAX = 2000 # prune threshold
# Track pending approval message_ts → resolved flag to prevent
# double-clicks on approval buttons.
self._approval_resolved: Dict[str, bool] = {}
# Track timestamps of messages sent by the bot so we can respond
# to thread replies even without an explicit @mention.
self._bot_message_ts: set = set()
self._BOT_TS_MAX = 5000 # cap to avoid unbounded growth
# Track threads where the bot has been @mentioned — once mentioned,
# respond to ALL subsequent messages in that thread automatically.
self._mentioned_threads: set = set()
self._MENTIONED_THREADS_MAX = 5000
async def connect(self) -> bool:
"""Connect to Slack via Socket Mode."""
@@ -187,15 +176,6 @@ class SlackAdapter(BasePlatformAdapter):
await ack()
await self._handle_slash_command(command)
# Register Block Kit action handlers for approval buttons
for _action_id in (
"hermes_approve_once",
"hermes_approve_session",
"hermes_approve_always",
"hermes_deny",
):
self._app.action(_action_id)(self._handle_approval_action)
# Start Socket Mode handler in background
self._handler = AsyncSocketModeHandler(self._app, app_token)
self._socket_mode_task = asyncio.create_task(self._handler.start_async())
@@ -276,22 +256,9 @@ class SlackAdapter(BasePlatformAdapter):
last_result = await self._get_client(chat_id).chat_postMessage(**kwargs)
# Track the sent message ts so we can auto-respond to thread
# replies without requiring @mention.
sent_ts = last_result.get("ts") if last_result else None
if sent_ts:
self._bot_message_ts.add(sent_ts)
# Also register the thread root so replies-to-my-replies work
if thread_ts:
self._bot_message_ts.add(thread_ts)
if len(self._bot_message_ts) > self._BOT_TS_MAX:
excess = len(self._bot_message_ts) - self._BOT_TS_MAX // 2
for old_ts in list(self._bot_message_ts)[:excess]:
self._bot_message_ts.discard(old_ts)
return SendResult(
success=True,
message_id=sent_ts,
message_id=last_result.get("ts") if last_result else None,
raw_response=last_result,
)
@@ -799,61 +766,30 @@ class SlackAdapter(BasePlatformAdapter):
else:
thread_ts = event.get("thread_ts") or ts # ts fallback for channels
# In channels, respond if:
# 1. The bot is @mentioned in this message, OR
# 2. The message is a reply in a thread the bot started/participated in, OR
# 3. The message is in a thread where the bot was previously @mentioned, OR
# 4. There's an existing session for this thread (survives restarts)
# In channels, only respond if bot is mentioned OR if this is a
# reply in a thread where the bot has an active session.
bot_uid = self._team_bot_user_ids.get(team_id, self._bot_user_id)
is_mentioned = bot_uid and f"<@{bot_uid}>" in text
event_thread_ts = event.get("thread_ts")
is_thread_reply = bool(event_thread_ts and event_thread_ts != ts)
if not is_dm and bot_uid and not is_mentioned:
reply_to_bot_thread = (
is_thread_reply and event_thread_ts in self._bot_message_ts
)
in_mentioned_thread = (
event_thread_ts is not None
and event_thread_ts in self._mentioned_threads
)
has_session = (
is_thread_reply
and self._has_active_session_for_thread(
channel_id=channel_id,
thread_ts=event_thread_ts,
user_id=user_id,
)
)
if not reply_to_bot_thread and not in_mentioned_thread and not has_session:
# Check if this is a thread reply (thread_ts exists and differs from ts)
event_thread_ts = event.get("thread_ts")
is_thread_reply = event_thread_ts and event_thread_ts != ts
if is_thread_reply and self._has_active_session_for_thread(
channel_id=channel_id,
thread_ts=event_thread_ts,
user_id=user_id,
):
# Allow thread replies without mention if there's an active session
pass
else:
# Not a thread reply or no active session - ignore
return
if is_mentioned:
# Strip the bot mention from the text
text = text.replace(f"<@{bot_uid}>", "").strip()
# Register this thread so all future messages auto-trigger the bot
if event_thread_ts:
self._mentioned_threads.add(event_thread_ts)
if len(self._mentioned_threads) > self._MENTIONED_THREADS_MAX:
to_remove = list(self._mentioned_threads)[:self._MENTIONED_THREADS_MAX // 2]
for t in to_remove:
self._mentioned_threads.discard(t)
# When entering a thread for the first time (no existing session),
# fetch thread context so the agent understands the conversation.
if is_thread_reply and not self._has_active_session_for_thread(
channel_id=channel_id,
thread_ts=event_thread_ts,
user_id=user_id,
):
thread_context = await self._fetch_thread_context(
channel_id=channel_id,
thread_ts=event_thread_ts,
current_ts=ts,
team_id=team_id,
)
if thread_context:
text = thread_context + text
# Determine message type
msg_type = MessageType.TEXT
@@ -976,233 +912,6 @@ class SlackAdapter(BasePlatformAdapter):
await self._remove_reaction(channel_id, ts, "eyes")
await self._add_reaction(channel_id, ts, "white_check_mark")
# ----- Approval button support (Block Kit) -----
async def send_exec_approval(
    self, chat_id: str, command: str, session_key: str,
    description: str = "dangerous command",
    metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
    """Send a Block Kit approval prompt with interactive buttons.

    The buttons call ``resolve_gateway_approval()`` to unblock the waiting
    agent thread, using the same mechanism as the text ``/approve`` flow.

    Args:
        chat_id: Slack channel ID the prompt is posted to.
        command: Command awaiting approval. Anything beyond 2900 characters
            is cut off and replaced by a trailing ``...``.
        session_key: Stored as the ``value`` of every button so the click
            handler knows which approval to resolve.
        description: Reason shown below the command. It is shortened when
            needed so the section text stays within Slack's limit.
        metadata: Optional send metadata, handed to ``_resolve_thread_ts``
            to decide whether the prompt goes into a thread.

    Returns:
        ``SendResult`` carrying the posted message ``ts`` on success, or
        ``success=False`` with the error text when not connected or when
        posting fails.
    """
    if not self._app:
        return SendResult(success=False, error="Not connected")

    try:
        cmd_preview = command[:2900] + "..." if len(command) > 2900 else command
        thread_ts = self._resolve_thread_ts(None, metadata)

        # Slack rejects section text longer than 3000 characters. The
        # command preview already takes most of that, so give the reason
        # only the room that is left instead of letting the post fail.
        section_head = (
            f":warning: *Command Approval Required*\n"
            f"```{cmd_preview}```\n"
            f"Reason: "
        )
        reason_room = max(0, 3000 - len(section_head))
        reason = description
        if len(reason) > reason_room:
            reason = reason[: max(0, reason_room - 3)] + "..."

        # (label, action_id, optional Slack button style)
        button_specs = (
            ("Allow Once", "hermes_approve_once", "primary"),
            ("Allow Session", "hermes_approve_session", None),
            ("Always Allow", "hermes_approve_always", None),
            ("Deny", "hermes_deny", "danger"),
        )
        buttons = []
        for label, action_id, style in button_specs:
            button: Dict[str, Any] = {
                "type": "button",
                "text": {"type": "plain_text", "text": label},
                "action_id": action_id,
                "value": session_key,
            }
            if style:
                button["style"] = style
            buttons.append(button)

        blocks = [
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": section_head + reason},
            },
            {"type": "actions", "elements": buttons},
        ]

        kwargs: Dict[str, Any] = {
            "channel": chat_id,
            # Plain-text fallback for notifications and clients without blocks.
            "text": f"⚠️ Command approval required: {cmd_preview[:100]}",
            "blocks": blocks,
        }
        if thread_ts:
            kwargs["thread_ts"] = thread_ts

        result = await self._get_client(chat_id).chat_postMessage(**kwargs)
        msg_ts = result.get("ts", "")
        if msg_ts:
            # Mark the prompt as pending; the click handler flips this.
            self._approval_resolved[msg_ts] = False

        return SendResult(success=True, message_id=msg_ts, raw_response=result)
    except Exception as e:
        logger.error("[Slack] send_exec_approval failed: %s", e, exc_info=True)
        return SendResult(success=False, error=str(e))
async def _handle_approval_action(self, ack, body, action) -> None:
    """React to a click on one of the Block Kit approval buttons.

    Acknowledges the interaction, rewrites the prompt message so it shows
    the decision instead of the buttons, then forwards the choice to
    ``resolve_gateway_approval()`` for the session key carried in the
    button's ``value``.
    """
    await ack()

    clicked_id = action.get("action_id", "")
    session_key = action.get("value", "")
    message = body.get("message", {})
    msg_ts = message.get("ts", "")
    channel_id = body.get("channel", {}).get("id", "")
    user_name = body.get("user", {}).get("name", "unknown")

    # Translate the button's action_id; anything unrecognized means "deny".
    choice = {
        "hermes_approve_once": "once",
        "hermes_approve_session": "session",
        "hermes_approve_always": "always",
        "hermes_deny": "deny",
    }.get(clicked_id, "deny")

    # A second click on a prompt that is already being handled is ignored.
    if self._approval_resolved.get(msg_ts, False):
        return
    self._approval_resolved[msg_ts] = True

    decision_text = {
        "once": f"✅ Approved once by {user_name}",
        "session": f"✅ Approved for session by {user_name}",
        "always": f"✅ Approved permanently by {user_name}",
        "deny": f"❌ Denied by {user_name}",
    }.get(choice, f"Resolved by {user_name}")

    # Reuse the text of the first section block of the original prompt.
    section_text = next(
        (
            blk.get("text", {}).get("text", "")
            for blk in message.get("blocks", [])
            if blk.get("type") == "section"
        ),
        "",
    )

    # Replacement layout: the original prompt plus a context line with the
    # decision; the actions block (buttons) is dropped.
    replacement_blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": section_text or "Command approval request",
            },
        },
        {
            "type": "context",
            "elements": [{"type": "mrkdwn", "text": decision_text}],
        },
    ]

    try:
        slack_client = self._get_client(channel_id)
        await slack_client.chat_update(
            channel=channel_id,
            ts=msg_ts,
            text=decision_text,
            blocks=replacement_blocks,
        )
    except Exception as e:
        logger.warning("[Slack] Failed to update approval message: %s", e)

    # Hand the decision to the approval machinery; this unblocks the agent.
    try:
        from tools.approval import resolve_gateway_approval
        resolved = resolve_gateway_approval(session_key, choice)
        logger.info(
            "Slack button resolved %d approval(s) for session %s (choice=%s, user=%s)",
            resolved, session_key, choice, user_name,
        )
    except Exception as exc:
        logger.error("Failed to resolve gateway approval from Slack button: %s", exc)

    # The prompt is finished; forget its bookkeeping entry.
    self._approval_resolved.pop(msg_ts, None)
# ----- Thread context fetching -----
async def _fetch_thread_context(
    self, channel_id: str, thread_ts: str, current_ts: str,
    team_id: str = "", limit: int = 30,
) -> str:
    """Fetch thread messages to provide context when the bot is mentioned
    mid-thread for the first time.

    Args:
        channel_id: Channel that contains the thread.
        thread_ts: Timestamp of the thread's parent message.
        current_ts: Timestamp of the triggering message; it is left out of
            the context.
        team_id: Workspace ID used to look up which bot mention to strip.
        limit: Number of messages requested in addition to the current one.

    Returns:
        A formatted block of ``name: text`` lines wrapped in header/footer
        markers, or an empty string on failure or when no usable message
        remains after filtering.
    """
    try:
        client = self._get_client(channel_id)
        result = await client.conversations_replies(
            channel=channel_id,
            ts=thread_ts,
            limit=limit + 1,  # +1 because it includes the current message
            inclusive=True,
        )
        messages = result.get("messages", [])
        if not messages:
            return ""

        # The mention token is the same for every message, so work it out
        # once instead of on each loop iteration.
        bot_uid = self._team_bot_user_ids.get(team_id, self._bot_user_id)
        mention = f"<@{bot_uid}>" if bot_uid else ""

        context_parts = []
        for msg in messages:
            msg_ts = msg.get("ts", "")

            # Skip the current message (the one that triggered this fetch)
            if msg_ts == current_ts:
                continue

            # Skip every bot-authored message (ours and other bots')
            if msg.get("bot_id") or msg.get("subtype") == "bot_message":
                continue

            # `or ""` guards against an explicit null text, which would
            # otherwise raise and discard the whole thread context.
            msg_text = (msg.get("text") or "").strip()
            if not msg_text:
                continue

            # Strip bot mentions from context messages
            if mention:
                msg_text = msg_text.replace(mention, "").strip()

            # Mark the thread parent
            prefix = "[thread parent] " if msg_ts == thread_ts else ""

            # Resolve user name (cached)
            msg_user = msg.get("user", "unknown")
            name = await self._resolve_user_name(msg_user, chat_id=channel_id)
            context_parts.append(f"{prefix}{name}: {msg_text}")

        if not context_parts:
            return ""

        return (
            "[Thread context — previous messages in this thread:]\n"
            + "\n".join(context_parts)
            + "\n[End of thread context]\n\n"
        )
    except Exception as e:
        logger.warning("[Slack] Failed to fetch thread context: %s", e)
        return ""
async def _handle_slash_command(self, command: dict) -> None:
"""Handle /hermes slash command."""
text = command.get("text", "").strip()
@@ -1251,22 +960,27 @@ class SlackAdapter(BasePlatformAdapter):
user_id: str,
) -> bool:
"""Check if there's an active session for a thread.
Used to determine if thread replies without @mentions should be
processed (they should if there's an active session).
Uses ``build_session_key()`` as the single source of truth for key
construction avoids the bug where manual key building didn't
respect ``thread_sessions_per_user`` and ``group_sessions_per_user``
settings correctly.
Args:
channel_id: The Slack channel ID
thread_ts: The thread timestamp (parent message ts)
user_id: The user ID of the sender
Returns:
True if there's an active session for this thread
"""
session_store = getattr(self, "_session_store", None)
if not session_store:
return False
try:
from gateway.session import SessionSource, build_session_key
# Build a SessionSource for this thread
from gateway.session import SessionSource
from gateway.config import Platform
source = SessionSource(
platform=Platform.SLACK,
chat_id=channel_id,
@@ -1274,21 +988,31 @@ class SlackAdapter(BasePlatformAdapter):
user_id=user_id,
thread_id=thread_ts,
)
# Read session isolation settings from the store's config
store_cfg = getattr(session_store, "config", None)
gspu = getattr(store_cfg, "group_sessions_per_user", True) if store_cfg else True
tspu = getattr(store_cfg, "thread_sessions_per_user", False) if store_cfg else False
session_key = build_session_key(
source,
group_sessions_per_user=gspu,
thread_sessions_per_user=tspu,
# Generate the session key using the same logic as SessionStore
# This mirrors the logic in build_session_key for group sessions
key_parts = ["agent:main", "slack", "group", channel_id, thread_ts]
# Include user_id if group_sessions_per_user is enabled
# We check the session store config if available
group_sessions_per_user = getattr(
session_store, "config", {}
)
if hasattr(group_sessions_per_user, "group_sessions_per_user"):
group_sessions_per_user = group_sessions_per_user.group_sessions_per_user
else:
group_sessions_per_user = True # Default
if group_sessions_per_user and user_id:
key_parts.append(str(user_id))
session_key = ":".join(key_parts)
# Check if the session exists in the store
session_store._ensure_loaded()
return session_key in session_store._entries
except Exception:
# If anything goes wrong, default to False (require mention)
return False
async def _download_slack_file(self, url: str, ext: str, audio: bool = False, team_id: str = "") -> str:
+1 -117
View File
@@ -153,8 +153,6 @@ class TelegramAdapter(BasePlatformAdapter):
self._dm_topics_config: List[Dict[str, Any]] = self.config.extra.get("dm_topics", [])
# Interactive model picker state per chat
self._model_picker_state: Dict[str, dict] = {}
# Approval button state: message_id → session_key
self._approval_state: Dict[int, str] = {}
def _fallback_ips(self) -> list[str]:
"""Return validated fallback IPs from config (populated by _apply_env_overrides)."""
@@ -1012,70 +1010,6 @@ class TelegramAdapter(BasePlatformAdapter):
logger.warning("[%s] send_update_prompt failed: %s", self.name, e)
return SendResult(success=False, error=str(e))
async def send_exec_approval(
    self, chat_id: str, command: str, session_key: str,
    description: str = "dangerous command",
    metadata: Optional[Dict[str, Any]] = None,
) -> SendResult:
    """Post an approval prompt with a four-button inline keyboard.

    Every button carries callback data of the form
    ``ea:<choice>:<approval_id>``. After the message is sent, the session
    key is remembered in ``self._approval_state`` under that approval ID
    so the callback handler can look it up.

    Returns a ``SendResult`` with the Telegram message ID on success, or
    ``success=False`` with the error text when not connected or when
    sending fails.
    """
    if not self._bot:
        return SendResult(success=False, error="Not connected")

    try:
        import itertools

        # Long commands are cut at 3800 characters and marked with "...".
        preview = command if len(command) <= 3800 else command[:3800] + "..."
        prompt = (
            f"⚠️ *Command Approval Required*\n\n"
            f"`{preview}`\n\n"
            f"Reason: {description}"
        )

        # Thread target, if the caller supplied one under either key.
        if metadata:
            thread_id = metadata.get("thread_id") or metadata.get("message_thread_id")
        else:
            thread_id = None

        # Short numeric IDs come from a per-adapter counter that is created
        # the first time an approval prompt is sent.
        try:
            id_source = self._approval_counter
        except AttributeError:
            id_source = self._approval_counter = itertools.count(1)
        approval_id = next(id_source)

        def _button(label, choice):
            return InlineKeyboardButton(
                label, callback_data=f"ea:{choice}:{approval_id}"
            )

        keyboard = InlineKeyboardMarkup([
            [_button("✅ Allow Once", "once"), _button("✅ Session", "session")],
            [_button("✅ Always", "always"), _button("❌ Deny", "deny")],
        ])

        send_args: Dict[str, Any] = dict(
            chat_id=int(chat_id),
            text=prompt,
            parse_mode=ParseMode.MARKDOWN,
            reply_markup=keyboard,
        )
        if thread_id:
            send_args["message_thread_id"] = int(thread_id)

        sent = await self._bot.send_message(**send_args)

        # Remember which session this approval ID belongs to.
        self._approval_state[approval_id] = session_key

        return SendResult(success=True, message_id=str(sent.message_id))
    except Exception as e:
        logger.warning("[%s] send_exec_approval failed: %s", self.name, e)
        return SendResult(success=False, error=str(e))
async def send_model_picker(
self,
chat_id: str,
@@ -1387,56 +1321,6 @@ class TelegramAdapter(BasePlatformAdapter):
await self._handle_model_picker_callback(query, data, chat_id)
return
# --- Exec approval callbacks (ea:choice:id) ---
if data.startswith("ea:"):
parts = data.split(":", 2)
if len(parts) == 3:
choice = parts[1] # once, session, always, deny
try:
approval_id = int(parts[2])
except (ValueError, IndexError):
await query.answer(text="Invalid approval data.")
return
session_key = self._approval_state.pop(approval_id, None)
if not session_key:
await query.answer(text="This approval has already been resolved.")
return
# Map choice to human-readable label
label_map = {
"once": "✅ Approved once",
"session": "✅ Approved for session",
"always": "✅ Approved permanently",
"deny": "❌ Denied",
}
user_display = getattr(query.from_user, "first_name", "User")
label = label_map.get(choice, "Resolved")
await query.answer(text=label)
# Edit message to show decision, remove buttons
try:
await query.edit_message_text(
text=f"{label} by {user_display}",
parse_mode=ParseMode.MARKDOWN,
reply_markup=None,
)
except Exception:
pass # non-fatal if edit fails
# Resolve the approval — unblocks the agent thread
try:
from tools.approval import resolve_gateway_approval
count = resolve_gateway_approval(session_key, choice)
logger.info(
"Telegram button resolved %d approval(s) for session %s (choice=%s, user=%s)",
count, session_key, choice, user_display,
)
except Exception as exc:
logger.error("Failed to resolve gateway approval from Telegram button: %s", exc)
return
# --- Update prompt callbacks ---
if not data.startswith("update_prompt:"):
return
@@ -1485,7 +1369,7 @@ class TelegramAdapter(BasePlatformAdapter):
with open(audio_path, "rb") as audio_file:
# .ogg files -> send as voice (round playable bubble)
if audio_path.endswith((".ogg", ".opus")):
if audio_path.endswith(".ogg") or audio_path.endswith(".opus"):
_voice_thread = metadata.get("thread_id") if metadata else None
msg = await self._bot.send_voice(
chat_id=int(chat_id),
+4 -2
View File
@@ -203,8 +203,10 @@ class WebhookAdapter(BasePlatformAdapter):
def _reload_dynamic_routes(self) -> None:
"""Reload agent-created subscriptions from disk if the file changed."""
from hermes_constants import get_hermes_home
hermes_home = get_hermes_home()
from pathlib import Path as _Path
hermes_home = _Path(
os.getenv("HERMES_HOME", str(_Path.home() / ".hermes"))
).expanduser()
subs_path = hermes_home / _DYNAMIC_ROUTES_FILENAME
if not subs_path.exists():
if self._dynamic_routes:
+2 -2
View File
@@ -653,7 +653,7 @@ class WeComAdapter(BasePlatformAdapter):
return ".png"
if data.startswith(b"\xff\xd8\xff"):
return ".jpg"
if data.startswith((b"GIF87a", b"GIF89a")):
if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"):
return ".gif"
if data.startswith(b"RIFF") and data[8:12] == b"WEBP":
return ".webp"
@@ -689,7 +689,7 @@ class WeComAdapter(BasePlatformAdapter):
@staticmethod
def _derive_message_type(body: Dict[str, Any], text: str, media_types: List[str]) -> MessageType:
"""Choose the normalized inbound message type."""
if any(mtype.startswith(("application/", "text/")) for mtype in media_types):
if any(mtype.startswith("application/") or mtype.startswith("text/") for mtype in media_types):
return MessageType.DOCUMENT
if any(mtype.startswith("image/") for mtype in media_types):
return MessageType.TEXT if text else MessageType.PHOTO
+1
View File
@@ -27,6 +27,7 @@ _IS_WINDOWS = platform.system() == "Windows"
from pathlib import Path
from typing import Dict, Optional, Any
from hermes_cli.config import get_hermes_home
from hermes_constants import get_hermes_dir
logger = logging.getLogger(__name__)
+10 -34
View File
@@ -24,6 +24,7 @@ import signal
import tempfile
import threading
import time
import uuid
from pathlib import Path
from datetime import datetime
from typing import Dict, Optional, Any, List
@@ -377,7 +378,7 @@ def _check_unavailable_skill(command_name: str) -> str | None:
)
# Check optional skills (shipped with repo but not installed)
from hermes_constants import get_optional_skills_dir
from hermes_constants import get_hermes_home, get_optional_skills_dir
repo_root = Path(__file__).resolve().parent.parent
optional_dir = get_optional_skills_dir(repo_root / "optional-skills")
if optional_dir.exists():
@@ -1857,11 +1858,6 @@ class GatewayRunner:
if _quick_key in self._running_agents and _stale_ts:
_stale_age = time.time() - _stale_ts
_stale_agent = self._running_agents.get(_quick_key)
# Never evict the pending sentinel — it was just placed moments
# ago during the async setup phase before the real agent is
# created. Sentinels have no get_activity_summary(), so the
# idle check below would always evaluate to inf >= timeout and
# immediately evict them, racing with the setup path.
_stale_idle = float("inf") # assume idle if we can't check
_stale_detail = ""
if _stale_agent and hasattr(_stale_agent, "get_activity_summary"):
@@ -1880,11 +1876,8 @@ class GatewayRunner:
# cases where the agent object was garbage-collected).
_wall_ttl = max(_raw_stale_timeout * 10, 7200) if _raw_stale_timeout > 0 else float("inf")
_should_evict = (
_stale_agent is not _AGENT_PENDING_SENTINEL
and (
(_raw_stale_timeout > 0 and _stale_idle >= _raw_stale_timeout)
or _stale_age > _wall_ttl
)
(_raw_stale_timeout > 0 and _stale_idle >= _raw_stale_timeout)
or _stale_age > _wall_ttl
)
if _should_evict:
logger.warning(
@@ -2821,7 +2814,7 @@ class GatewayRunner:
guessed, _ = _mimetypes.guess_type(path)
if guessed:
mtype = guessed
if not mtype.startswith(("application/", "text/")):
if not (mtype.startswith("application/") or mtype.startswith("text/")):
continue
# Extract display filename by stripping the doc_{uuid12}_ prefix
import os as _os
@@ -3908,7 +3901,7 @@ class GatewayRunner:
return f"🎭 Personality set to **{args}**\n_(takes effect on next message)_"
available = "`none`, " + ", ".join(f"`{n}`" for n in personalities)
available = "`none`, " + ", ".join(f"`{n}`" for n in personalities.keys())
return f"Unknown personality: `{args}`\n\nAvailable: {available}"
async def _handle_retry_command(self, event: MessageEvent) -> str:
@@ -4551,7 +4544,6 @@ class GatewayRunner:
provider_data_collection=pr.get("data_collection"),
session_id=task_id,
platform=platform_key,
user_id=source.user_id,
session_db=self._session_db,
fallback_model=self._fallback_model,
)
@@ -5321,6 +5313,9 @@ class GatewayRunner:
old_servers = set(_servers.keys())
# Read new config before shutting down, so we know what will be added/removed
new_config = _load_mcp_config()
new_server_names = set(new_config.keys())
# Shutdown existing connections
await loop.run_in_executor(None, shutdown_mcp_servers)
@@ -5408,6 +5403,7 @@ class GatewayRunner:
from tools.approval import (
resolve_gateway_approval, has_blocking_approval,
pending_approval_count,
)
if not has_blocking_approval(session_key):
@@ -5435,11 +5431,6 @@ class GatewayRunner:
if not count:
return "No pending command to approve."
# Resume typing indicator — agent is about to continue processing.
_adapter = self.adapters.get(source.platform)
if _adapter:
_adapter.resume_typing_for_chat(source.chat_id)
count_msg = f" ({count} commands)" if count > 1 else ""
logger.info("User approved %d dangerous command(s) via /approve%s", count, scope_msg)
return f"✅ Command{'s' if count > 1 else ''} approved{scope_msg}{count_msg}. The agent is resuming..."
@@ -5472,11 +5463,6 @@ class GatewayRunner:
if not count:
return "No pending command to deny."
# Resume typing indicator — agent continues (with BLOCKED result).
_adapter = self.adapters.get(source.platform)
if _adapter:
_adapter.resume_typing_for_chat(source.chat_id)
count_msg = f" ({count} commands)" if count > 1 else ""
logger.info("User denied %d dangerous command(s) via /deny", count)
return f"❌ Command{'s' if count > 1 else ''} denied{count_msg}."
@@ -6646,7 +6632,6 @@ class GatewayRunner:
provider_data_collection=pr.get("data_collection"),
session_id=session_id,
platform=platform_key,
user_id=source.user_id,
session_db=self._session_db,
fallback_model=self._fallback_model,
)
@@ -6771,15 +6756,6 @@ class GatewayRunner:
UX. Otherwise fall back to a plain text message with
``/approve`` instructions.
"""
# Pause the typing indicator while the agent waits for
# user approval. Critical for Slack's Assistant API where
# assistant_threads_setStatus disables the compose box — the
# user literally cannot type /approve while "is thinking..."
# is active. The approval message send auto-clears the Slack
# status; pausing prevents _keep_typing from re-setting it.
# Typing resumes in _handle_approve_command/_handle_deny_command.
_status_adapter.pause_typing_for_chat(_status_chat_id)
cmd = approval_data.get("command", "")
desc = approval_data.get("description", "dangerous command")
+1 -1
View File
@@ -128,7 +128,7 @@ class GatewayStreamConsumer:
got_done
or got_segment_break
or (elapsed >= self.cfg.edit_interval
and self._accumulated)
and len(self._accumulated) > 0)
or len(self._accumulated) >= self.cfg.buffer_threshold
)
+7 -67
View File
@@ -2279,21 +2279,14 @@ def _prompt_model_selection(
model_ids: List[str],
current_model: str = "",
pricing: Optional[Dict[str, Dict[str, str]]] = None,
unavailable_models: Optional[List[str]] = None,
portal_url: str = "",
) -> Optional[str]:
"""Interactive model selection. Puts current_model first with a marker. Returns chosen model ID or None.
If *pricing* is provided (``{model_id: {prompt, completion}}``), a compact
price indicator is shown next to each model in aligned columns.
If *unavailable_models* is provided, those models are shown grayed out
and unselectable, with an upgrade link to *portal_url*.
"""
from hermes_cli.models import _format_price_per_mtok
_unavailable = unavailable_models or []
# Reorder: current model first, then the rest (deduplicated)
ordered = []
if current_model and current_model in model_ids:
@@ -2302,12 +2295,9 @@ def _prompt_model_selection(
if mid not in ordered:
ordered.append(mid)
# All models for column-width computation (selectable + unavailable)
all_models = list(ordered) + list(_unavailable)
# Column-aligned labels when pricing is available
has_pricing = bool(pricing and any(pricing.get(m) for m in all_models))
name_col = max((len(m) for m in all_models), default=0) + 2 if has_pricing else 0
has_pricing = bool(pricing and any(pricing.get(m) for m in ordered))
name_col = max((len(m) for m in ordered), default=0) + 2 if has_pricing else 0
# Pre-compute formatted prices and dynamic column widths
_price_cache: dict[str, tuple[str, str, str]] = {}
@@ -2315,7 +2305,7 @@ def _prompt_model_selection(
cache_col = 0 # only set if any model has cache pricing
has_cache = False
if has_pricing:
for mid in all_models:
for mid in ordered:
p = pricing.get(mid) # type: ignore[union-attr]
if p:
inp = _format_price_per_mtok(p.get("prompt", ""))
@@ -2360,35 +2350,12 @@ def _prompt_model_selection(
header += f" {'Cache':>{cache_col}}"
menu_title += header + " /Mtok"
# ANSI escape for dim text
_DIM = "\033[2m"
_RESET = "\033[0m"
# Try arrow-key menu first, fall back to number input
try:
from simple_term_menu import TerminalMenu
choices = [f" {_label(mid)}" for mid in ordered]
choices.append(" Enter custom model name")
choices.append(" Skip (keep current)")
# Print the unavailable block BEFORE the menu via regular print().
# simple_term_menu pads title lines to terminal width (causes wrapping),
# so we keep the title minimal and use stdout for the static block.
# clear_screen=False means our printed output stays visible above.
_upgrade_url = (portal_url or DEFAULT_NOUS_PORTAL_URL).rstrip("/")
if _unavailable:
print(menu_title)
print()
for mid in _unavailable:
print(f"{_DIM} {_label(mid)}{_RESET}")
print()
print(f"{_DIM} ── Upgrade at {_upgrade_url} for paid models ──{_RESET}")
print()
effective_title = "Available free models:"
else:
effective_title = menu_title
menu = TerminalMenu(
choices,
cursor_index=default_idx,
@@ -2397,7 +2364,7 @@ def _prompt_model_selection(
menu_highlight_style=("fg_green",),
cycle_cursor=True,
clear_screen=False,
title=effective_title,
title=menu_title,
)
idx = menu.show()
if idx is None:
@@ -2420,13 +2387,6 @@ def _prompt_model_selection(
n = len(ordered)
print(f" {n + 1:>{num_width}}. Enter custom model name")
print(f" {n + 2:>{num_width}}. Skip (keep current)")
if _unavailable:
_upgrade_url = (portal_url or DEFAULT_NOUS_PORTAL_URL).rstrip("/")
print()
print(f" {_DIM}── Unavailable models (requires paid tier — upgrade at {_upgrade_url}) ──{_RESET}")
for mid in _unavailable:
print(f" {'':>{num_width}} {_DIM}{_label(mid)}{_RESET}")
print()
while True:
@@ -2839,6 +2799,7 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
)
inference_base_url = auth_state["inference_base_url"]
verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True)
with _auth_store_lock():
auth_store = _load_auth_store()
@@ -2860,37 +2821,16 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
code="invalid_token",
)
from hermes_cli.models import (
_PROVIDER_MODELS, get_pricing_for_provider, filter_nous_free_models,
check_nous_free_tier, partition_nous_models_by_tier,
)
from hermes_cli.models import _PROVIDER_MODELS
model_ids = _PROVIDER_MODELS.get("nous", [])
print()
unavailable_models: list = []
if model_ids:
pricing = get_pricing_for_provider("nous")
model_ids = filter_nous_free_models(model_ids, pricing)
free_tier = check_nous_free_tier()
if free_tier:
model_ids, unavailable_models = partition_nous_models_by_tier(
model_ids, pricing, free_tier=True,
)
_portal = auth_state.get("portal_base_url", "")
if model_ids:
print(f"Showing {len(model_ids)} curated models — use \"Enter custom model name\" for others.")
selected_model = _prompt_model_selection(
model_ids, pricing=pricing,
unavailable_models=unavailable_models,
portal_url=_portal,
)
selected_model = _prompt_model_selection(model_ids)
if selected_model:
_save_model_choice(selected_model)
print(f"Default model set to: {selected_model}")
elif unavailable_models:
_url = (_portal or DEFAULT_NOUS_PORTAL_URL).rstrip("/")
print("No free models currently available.")
print(f"Upgrade at {_url} to access paid models.")
else:
print("No curated models available for Nous Portal.")
except Exception as exc:
+1
View File
@@ -18,6 +18,7 @@ from agent.credential_pool import (
STRATEGY_ROUND_ROBIN,
STRATEGY_RANDOM,
STRATEGY_LEAST_USED,
SUPPORTED_POOL_STRATEGIES,
PooledCredential,
_exhausted_until,
_normalize_custom_pool_name,
+1
View File
@@ -5,6 +5,7 @@ Pure display functions with no HermesCLI state dependency.
import json
import logging
import os
import shutil
import subprocess
import threading
+42 -1
View File
@@ -25,7 +25,7 @@ def clarify_callback(cli, question, choices):
timeout = CLI_CONFIG.get("clarify", {}).get("timeout", 120)
response_queue = queue.Queue()
is_open_ended = not choices
is_open_ended = not choices or len(choices) == 0
cli._clarify_state = {
"question": question,
@@ -63,6 +63,47 @@ def clarify_callback(cli, question, choices):
)
def sudo_password_callback(cli) -> str:
    """Ask for the sudo password via the TUI and wait for the answer.

    Publishes a response queue on ``cli._sudo_state`` and a deadline on
    ``cli._sudo_deadline``, then polls the queue once per second. Returns
    whatever was put on the queue, or ``""`` once the 45-second deadline
    has passed.
    """

    def _redraw() -> None:
        # Ask the TUI application, when there is one, to repaint.
        if hasattr(cli, "_app") and cli._app:
            cli._app.invalidate()

    def _close_prompt() -> None:
        # Withdraw the prompt state and repaint without it.
        cli._sudo_state = None
        cli._sudo_deadline = 0
        _redraw()

    answers = queue.Queue()
    cli._sudo_state = {"response_queue": answers}
    cli._sudo_deadline = _time.monotonic() + 45
    _redraw()

    while True:
        try:
            password = answers.get(timeout=1)
        except queue.Empty:
            # Nothing yet: stop at the deadline, otherwise repaint and
            # keep waiting.
            if cli._sudo_deadline - _time.monotonic() <= 0:
                break
            _redraw()
            continue

        _close_prompt()
        if password:
            cprint(f"\n{_DIM} ✓ Password received (cached for session){_RST}")
        else:
            cprint(f"\n{_DIM} ⏭ Skipped{_RST}")
        return password

    _close_prompt()
    cprint(f"\n{_DIM} ⏱ Timeout — continuing without sudo{_RST}")
    return ""
def prompt_for_secret(cli, var_name: str, prompt: str, metadata=None) -> dict:
"""Prompt for a secret value through the TUI (e.g. API keys for skills).
+2
View File
@@ -10,6 +10,7 @@ Usage:
import importlib.util
import logging
import shutil
import sys
from datetime import datetime
from pathlib import Path
@@ -23,6 +24,7 @@ from hermes_cli.setup import (
print_info,
print_success,
print_error,
print_warning,
prompt_yes_no,
)
+22 -108
View File
@@ -1,4 +1,4 @@
"""Clipboard image extraction for macOS, Windows, Linux, and WSL2.
"""Clipboard image extraction for macOS, Linux, and WSL2.
Provides a single function `save_clipboard_image(dest)` that checks the
system clipboard for image data, saves it to *dest* as PNG, and returns
@@ -6,10 +6,9 @@ True on success. No external Python dependencies — uses only OS-level
CLI tools that ship with the platform (or are commonly installed).
Platform support:
macOS osascript (always available), pngpaste (if installed)
Windows PowerShell via .NET System.Windows.Forms.Clipboard
WSL2 powershell.exe via .NET System.Windows.Forms.Clipboard
Linux wl-paste (Wayland), xclip (X11)
macOS osascript (always available), pngpaste (if installed)
WSL2 powershell.exe via .NET System.Windows.Forms.Clipboard
Linux wl-paste (Wayland), xclip (X11)
"""
import base64
@@ -33,8 +32,6 @@ def save_clipboard_image(dest: Path) -> bool:
dest.parent.mkdir(parents=True, exist_ok=True)
if sys.platform == "darwin":
return _macos_save(dest)
if sys.platform == "win32":
return _windows_save(dest)
return _linux_save(dest)
@@ -45,8 +42,6 @@ def has_clipboard_image() -> bool:
"""
if sys.platform == "darwin":
return _macos_has_image()
if sys.platform == "win32":
return _windows_has_image()
if _is_wsl():
return _wsl_has_image()
if os.environ.get("WAYLAND_DISPLAY"):
@@ -117,104 +112,6 @@ def _macos_osascript(dest: Path) -> bool:
return False
# ── Shared PowerShell scripts (native Windows + WSL2) ─────────────────────
# .NET System.Windows.Forms.Clipboard — used by both native Windows (powershell)
# and WSL2 (powershell.exe) paths.
# PowerShell one-liner: loads System.Windows.Forms and evaluates
# Clipboard.ContainsImage(); callers look for "True" in its stdout.
_PS_CHECK_IMAGE = (
"Add-Type -AssemblyName System.Windows.Forms;"
"[System.Windows.Forms.Clipboard]::ContainsImage()"
)
# PowerShell script: fetches the clipboard image, exits with status 1 when
# there is none, otherwise encodes it as PNG into a memory stream and
# writes the bytes to stdout as a single base64 string.
_PS_EXTRACT_IMAGE = (
"Add-Type -AssemblyName System.Windows.Forms;"
"Add-Type -AssemblyName System.Drawing;"
"$img = [System.Windows.Forms.Clipboard]::GetImage();"
"if ($null -eq $img) { exit 1 }"
"$ms = New-Object System.IO.MemoryStream;"
"$img.Save($ms, [System.Drawing.Imaging.ImageFormat]::Png);"
"[System.Convert]::ToBase64String($ms.ToArray())"
)
# ── Native Windows ────────────────────────────────────────────────────────
# Native Windows uses ``powershell`` (Windows PowerShell 5.1, always present)
# or ``pwsh`` (PowerShell 7+, optional). Discovery is cached per-process.
def _find_powershell() -> str | None:
    """Probe for a working PowerShell binary.

    Tries ``powershell`` and then ``pwsh`` by running a trivial
    ``echo ok`` command with each. The first name whose probe exits with
    status 0 and prints ``ok`` is returned; ``None`` means neither worked.
    """
    probe_args = ["-NoProfile", "-NonInteractive", "-Command", "echo ok"]
    for candidate in ("powershell", "pwsh"):
        try:
            probe = subprocess.run(
                [candidate, *probe_args],
                capture_output=True, text=True, timeout=5,
            )
            if probe.returncode == 0 and "ok" in probe.stdout:
                return candidate
        except Exception:
            # Missing binary, timeout or any other failure: this candidate
            # is unusable, move on to the next one.
            continue
    return None
# Cache the resolved PowerShell executable (checked once per process)
# ``False`` is the "not probed yet" marker; after the first probe this
# holds either the executable name or ``None``.
_ps_exe: str | None | bool = False


def _get_ps_exe() -> str | None:
    """Return the PowerShell executable name, probing only on first use."""
    global _ps_exe
    if _ps_exe is not False:
        return _ps_exe
    _ps_exe = _find_powershell()
    return _ps_exe
def _windows_has_image() -> bool:
    """Report whether the native Windows clipboard currently holds an image.

    Returns ``False`` when no PowerShell executable is available or when
    the check fails for any reason.
    """
    shell = _get_ps_exe()
    if shell is None:
        return False

    check_cmd = [shell, "-NoProfile", "-NonInteractive", "-Command", _PS_CHECK_IMAGE]
    try:
        outcome = subprocess.run(
            check_cmd, capture_output=True, text=True, timeout=5,
        )
        # The script prints the boolean result of ContainsImage().
        return outcome.returncode == 0 and "True" in outcome.stdout
    except Exception as e:
        logger.debug("Windows clipboard image check failed: %s", e)
    return False
def _windows_save(dest: Path) -> bool:
    """Save the native Windows clipboard image to *dest* as PNG.

    Runs the extraction script through PowerShell, decodes the base64
    text it prints and writes the bytes to *dest*. Returns ``True`` only
    when *dest* exists afterwards and is non-empty. On an exception,
    *dest* is removed and ``False`` is returned.
    """
    shell = _get_ps_exe()
    if shell is None:
        logger.debug("No PowerShell found — Windows clipboard image paste unavailable")
        return False

    extract_cmd = [shell, "-NoProfile", "-NonInteractive", "-Command", _PS_EXTRACT_IMAGE]
    try:
        outcome = subprocess.run(
            extract_cmd, capture_output=True, text=True, timeout=15,
        )
        # A failed script run and an empty payload are both "no image".
        encoded = outcome.stdout.strip() if outcome.returncode == 0 else ""
        if not encoded:
            return False

        dest.write_bytes(base64.b64decode(encoded))
        return dest.exists() and dest.stat().st_size > 0
    except Exception as e:
        logger.debug("Windows clipboard image extraction failed: %s", e)
        # Do not leave a partial file behind.
        dest.unlink(missing_ok=True)
        return False
# ── Linux ────────────────────────────────────────────────────────────────
def _is_wsl() -> bool:
@@ -245,7 +142,24 @@ def _linux_save(dest: Path) -> bool:
# ── WSL2 (powershell.exe) ────────────────────────────────────────────────
# Reuses _PS_CHECK_IMAGE / _PS_EXTRACT_IMAGE defined above.
# PowerShell script: get clipboard image as base64-encoded PNG on stdout.
# Using .NET System.Windows.Forms.Clipboard — always available on Windows.
# PowerShell one-liner: loads System.Windows.Forms and evaluates
# Clipboard.ContainsImage(). Callers look for the text "True" in stdout.
_PS_CHECK_IMAGE = (
    "Add-Type -AssemblyName System.Windows.Forms;"
    "[System.Windows.Forms.Clipboard]::ContainsImage()"
)
# PowerShell one-liner: exits with status 1 when the clipboard has no image;
# otherwise saves the image as PNG into a MemoryStream and emits the bytes as
# a single base64 string, which callers decode back to PNG bytes.
_PS_EXTRACT_IMAGE = (
    "Add-Type -AssemblyName System.Windows.Forms;"
    "Add-Type -AssemblyName System.Drawing;"
    "$img = [System.Windows.Forms.Clipboard]::GetImage();"
    "if ($null -eq $img) { exit 1 }"
    "$ms = New-Object System.IO.MemoryStream;"
    "$img.Save($ms, [System.Drawing.Imaging.ImageFormat]::Png);"
    "[System.Convert]::ToBase64String($ms.ToArray())"
)
def _wsl_has_image() -> bool:
"""Check if Windows clipboard has an image (via powershell.exe)."""
+4 -2
View File
@@ -294,8 +294,10 @@ def _resolve_config_gates() -> set[str]:
return set()
try:
import yaml
from hermes_constants import get_hermes_home
config_path = str(get_hermes_home() / "config.yaml")
config_path = os.path.join(
os.getenv("HERMES_HOME", os.path.expanduser("~/.hermes")),
"config.yaml",
)
if os.path.exists(config_path):
with open(config_path, encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
+1 -19
View File
@@ -1881,24 +1881,6 @@ def _normalize_max_turns_config(config: Dict[str, Any]) -> Dict[str, Any]:
def read_raw_config() -> Dict[str, Any]:
    """Return the contents of the config file exactly as stored on disk.

    No defaults are merged and no migration is applied, which makes this a
    cheap alternative to ``load_config()`` when only a single value is needed.

    Returns:
        The parsed YAML document, or ``{}`` when the file is missing, empty,
        or cannot be read/parsed (errors are deliberately swallowed).
    """
    try:
        path = get_config_path()
        if not path.exists():
            return {}
        with open(path, encoding="utf-8") as fh:
            loaded = yaml.safe_load(fh)
        return loaded or {}
    except Exception:
        return {}
def load_config() -> Dict[str, Any]:
"""Load configuration from ~/.hermes/config.yaml."""
import copy
@@ -2538,7 +2520,7 @@ def set_config_value(key: str, value: str):
'TINKER_API_KEY',
]
if key.upper() in api_keys or key.upper().endswith(('_API_KEY', '_TOKEN')) or key.upper().startswith('TERMINAL_SSH'):
if key.upper() in api_keys or key.upper().endswith('_API_KEY') or key.upper().endswith('_TOKEN') or key.upper().startswith('TERMINAL_SSH'):
save_env_value(key.upper(), value)
print(f"✓ Set {key} in {get_env_path()}")
return
+2 -2
View File
@@ -920,8 +920,8 @@ def run_doctor(args):
pass
except ImportError:
pass
except Exception:
pass
except Exception as _e:
logger.debug("Profile health check failed: %s", _e)
# =========================================================================
# Summary
+1
View File
@@ -15,6 +15,7 @@ Usage examples::
hermes logs --since 30m -f # follow, starting 30 min ago
"""
import os
import re
import sys
import time
+26 -60
View File
@@ -1154,7 +1154,7 @@ def _model_flow_nous(config, current_model="", args=None):
from hermes_cli.auth import (
get_provider_auth_state, _prompt_model_selection, _save_model_choice,
_update_config_for_provider, resolve_nous_runtime_credentials,
AuthError, format_auth_error,
fetch_nous_models, AuthError, format_auth_error,
_login_nous, PROVIDER_REGISTRY,
)
from hermes_cli.config import get_env_value, save_config, save_env_value
@@ -1195,15 +1195,14 @@ def _model_flow_nous(config, current_model="", args=None):
# Already logged in — use curated model list (same as OpenRouter defaults).
# The live /models endpoint returns hundreds of models; the curated list
# shows only agentic models users recognize from OpenRouter.
from hermes_cli.models import (
_PROVIDER_MODELS, get_pricing_for_provider, filter_nous_free_models,
check_nous_free_tier, partition_nous_models_by_tier,
)
from hermes_cli.models import _PROVIDER_MODELS, get_pricing_for_provider
model_ids = _PROVIDER_MODELS.get("nous", [])
if not model_ids:
print("No curated models available for Nous Portal.")
return
print(f"Showing {len(model_ids)} curated models — use \"Enter custom model name\" for others.")
# Verify credentials are still valid (catches expired sessions early)
try:
creds = resolve_nous_runtime_credentials(min_key_ttl_seconds=5 * 60)
@@ -1229,44 +1228,7 @@ def _model_flow_nous(config, current_model="", args=None):
# Fetch live pricing (non-blocking — returns empty dict on failure)
pricing = get_pricing_for_provider("nous")
# Check if user is on free tier
free_tier = check_nous_free_tier()
# For both tiers: apply the allowlist filter first (removes non-allowlisted
# free models and allowlist models that aren't actually free).
# Then for free users: partition remaining models into selectable/unavailable.
model_ids = filter_nous_free_models(model_ids, pricing)
unavailable_models: list[str] = []
if free_tier:
model_ids, unavailable_models = partition_nous_models_by_tier(model_ids, pricing, free_tier=True)
if not model_ids and not unavailable_models:
print("No models available for Nous Portal after filtering.")
return
# Resolve portal URL for upgrade links (may differ on staging)
_nous_portal_url = ""
try:
_nous_state = get_provider_auth_state("nous")
if _nous_state:
_nous_portal_url = _nous_state.get("portal_base_url", "")
except Exception:
pass
if free_tier and not model_ids:
print("No free models currently available.")
if unavailable_models:
from hermes_cli.auth import DEFAULT_NOUS_PORTAL_URL
_url = (_nous_portal_url or DEFAULT_NOUS_PORTAL_URL).rstrip("/")
print(f"Upgrade at {_url} to access paid models.")
return
print(f"Showing {len(model_ids)} curated models — use \"Enter custom model name\" for others.")
selected = _prompt_model_selection(
model_ids, current_model=current_model, pricing=pricing,
unavailable_models=unavailable_models, portal_url=_nous_portal_url,
)
selected = _prompt_model_selection(model_ids, current_model=current_model, pricing=pricing)
if selected:
_save_model_choice(selected)
# Reactivate Nous as the provider and update config
@@ -1314,6 +1276,7 @@ def _model_flow_openai_codex(config, current_model=""):
PROVIDER_REGISTRY, DEFAULT_CODEX_BASE_URL,
)
from hermes_cli.codex_models import get_codex_model_ids
from hermes_cli.config import get_env_value, save_env_value
import argparse
status = get_codex_auth_status()
@@ -1366,7 +1329,7 @@ def _model_flow_custom(config):
so it appears in the provider menu on subsequent runs.
"""
from hermes_cli.auth import _save_model_choice, deactivate_provider
from hermes_cli.config import get_env_value, load_config, save_config
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
current_url = get_env_value("OPENAI_BASE_URL") or ""
current_key = get_env_value("OPENAI_API_KEY") or ""
@@ -1628,7 +1591,7 @@ def _model_flow_named_custom(config, provider_info):
Otherwise probes the endpoint's /models API to let the user pick one.
"""
from hermes_cli.auth import _save_model_choice, deactivate_provider
from hermes_cli.config import load_config, save_config
from hermes_cli.config import save_env_value, load_config, save_config
from hermes_cli.models import fetch_api_models
name = provider_info["name"]
@@ -1838,7 +1801,7 @@ def _model_flow_copilot(config, current_model=""):
deactivate_provider,
resolve_api_key_provider_credentials,
)
from hermes_cli.config import save_env_value, load_config, save_config
from hermes_cli.config import get_env_value, save_env_value, load_config, save_config
from hermes_cli.models import (
fetch_api_models,
fetch_github_model_catalog,
@@ -2429,6 +2392,8 @@ def _model_flow_anthropic(config, current_model=""):
)
from hermes_cli.models import _PROVIDER_MODELS
pconfig = PROVIDER_REGISTRY["anthropic"]
# Check ALL credential sources
existing_key = (
get_env_value("ANTHROPIC_TOKEN")
@@ -3697,7 +3662,7 @@ def cmd_update(args):
try:
from hermes_cli.gateway import (
is_macos, is_linux, _ensure_user_systemd_env,
find_gateway_pids,
get_systemd_linger_status, find_gateway_pids,
_get_service_pids,
)
import signal as _signal
@@ -3853,7 +3818,7 @@ def cmd_profile(args):
"""Profile management — create, delete, list, switch, alias."""
from hermes_cli.profiles import (
list_profiles, create_profile, delete_profile, seed_profile_skills,
set_active_profile, get_active_profile_name,
get_active_profile, set_active_profile, get_active_profile_name,
check_alias_collision, create_wrapper_script, remove_wrapper_script,
_is_wrapper_dir_in_path, _get_wrapper_dir,
)
@@ -3981,6 +3946,7 @@ def cmd_profile(args):
print(f" {name} chat Start chatting")
print(f" {name} gateway start Start the messaging gateway")
if clone or clone_all:
from hermes_constants import get_hermes_home
profile_dir_display = f"~/.hermes/profiles/{name}"
print(f"\n Edit {profile_dir_display}/.env for different API keys")
print(f" Edit {profile_dir_display}/SOUL.md for different personality")
@@ -4403,7 +4369,7 @@ For more help on a command:
gateway_uninstall.add_argument("--system", action="store_true", help="Target the Linux system-level gateway service")
# gateway setup
gateway_subparsers.add_parser("setup", help="Configure messaging platforms")
gateway_setup = gateway_subparsers.add_parser("setup", help="Configure messaging platforms")
gateway_parser.set_defaults(func=cmd_gateway)
@@ -4678,10 +4644,10 @@ For more help on a command:
config_subparsers = config_parser.add_subparsers(dest="config_command")
# config show (default)
config_subparsers.add_parser("show", help="Show current configuration")
config_show = config_subparsers.add_parser("show", help="Show current configuration")
# config edit
config_subparsers.add_parser("edit", help="Open config file in editor")
config_edit = config_subparsers.add_parser("edit", help="Open config file in editor")
# config set
config_set = config_subparsers.add_parser("set", help="Set a configuration value")
@@ -4689,16 +4655,16 @@ For more help on a command:
config_set.add_argument("value", nargs="?", help="Value to set")
# config path
config_subparsers.add_parser("path", help="Print config file path")
config_path = config_subparsers.add_parser("path", help="Print config file path")
# config env-path
config_subparsers.add_parser("env-path", help="Print .env file path")
config_env = config_subparsers.add_parser("env-path", help="Print .env file path")
# config check
config_subparsers.add_parser("check", help="Check for missing/outdated config")
config_check = config_subparsers.add_parser("check", help="Check for missing/outdated config")
# config migrate
config_subparsers.add_parser("migrate", help="Update config with new options")
config_migrate = config_subparsers.add_parser("migrate", help="Update config with new options")
config_parser.set_defaults(func=cmd_config)
@@ -4712,7 +4678,7 @@ For more help on a command:
)
pairing_sub = pairing_parser.add_subparsers(dest="pairing_action")
pairing_sub.add_parser("list", help="Show pending + approved users")
pairing_list_parser = pairing_sub.add_parser("list", help="Show pending + approved users")
pairing_approve_parser = pairing_sub.add_parser("approve", help="Approve a pairing code")
pairing_approve_parser.add_argument("platform", help="Platform name (telegram, discord, slack, whatsapp)")
@@ -4722,7 +4688,7 @@ For more help on a command:
pairing_revoke_parser.add_argument("platform", help="Platform name")
pairing_revoke_parser.add_argument("user_id", help="User ID to revoke")
pairing_sub.add_parser("clear-pending", help="Clear all pending codes")
pairing_clear_parser = pairing_sub.add_parser("clear-pending", help="Clear all pending codes")
def cmd_pairing(args):
from hermes_cli.pairing import pairing_command
@@ -4898,7 +4864,7 @@ For more help on a command:
memory_sub = memory_parser.add_subparsers(dest="memory_command")
memory_sub.add_parser("setup", help="Interactive provider selection and configuration")
memory_sub.add_parser("status", help="Show current memory provider config")
memory_sub.add_parser("off", help="Disable external provider (built-in only)")
memory_off_p = memory_sub.add_parser("off", help="Disable external provider (built-in only)")
def cmd_memory(args):
sub = getattr(args, "memory_command", None)
@@ -5062,7 +5028,7 @@ For more help on a command:
sessions_prune.add_argument("--source", help="Only prune sessions from this source")
sessions_prune.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_subparsers.add_parser("stats", help="Show session store statistics")
sessions_stats = sessions_subparsers.add_parser("stats", help="Show session store statistics")
sessions_rename = sessions_subparsers.add_parser("rename", help="Set or change a session's title")
sessions_rename.add_argument("session_id", help="Session ID to rename")
@@ -5422,7 +5388,7 @@ For more help on a command:
)
profile_subparsers = profile_parser.add_subparsers(dest="profile_action")
profile_subparsers.add_parser("list", help="List all profiles")
profile_list = profile_subparsers.add_parser("list", help="List all profiles")
profile_use = profile_subparsers.add_parser("use", help="Set sticky default profile")
profile_use.add_argument("profile_name", help="Profile name (or 'default')")
+4 -6
View File
@@ -12,8 +12,6 @@ import os
import sys
from pathlib import Path
from hermes_constants import get_hermes_home
# ---------------------------------------------------------------------------
# Curses-based interactive picker (same pattern as hermes tools)
@@ -277,7 +275,7 @@ def cmd_setup_provider(provider_name: str) -> None:
config["memory"] = {}
if hasattr(provider, "post_setup"):
hermes_home = str(get_hermes_home())
hermes_home = str(Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))))
provider.post_setup(hermes_home, config)
return
@@ -328,7 +326,7 @@ def cmd_setup(args) -> None:
# If the provider has a post_setup hook, delegate entirely to it.
# The hook handles its own config, connection test, and activation.
if hasattr(provider, "post_setup"):
hermes_home = str(get_hermes_home())
hermes_home = str(Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))))
provider.post_setup(hermes_home, config)
return
@@ -338,7 +336,7 @@ def cmd_setup(args) -> None:
if not isinstance(provider_config, dict):
provider_config = {}
env_path = get_hermes_home() / ".env"
env_path = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))) / ".env"
env_writes = {}
if schema:
@@ -402,7 +400,7 @@ def cmd_setup(args) -> None:
save_config(config)
# Write non-secret config to provider's native location
hermes_home = str(get_hermes_home())
hermes_home = str(Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))))
if provider_config and hasattr(provider, "save_config"):
try:
provider.save_config(provider_config, hermes_home)
+7 -1
View File
@@ -21,16 +21,22 @@ OpenRouter variant suffixes (``:free``, ``:extended``, ``:fast``).
from __future__ import annotations
import logging
from dataclasses import dataclass
from dataclasses import dataclass, field
from typing import List, NamedTuple, Optional
from hermes_cli.providers import (
ALIASES,
LABELS,
TRANSPORT_TO_API_MODE,
determine_api_mode,
get_label,
get_provider,
is_aggregator,
normalize_provider,
resolve_provider_full,
)
from hermes_cli.model_normalize import (
detect_vendor,
normalize_model_for_provider,
)
from agent.models_dev import (
+6 -198
View File
@@ -44,7 +44,7 @@ OPENROUTER_MODELS: list[tuple[str, str]] = [
("stepfun/step-3.5-flash", ""),
("minimax/minimax-m2.7", ""),
("minimax/minimax-m2.5", ""),
("z-ai/glm-5.1", ""),
("z-ai/glm-5", ""),
("z-ai/glm-5-turbo", ""),
("moonshotai/kimi-k2.5", ""),
("x-ai/grok-4.20-beta", ""),
@@ -75,7 +75,7 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"stepfun/step-3.5-flash",
"minimax/minimax-m2.7",
"minimax/minimax-m2.5",
"z-ai/glm-5.1",
"z-ai/glm-5",
"z-ai/glm-5-turbo",
"moonshotai/kimi-k2.5",
"x-ai/grok-4.20-beta",
@@ -265,202 +265,6 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
],
}
# ---------------------------------------------------------------------------
# Nous Portal free-model filtering
# ---------------------------------------------------------------------------
# Models that are ALLOWED to appear when priced as free on Nous Portal.
# Any other free model is hidden — prevents promotional/temporary free models
# from cluttering the selection when users are paying subscribers.
# Models in this list are ALSO filtered out if they are NOT free (i.e. they
# should only appear in the menu when they are genuinely free).
# Consulted by filter_nous_free_models(): a model listed here is kept only
# while it is priced as free; a model not listed here is kept only while it
# is not free.
_NOUS_ALLOWED_FREE_MODELS: frozenset[str] = frozenset({
    "xiaomi/mimo-v2-pro",
    "xiaomi/mimo-v2-omni",
})
def _is_model_free(model_id: str, pricing: dict[str, dict[str, str]]) -> bool:
    """Tell whether *model_id* costs nothing for both prompt and completion.

    A model with no (or empty) pricing entry is treated as not free, as is one
    whose ``prompt``/``completion`` price is missing or not parseable as a
    number.
    """
    entry = pricing.get(model_id)
    if not entry:
        return False
    try:
        # A missing key defaults to "1", i.e. "not free".
        prompt_cost = float(entry.get("prompt", "1"))
        completion_cost = float(entry.get("completion", "1"))
    except (TypeError, ValueError):
        return False
    return prompt_cost == 0 and completion_cost == 0
def filter_nous_free_models(
    model_ids: list[str],
    pricing: dict[str, dict[str, str]],
) -> list[str]:
    """Apply the Nous Portal free-model policy to *model_ids*.

    A model is kept exactly when its allowlist membership matches its
    free-ness:

    * not allowlisted and paid  -> keep (the normal case)
    * not allowlisted and free  -> drop
    * allowlisted and free      -> keep
    * allowlisted and paid      -> drop

    When *pricing* is empty nothing can be decided, so *model_ids* is
    returned unchanged (the same list object).
    """
    if not pricing:
        return model_ids  # no pricing data — can't filter, show everything

    return [
        mid
        for mid in model_ids
        if (mid in _NOUS_ALLOWED_FREE_MODELS) == _is_model_free(mid, pricing)
    ]
# ---------------------------------------------------------------------------
# Nous Portal account tier detection
# ---------------------------------------------------------------------------
def fetch_nous_account_tier(access_token: str, portal_base_url: str = "") -> dict[str, Any]:
    """Fetch the user's Nous Portal account/subscription info.

    Issues a GET to ``<portal>/api/oauth/account`` with *access_token* as a
    Bearer token. An example of a successful response::

        {
            "subscription": {
                "plan": "Plus",
                "tier": 2,
                "monthly_charge": 20,
                "credits_remaining": 1686.60,
                ...
            },
            ...
        }

    Args:
        access_token: OAuth access token sent in the ``Authorization`` header.
        portal_base_url: Portal root URL; falls back to
            ``https://portal.nousresearch.com`` when empty. A trailing ``/``
            is ignored.

    Returns:
        The parsed JSON object, or ``{}`` on any failure (network, auth,
        parse) and also when the response is valid JSON but not an object,
        so callers can always use ``.get`` on the result.
    """
    base = (portal_base_url or "https://portal.nousresearch.com").rstrip("/")
    url = f"{base}/api/oauth/account"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
    }
    try:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=8) as resp:
            payload = json.loads(resp.read().decode())
    except Exception:
        return {}
    # Guard the declared return type: a JSON list/string/number is not usable
    # account info and must not leak to callers expecting a dict.
    return payload if isinstance(payload, dict) else {}
def is_nous_free_tier(account_info: dict[str, Any]) -> bool:
    """Decide from *account_info* whether the account is on a free tier.

    The account counts as free when ``subscription.monthly_charge`` parses to
    zero. If the subscription is not a dict, or the charge is absent or
    unparseable, the answer is ``False`` — i.e. assume a paid account rather
    than block the user.
    """
    subscription = account_info.get("subscription")
    charge = (
        subscription.get("monthly_charge")
        if isinstance(subscription, dict)
        else None
    )
    if charge is None:
        return False
    try:
        return float(charge) == 0
    except (TypeError, ValueError):
        return False
def partition_nous_models_by_tier(
    model_ids: list[str],
    pricing: dict[str, dict[str, str]],
    free_tier: bool,
) -> tuple[list[str], list[str]]:
    """Divide *model_ids* into ``(selectable, unavailable)`` for the user's tier.

    Paid-tier users (``free_tier`` false) get every model as selectable.
    Free-tier users get only free models as selectable; the rest are returned
    as unavailable. When *pricing* is empty the tier cannot be applied, so
    everything is returned as selectable.
    """
    if not free_tier or not pricing:
        # Paid tier, or no pricing data to decide with: show everything.
        return (model_ids, [])

    selectable: list[str] = []
    unavailable: list[str] = []
    for mid in model_ids:
        bucket = selectable if _is_model_free(mid, pricing) else unavailable
        bucket.append(mid)
    return (selectable, unavailable)
# ---------------------------------------------------------------------------
# TTL cache for free-tier detection — avoids repeated API calls within a
# session while still picking up upgrades quickly.
# ---------------------------------------------------------------------------
_FREE_TIER_CACHE_TTL: int = 180 # seconds (3 minutes)
# (result, time.monotonic() timestamp of the check); None means nothing cached
_free_tier_cache: tuple[bool, float] | None = None # (result, timestamp)
def clear_nous_free_tier_cache() -> None:
    """Drop any cached free-tier result so the next check recomputes it.

    Intended for moments when the cached answer may be stale, e.g. after
    login/logout.
    """
    global _free_tier_cache
    _free_tier_cache = None
def check_nous_free_tier() -> bool:
    """Report whether the current Nous Portal user is on a free (unpaid) tier.

    The answer is cached for ``_FREE_TIER_CACHE_TTL`` seconds so the Portal
    API is not hit on every call, while an upgrade is still noticed within a
    few minutes.

    Returns:
        ``True`` for a free-tier account. ``False`` for a paid account and
        also on any error or missing credentials — a paying user must never
        be blocked by a failed check.
    """
    global _free_tier_cache
    import time

    now = time.monotonic()
    cached = _free_tier_cache
    if cached is not None:
        cached_result, cached_at = cached
        if now - cached_at < _FREE_TIER_CACHE_TTL:
            return cached_result

    is_free = False  # default to paid — don't block users
    try:
        from hermes_cli.auth import get_provider_auth_state, resolve_nous_runtime_credentials

        # Ensure we have a fresh token (triggers refresh if needed)
        resolve_nous_runtime_credentials(min_key_ttl_seconds=60)

        state = get_provider_auth_state("nous")
        token = state.get("access_token", "") if state else ""
        if token:
            account_info = fetch_nous_account_tier(token, state.get("portal_base_url", ""))
            is_free = is_nous_free_tier(account_info)
    except Exception:
        is_free = False

    # Failures are cached too, so a broken Portal is not retried on every call.
    _free_tier_cache = (is_free, now)
    return is_free
_PROVIDER_LABELS = {
"openrouter": "OpenRouter",
"openai-codex": "OpenAI Codex",
@@ -1131,6 +935,10 @@ def _payload_items(payload: Any) -> list[dict[str, Any]]:
return []
def _extract_model_ids(payload: Any) -> list[str]:
    """Collect the non-empty ``id`` values of the items found in *payload*."""
    model_ids: list[str] = []
    for entry in _payload_items(payload):
        model_id = entry.get("id", "")
        if model_id:
            model_ids.append(model_id)
    return model_ids
def copilot_default_headers() -> dict[str, str]:
"""Standard headers for Copilot API requests.
+2 -2
View File
@@ -38,7 +38,6 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Set, Union
from hermes_constants import get_hermes_home
from utils import env_var_enabled
try:
@@ -259,7 +258,8 @@ class PluginManager:
manifests: List[PluginManifest] = []
# 1. User plugins (~/.hermes/plugins/)
user_dir = get_hermes_home() / "plugins"
hermes_home = os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))
user_dir = Path(hermes_home) / "plugins"
manifests.extend(self._scan_directory(user_dir, source="user"))
# 2. Project plugins (./.hermes/plugins/)
+3 -4
View File
@@ -16,8 +16,6 @@ import subprocess
import sys
from pathlib import Path
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Minimum manifest version this installer understands.
@@ -28,7 +26,8 @@ _SUPPORTED_MANIFEST_VERSION = 1
def _plugins_dir() -> Path:
"""Return the user plugins directory, creating it if needed."""
plugins = get_hermes_home() / "plugins"
hermes_home = os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))
plugins = Path(hermes_home) / "plugins"
plugins.mkdir(parents=True, exist_ok=True)
return plugins
@@ -295,7 +294,7 @@ def cmd_install(identifier: str, force: bool = False) -> None:
sys.exit(1)
# Warn about insecure / local URL schemes
if git_url.startswith(("http://", "file://")):
if git_url.startswith("http://") or git_url.startswith("file://"):
console.print(
"[yellow]Warning:[/yellow] Using insecure/local URL scheme. "
"Consider using https:// or git@ for production installs."
+2 -1
View File
@@ -26,7 +26,7 @@ import shutil
import stat
import subprocess
import sys
from dataclasses import dataclass
from dataclasses import dataclass, field
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import List, Optional
@@ -517,6 +517,7 @@ def delete_profile(name: str, yes: bool = False) -> Path:
]
# Check for service
from hermes_cli.gateway import _profile_suffix, get_service_name
wrapper_path = _get_wrapper_dir() / name
has_wrapper = wrapper_path.exists()
if has_wrapper:
+22 -1
View File
@@ -20,7 +20,8 @@ Other modules import from this file. No parallel registries.
from __future__ import annotations
import logging
from dataclasses import dataclass
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
@@ -344,6 +345,26 @@ def get_label(provider_id: str) -> str:
return canonical
# Build LABELS dict for backward compat
def _build_labels() -> Dict[str, str]:
    """Assemble a provider-id -> label mapping.

    Every id in ``HERMES_OVERLAYS`` is labelled through ``get_label``;
    entries from ``_LABEL_OVERRIDES`` are then applied on top and win on
    conflict. A new dict is built on every call — any caching is left to the
    caller.
    """
    labels: Dict[str, str] = {pid: get_label(pid) for pid in HERMES_OVERLAYS}
    labels.update(_LABEL_OVERRIDES)
    return labels
# Lazy-built on first access
_labels_cache: Optional[Dict[str, str]] = None  # None until _build_labels() has run once
# NOTE(review): ``@property`` only has an effect on class attributes. Applied
# to a module-level function it binds ``LABELS`` to a ``property`` object, so
# neither ``LABELS[...]`` nor ``LABELS()`` would work. The name also appears
# to be rebound to a plain dict literal right after this definition, which
# would make this function unreachable — confirm and remove or convert to a
# regular accessor.
@property
def LABELS() -> Dict[str, str]:
    """Backward-compatible labels dict.

    Builds the mapping via ``_build_labels()`` on first use and stores it in
    ``_labels_cache`` for subsequent calls.
    """
    global _labels_cache
    if _labels_cache is None:
        _labels_cache = _build_labels()
    return _labels_cache
# For direct import compat, expose as module-level dict
# Built on demand by get_label() calls
LABELS: Dict[str, str] = {
+25
View File
@@ -21,6 +21,7 @@ from typing import Optional, Dict, Any
from hermes_cli.nous_subscription import (
apply_nous_provider_defaults,
get_nous_subscription_explainer_lines,
get_nous_subscription_features,
)
from tools.tool_backend_helpers import managed_nous_tools_enabled
@@ -42,6 +43,18 @@ def _model_config_dict(config: Dict[str, Any]) -> Dict[str, Any]:
return {}
def _set_model_provider(
    config: Dict[str, Any], provider_id: str, base_url: str = ""
) -> None:
    """Record *provider_id* (and optionally *base_url*) in ``config["model"]``.

    A non-empty *base_url* is stored with any trailing ``/`` removed; an empty
    one removes a previously stored ``base_url`` key. *config* is updated in
    place.
    """
    model_cfg = _model_config_dict(config)
    model_cfg["provider"] = provider_id
    if not base_url:
        model_cfg.pop("base_url", None)
    else:
        model_cfg["base_url"] = base_url.rstrip("/")
    config["model"] = model_cfg
def _set_default_model(config: Dict[str, Any], model_name: str) -> None:
if not model_name:
return
@@ -314,6 +327,16 @@ def _setup_provider_model_selection(config, provider_id, current_model, prompt_c
config["model"] = model_cfg
def _sync_model_from_disk(config: Dict[str, Any]) -> None:
    """Refresh ``config["model"]`` from the ``model`` entry of the saved config.

    A dict entry is merged over the in-memory model settings; a non-blank
    string entry is applied as the default model name. Anything else leaves
    *config* untouched.
    """
    on_disk = load_config().get("model")
    if isinstance(on_disk, dict):
        merged = _model_config_dict(config)
        merged.update(on_disk)
        config["model"] = merged
        return
    if isinstance(on_disk, str) and on_disk.strip():
        _set_default_model(config, on_disk.strip())
# Import config helpers
from hermes_cli.config import (
get_hermes_home,
@@ -1325,6 +1348,8 @@ def setup_terminal_backend(config: dict):
terminal_choices.append(f"Keep current ({current_backend})")
idx_to_backend[keep_current_idx] = current_backend
default_terminal = backend_to_idx.get(current_backend, 0)
terminal_idx = prompt_choice(
"Select terminal backend:", terminal_choices, keep_current_idx
)
+1
View File
@@ -96,6 +96,7 @@ Activate with ``/skin <name>`` in the CLI or ``display.skin: <name>`` in config.
"""
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
+16
View File
@@ -61,6 +61,22 @@ def _prompt(question: str, default: str = None, password: bool = False) -> str:
print()
return default or ""
def _prompt_yes_no(question: str, default: bool = True) -> bool:
    """Ask a yes/no *question* on stdin until a recognisable answer is given.

    An empty reply, Ctrl-C or end-of-input yields *default*. ``y``/``yes``
    and ``n``/``no`` (case-insensitive) yield ``True``/``False``; any other
    reply re-prompts.
    """
    hint = "Y/n" if default else "y/N"
    answers = {'y': True, 'yes': True, 'n': False, 'no': False}
    while True:
        try:
            reply = input(color(f"{question} [{hint}]: ", Colors.YELLOW)).strip().lower()
        except (KeyboardInterrupt, EOFError):
            print()
            return default
        if not reply:
            return default
        if reply in answers:
            return answers[reply]
# ─── Toolset Registry ─────────────────────────────────────────────────────────
# Toolsets shown in the configurator, grouped for display.
+5
View File
@@ -6,6 +6,7 @@ Provides options for:
- Keep data: Remove code but keep ~/.hermes/ (configs, sessions, logs)
"""
import os
import shutil
import subprocess
from pathlib import Path
@@ -23,6 +24,10 @@ def log_success(msg: str):
def log_warn(msg: str):
    """Print *msg* preceded by the ``Colors.YELLOW``-styled marker."""
    marker = color('', Colors.YELLOW)
    print(f"{marker} {msg}")
def log_error(msg: str):
    """Print *msg* preceded by the ``Colors.RED``-styled marker."""
    marker = color('', Colors.RED)
    print(f"{marker} {msg}")
def get_project_root() -> Path:
    """Return the resolved directory two levels above this source file."""
    module_dir = Path(__file__).parent
    # Step up first, resolve afterwards (same order as taking .parent.parent).
    return module_dir.parent.resolve()
+4 -3
View File
@@ -16,7 +16,7 @@ import re
import secrets
import time
from pathlib import Path
from typing import Dict
from typing import Dict, Optional
from hermes_constants import display_hermes_home
@@ -25,8 +25,9 @@ _SUBSCRIPTIONS_FILENAME = "webhook_subscriptions.json"
def _hermes_home() -> Path:
from hermes_constants import get_hermes_home
return get_hermes_home()
return Path(
os.getenv("HERMES_HOME", str(Path.home() / ".hermes"))
).expanduser()
def _subscriptions_path() -> Path:
+1
View File
@@ -13,6 +13,7 @@ secrets are never written to disk.
"""
import logging
import os
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import Optional
+1
View File
@@ -16,6 +16,7 @@ Key design decisions:
import json
import logging
import os
import random
import re
import sqlite3
+2
View File
@@ -16,6 +16,7 @@ crashes due to a bad timezone string.
import logging
import os
from datetime import datetime
from pathlib import Path
from hermes_constants import get_hermes_home
from typing import Optional
@@ -91,6 +92,7 @@ def get_timezone() -> Optional[ZoneInfo]:
def get_timezone_name() -> str:
    """Return the cached timezone name, or ``""`` when none is set.

    If the cache has not been resolved yet, ``get_timezone()`` is called
    first to populate it.
    """
    global _cached_tz_name, _cache_resolved
    if _cache_resolved:
        return _cached_tz_name or ""
    get_timezone()  # populates cache
    return _cached_tz_name or ""
+2 -1
View File
@@ -37,8 +37,9 @@ import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
logger = logging.getLogger("hermes.mcp_serve")
+7 -7
View File
@@ -23,11 +23,11 @@ import os
import shutil
import subprocess
import threading
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -321,7 +321,7 @@ class ByteRoverMemoryProvider(MemoryProvider):
return self._tool_curate(args)
elif tool_name == "brv_status":
return self._tool_status()
return tool_error(f"Unknown tool: {tool_name}")
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def shutdown(self) -> None:
if self._sync_thread and self._sync_thread.is_alive():
@@ -332,7 +332,7 @@ class ByteRoverMemoryProvider(MemoryProvider):
def _tool_query(self, args: dict) -> str:
query = args.get("query", "")
if not query:
return tool_error("query is required")
return json.dumps({"error": "query is required"})
result = _run_brv(
["query", "--", query.strip()[:5000]],
@@ -340,7 +340,7 @@ class ByteRoverMemoryProvider(MemoryProvider):
)
if not result["success"]:
return tool_error(result.get("error", "Query failed"))
return json.dumps({"error": result.get("error", "Query failed")})
output = result.get("output", "").strip()
if not output or len(output) < _MIN_OUTPUT_LEN:
@@ -355,7 +355,7 @@ class ByteRoverMemoryProvider(MemoryProvider):
def _tool_curate(self, args: dict) -> str:
content = args.get("content", "")
if not content:
return tool_error("content is required")
return json.dumps({"error": "content is required"})
result = _run_brv(
["curate", "--", content],
@@ -363,14 +363,14 @@ class ByteRoverMemoryProvider(MemoryProvider):
)
if not result["success"]:
return tool_error(result.get("error", "Curate failed"))
return json.dumps({"error": result.get("error", "Curate failed")})
return json.dumps({"result": "Memory curated successfully."})
def _tool_status(self) -> str:
result = _run_brv(["status"], timeout=15, cwd=self._cwd)
if not result["success"]:
return tool_error(result.get("error", "Status check failed"))
return json.dumps({"error": result.get("error", "Status check failed")})
return json.dumps({"status": result.get("output", "")})
+10 -10
View File
@@ -26,7 +26,6 @@ import threading
from typing import Any, Dict, List
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -291,7 +290,8 @@ class HindsightMemoryProvider(MemoryProvider):
if self._mode == "local":
def _start_daemon():
import traceback
log_dir = get_hermes_home() / "logs"
from pathlib import Path
log_dir = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))) / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / "hindsight-embed.log"
try:
@@ -434,12 +434,12 @@ class HindsightMemoryProvider(MemoryProvider):
client = self._get_client()
except Exception as e:
logger.warning("Hindsight client init failed: %s", e)
return tool_error(f"Hindsight client unavailable: {e}")
return json.dumps({"error": f"Hindsight client unavailable: {e}"})
if tool_name == "hindsight_retain":
content = args.get("content", "")
if not content:
return tool_error("Missing required parameter: content")
return json.dumps({"error": "Missing required parameter: content"})
context = args.get("context")
try:
_run_sync(client.aretain(
@@ -448,12 +448,12 @@ class HindsightMemoryProvider(MemoryProvider):
return json.dumps({"result": "Memory stored successfully."})
except Exception as e:
logger.warning("hindsight_retain failed: %s", e)
return tool_error(f"Failed to store memory: {e}")
return json.dumps({"error": f"Failed to store memory: {e}"})
elif tool_name == "hindsight_recall":
query = args.get("query", "")
if not query:
return tool_error("Missing required parameter: query")
return json.dumps({"error": "Missing required parameter: query"})
try:
resp = _run_sync(client.arecall(
bank_id=self._bank_id, query=query, budget=self._budget
@@ -464,12 +464,12 @@ class HindsightMemoryProvider(MemoryProvider):
return json.dumps({"result": "\n".join(lines)})
except Exception as e:
logger.warning("hindsight_recall failed: %s", e)
return tool_error(f"Failed to search memory: {e}")
return json.dumps({"error": f"Failed to search memory: {e}"})
elif tool_name == "hindsight_reflect":
query = args.get("query", "")
if not query:
return tool_error("Missing required parameter: query")
return json.dumps({"error": "Missing required parameter: query"})
try:
resp = _run_sync(client.areflect(
bank_id=self._bank_id, query=query, budget=self._budget
@@ -477,9 +477,9 @@ class HindsightMemoryProvider(MemoryProvider):
return json.dumps({"result": resp.text or "No relevant memories found."})
except Exception as e:
logger.warning("hindsight_reflect failed: %s", e)
return tool_error(f"Failed to reflect: {e}")
return json.dumps({"error": f"Failed to reflect: {e}"})
return tool_error(f"Unknown tool: {tool_name}")
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def shutdown(self) -> None:
global _loop, _loop_thread
+8 -8
View File
@@ -20,10 +20,10 @@ from __future__ import annotations
import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
from .store import MemoryStore
from .retrieval import FactRetriever
@@ -231,7 +231,7 @@ class HolographicMemoryProvider(MemoryProvider):
return self._handle_fact_store(args)
elif tool_name == "fact_feedback":
return self._handle_fact_feedback(args)
return tool_error(f"Unknown tool: {tool_name}")
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def on_session_end(self, messages: List[Dict[str, Any]]) -> None:
if not self._config.get("auto_extract", False):
@@ -297,7 +297,7 @@ class HolographicMemoryProvider(MemoryProvider):
elif action == "reason":
entities = args.get("entities", [])
if not entities:
return tool_error("reason requires 'entities' list")
return json.dumps({"error": "reason requires 'entities' list"})
results = retriever.reason(
entities,
category=args.get("category"),
@@ -335,12 +335,12 @@ class HolographicMemoryProvider(MemoryProvider):
return json.dumps({"facts": facts, "count": len(facts)})
else:
return tool_error(f"Unknown action: {action}")
return json.dumps({"error": f"Unknown action: {action}"})
except KeyError as exc:
return tool_error(f"Missing required argument: {exc}")
return json.dumps({"error": f"Missing required argument: {exc}"})
except Exception as exc:
return tool_error(str(exc))
return json.dumps({"error": str(exc)})
def _handle_fact_feedback(self, args: dict) -> str:
try:
@@ -349,9 +349,9 @@ class HolographicMemoryProvider(MemoryProvider):
result = self._store.record_feedback(fact_id, helpful=helpful)
return json.dumps(result)
except KeyError as exc:
return tool_error(f"Missing required argument: {exc}")
return json.dumps({"error": f"Missing required argument: {exc}"})
except Exception as exc:
return tool_error(str(exc))
return json.dumps({"error": str(exc)})
# -- Auto-extraction (on_session_end) ------------------------------------
+1
View File
@@ -6,6 +6,7 @@ Single-user Hermes memory store plugin.
import re
import sqlite3
import threading
from datetime import datetime
from pathlib import Path
try:
+10 -16
View File
@@ -18,10 +18,10 @@ from __future__ import annotations
import json
import logging
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -217,12 +217,6 @@ class HonchoMemoryProvider(MemoryProvider):
logger.debug("Honcho not configured — plugin inactive")
return
# Override peer_name with gateway user_id for per-user memory scoping.
# CLI sessions won't have user_id, so the config default is preserved.
_gw_user_id = kwargs.get("user_id")
if _gw_user_id:
cfg.peer_name = _gw_user_id
self._config = cfg
# ----- B1: recall_mode from config -----
@@ -639,15 +633,15 @@ class HonchoMemoryProvider(MemoryProvider):
def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
"""Handle a Honcho tool call, with lazy session init for tools-only mode."""
if self._cron_skipped:
return tool_error("Honcho is not active (cron context).")
return json.dumps({"error": "Honcho is not active (cron context)."})
# Port #1957: ensure session is initialized for tools-only mode
if not self._session_initialized:
if not self._ensure_session():
return tool_error("Honcho session could not be initialized.")
return json.dumps({"error": "Honcho session could not be initialized."})
if not self._manager or not self._session_key:
return tool_error("Honcho is not active for this session.")
return json.dumps({"error": "Honcho is not active for this session."})
try:
if tool_name == "honcho_profile":
@@ -659,7 +653,7 @@ class HonchoMemoryProvider(MemoryProvider):
elif tool_name == "honcho_search":
query = args.get("query", "")
if not query:
return tool_error("Missing required parameter: query")
return json.dumps({"error": "Missing required parameter: query"})
max_tokens = min(int(args.get("max_tokens", 800)), 2000)
result = self._manager.search_context(
self._session_key, query, max_tokens=max_tokens
@@ -671,7 +665,7 @@ class HonchoMemoryProvider(MemoryProvider):
elif tool_name == "honcho_context":
query = args.get("query", "")
if not query:
return tool_error("Missing required parameter: query")
return json.dumps({"error": "Missing required parameter: query"})
peer = args.get("peer", "user")
result = self._manager.dialectic_query(
self._session_key, query, peer=peer
@@ -681,17 +675,17 @@ class HonchoMemoryProvider(MemoryProvider):
elif tool_name == "honcho_conclude":
conclusion = args.get("conclusion", "")
if not conclusion:
return tool_error("Missing required parameter: conclusion")
return json.dumps({"error": "Missing required parameter: conclusion"})
ok = self._manager.create_conclusion(self._session_key, conclusion)
if ok:
return json.dumps({"result": f"Conclusion saved: {conclusion}"})
return tool_error("Failed to save conclusion.")
return json.dumps({"error": "Failed to save conclusion."})
return tool_error(f"Unknown tool: {tool_name}")
return json.dumps({"error": f"Unknown tool: {tool_name}"})
except Exception as e:
logger.error("Honcho tool %s failed: %s", tool_name, e)
return tool_error(f"Honcho {tool_name} failed: {e}")
return json.dumps({"error": f"Honcho {tool_name} failed: {e}"})
def shutdown(self) -> None:
for t in (self._prefetch_thread, self._sync_thread):
+2 -1
View File
@@ -11,7 +11,7 @@ import sys
from pathlib import Path
from hermes_constants import get_hermes_home
from plugins.memory.honcho.client import resolve_active_host, resolve_config_path, HOST
from plugins.memory.honcho.client import resolve_active_host, resolve_config_path, GLOBAL_CONFIG_PATH, HOST
def clone_honcho_for_profile(profile_name: str) -> bool:
@@ -1220,6 +1220,7 @@ def register_cli(subparser) -> None:
Called by the plugin CLI registration system during argparse setup.
The *subparser* is the parser for ``hermes honcho``.
"""
import argparse
subparser.add_argument(
"--target-profile", metavar="NAME", dest="target_profile",
+9 -11
View File
@@ -20,10 +20,10 @@ import logging
import os
import threading
import time
from pathlib import Path
from typing import Any, Dict, List
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -203,9 +203,7 @@ class Mem0MemoryProvider(MemoryProvider):
def initialize(self, session_id: str, **kwargs) -> None:
self._config = _load_config()
self._api_key = self._config.get("api_key", "")
# Prefer gateway-provided user_id for per-user memory scoping;
# fall back to config/env default for CLI (single-user) sessions.
self._user_id = kwargs.get("user_id") or self._config.get("user_id", "hermes-user")
self._user_id = self._config.get("user_id", "hermes-user")
self._agent_id = self._config.get("agent_id", "hermes")
self._rerank = self._config.get("rerank", True)
@@ -306,7 +304,7 @@ class Mem0MemoryProvider(MemoryProvider):
try:
client = self._get_client()
except Exception as e:
return tool_error(str(e))
return json.dumps({"error": str(e)})
if tool_name == "mem0_profile":
try:
@@ -318,12 +316,12 @@ class Mem0MemoryProvider(MemoryProvider):
return json.dumps({"result": "\n".join(lines), "count": len(lines)})
except Exception as e:
self._record_failure()
return tool_error(f"Failed to fetch profile: {e}")
return json.dumps({"error": f"Failed to fetch profile: {e}"})
elif tool_name == "mem0_search":
query = args.get("query", "")
if not query:
return tool_error("Missing required parameter: query")
return json.dumps({"error": "Missing required parameter: query"})
rerank = args.get("rerank", False)
top_k = min(int(args.get("top_k", 10)), 50)
try:
@@ -340,12 +338,12 @@ class Mem0MemoryProvider(MemoryProvider):
return json.dumps({"results": items, "count": len(items)})
except Exception as e:
self._record_failure()
return tool_error(f"Search failed: {e}")
return json.dumps({"error": f"Search failed: {e}"})
elif tool_name == "mem0_conclude":
conclusion = args.get("conclusion", "")
if not conclusion:
return tool_error("Missing required parameter: conclusion")
return json.dumps({"error": "Missing required parameter: conclusion"})
try:
client.add(
[{"role": "user", "content": conclusion}],
@@ -356,9 +354,9 @@ class Mem0MemoryProvider(MemoryProvider):
return json.dumps({"result": "Fact stored."})
except Exception as e:
self._record_failure()
return tool_error(f"Failed to store: {e}")
return json.dumps({"error": f"Failed to store: {e}"})
return tool_error(f"Unknown tool: {tool_name}")
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def shutdown(self) -> None:
for t in (self._prefetch_thread, self._sync_thread):
+7 -8
View File
@@ -31,7 +31,6 @@ import threading
from typing import Any, Dict, List, Optional
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -462,7 +461,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
if not self._client:
return tool_error("OpenViking server not connected")
return json.dumps({"error": "OpenViking server not connected"})
try:
if tool_name == "viking_search":
@@ -475,9 +474,9 @@ class OpenVikingMemoryProvider(MemoryProvider):
return self._tool_remember(args)
elif tool_name == "viking_add_resource":
return self._tool_add_resource(args)
return tool_error(f"Unknown tool: {tool_name}")
return json.dumps({"error": f"Unknown tool: {tool_name}"})
except Exception as e:
return tool_error(str(e))
return json.dumps({"error": str(e)})
def shutdown(self) -> None:
# Wait for background threads to finish
@@ -494,7 +493,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
def _tool_search(self, args: dict) -> str:
query = args.get("query", "")
if not query:
return tool_error("query is required")
return json.dumps({"error": "query is required"})
payload: Dict[str, Any] = {"query": query}
mode = args.get("mode", "auto")
@@ -531,7 +530,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
def _tool_read(self, args: dict) -> str:
uri = args.get("uri", "")
if not uri:
return tool_error("uri is required")
return json.dumps({"error": "uri is required"})
level = args.get("level", "overview")
# Map our level names to OpenViking GET endpoints
@@ -583,7 +582,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
def _tool_remember(self, args: dict) -> str:
content = args.get("content", "")
if not content:
return tool_error("content is required")
return json.dumps({"error": "content is required"})
# Store as a session message that will be extracted during commit.
# The category hint helps OpenViking's extraction classify correctly.
@@ -607,7 +606,7 @@ class OpenVikingMemoryProvider(MemoryProvider):
def _tool_add_resource(self, args: dict) -> str:
url = args.get("url", "")
if not url:
return tool_error("url is required")
return json.dumps({"error": "url is required"})
payload: Dict[str, Any] = {"path": url}
if args.get("reason"):
+5 -6
View File
@@ -20,6 +20,7 @@ Config (env vars or hermes config.yaml under retaindb:):
from __future__ import annotations
import hashlib
import json
import logging
import os
@@ -34,7 +35,6 @@ from typing import Any, Dict, List
from urllib.parse import quote
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -189,7 +189,7 @@ class _Client:
"Content-Type": "application/json",
"x-sdk-runtime": "hermes-plugin",
}
if path.startswith(("/v1/memory", "/v1/context")):
if path.startswith("/v1/memory") or path.startswith("/v1/context"):
h["X-API-Key"] = token
return h
@@ -505,8 +505,7 @@ class RetainDBMemoryProvider(MemoryProvider):
self._user_id = kwargs.get("user_id", "default") or "default"
self._agent_id = kwargs.get("agent_id", "hermes") or "hermes"
from hermes_constants import get_hermes_home
hermes_home_path = get_hermes_home()
hermes_home_path = Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
db_path = hermes_home_path / "retaindb_queue.db"
self._queue = _WriteQueue(self._client, db_path)
@@ -650,11 +649,11 @@ class RetainDBMemoryProvider(MemoryProvider):
def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
if not self._client:
return tool_error("RetainDB not initialized")
return json.dumps({"error": "RetainDB not initialized"})
try:
return json.dumps(self._dispatch(tool_name, args))
except Exception as exc:
return tool_error(str(exc))
return json.dumps({"error": str(exc)})
def _dispatch(self, tool_name: str, args: dict) -> Any:
c = self._client
+9 -10
View File
@@ -18,7 +18,6 @@ from pathlib import Path
from typing import Any, Dict, List, Optional
from agent.memory_provider import MemoryProvider
from tools.registry import tool_error
logger = logging.getLogger(__name__)
@@ -588,7 +587,7 @@ class SupermemoryMemoryProvider(MemoryProvider):
def _tool_store(self, args: dict) -> str:
content = str(args.get("content") or "").strip()
if not content:
return tool_error("content is required")
return json.dumps({"error": "content is required"})
metadata = args.get("metadata") or {}
if not isinstance(metadata, dict):
metadata = {}
@@ -599,12 +598,12 @@ class SupermemoryMemoryProvider(MemoryProvider):
preview = content[:80] + ("..." if len(content) > 80 else "")
return json.dumps({"saved": True, "id": result.get("id", ""), "preview": preview})
except Exception as exc:
return tool_error(f"Failed to store memory: {exc}")
return json.dumps({"error": f"Failed to store memory: {exc}"})
def _tool_search(self, args: dict) -> str:
query = str(args.get("query") or "").strip()
if not query:
return tool_error("query is required")
return json.dumps({"error": "query is required"})
try:
limit = max(1, min(20, int(args.get("limit", 5) or 5)))
except Exception:
@@ -622,20 +621,20 @@ class SupermemoryMemoryProvider(MemoryProvider):
formatted.append(entry)
return json.dumps({"results": formatted, "count": len(formatted)})
except Exception as exc:
return tool_error(f"Search failed: {exc}")
return json.dumps({"error": f"Search failed: {exc}"})
def _tool_forget(self, args: dict) -> str:
memory_id = str(args.get("id") or "").strip()
query = str(args.get("query") or "").strip()
if not memory_id and not query:
return tool_error("Provide either id or query")
return json.dumps({"error": "Provide either id or query"})
try:
if memory_id:
self._client.forget_memory(memory_id)
return json.dumps({"forgotten": True, "id": memory_id})
return json.dumps(self._client.forget_by_query(query))
except Exception as exc:
return tool_error(f"Forget failed: {exc}")
return json.dumps({"error": f"Forget failed: {exc}"})
def _tool_profile(self, args: dict) -> str:
query = str(args.get("query") or "").strip() or None
@@ -652,11 +651,11 @@ class SupermemoryMemoryProvider(MemoryProvider):
"dynamic_count": len(profile["dynamic"]),
})
except Exception as exc:
return tool_error(f"Profile failed: {exc}")
return json.dumps({"error": f"Profile failed: {exc}"})
def handle_tool_call(self, tool_name: str, args: Dict[str, Any], **kwargs) -> str:
if not self._active or not self._client:
return tool_error("Supermemory is not configured")
return json.dumps({"error": "Supermemory is not configured"})
if tool_name == "supermemory_store":
return self._tool_store(args)
if tool_name == "supermemory_search":
@@ -665,7 +664,7 @@ class SupermemoryMemoryProvider(MemoryProvider):
return self._tool_forget(args)
if tool_name == "supermemory_profile":
return self._tool_profile(args)
return tool_error(f"Unknown tool: {tool_name}")
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def register(ctx):
+102 -69
View File
@@ -20,6 +20,7 @@ Usage:
response = agent.run_conversation("Tell me about the latest Python updates")
"""
import atexit
import asyncio
import base64
import concurrent.futures
@@ -35,6 +36,7 @@ import sys
import tempfile
import time
import threading
import weakref
from types import SimpleNamespace
import uuid
from typing import List, Dict, Any, Optional
@@ -526,7 +528,6 @@ class AIAgent:
reasoning_config: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
platform: str = None,
user_id: str = None,
skip_context_files: bool = False,
skip_memory: bool = False,
session_db=None,
@@ -591,7 +592,6 @@ class AIAgent:
self.quiet_mode = quiet_mode
self.ephemeral_system_prompt = ephemeral_system_prompt
self.platform = platform # "cli", "telegram", "discord", "whatsapp", etc.
self._user_id = user_id # Platform user identifier (gateway sessions)
# Pluggable print function — CLI replaces this with _cprint so that
# raw ANSI status lines are routed through prompt_toolkit's renderer
# instead of going directly to stdout where patch_stdout's StdoutProxy
@@ -654,7 +654,7 @@ class AIAgent:
self.stream_delta_callback = stream_delta_callback
self.status_callback = status_callback
self.tool_gen_callback = tool_gen_callback
self._last_reported_tool = None # Track for "new tool" mode
# Tool execution state — allows _vprint during tool execution
# even when stream consumers are registered (no tokens streaming then)
@@ -1094,9 +1094,6 @@ class AIAgent:
"hermes_home": str(_ghh()),
"agent_context": "primary",
}
# Thread gateway user identity for per-user memory scoping
if self._user_id:
_init_kwargs["user_id"] = self._user_id
# Profile identity for per-profile provider scoping
try:
from hermes_cli.profiles import get_active_profile_name
@@ -1505,6 +1502,10 @@ class AIAgent:
"""Return True when the base URL targets OpenRouter."""
return "openrouter" in self._base_url_lower
def _is_anthropic_url(self) -> bool:
"""Return True when the base URL targets Anthropic (native or /anthropic proxy path)."""
return "api.anthropic.com" in self._base_url_lower or self._base_url_lower.rstrip("/").endswith("/anthropic")
def _max_tokens_param(self, value: int) -> dict:
"""Return the correct max tokens kwarg for the current provider.
@@ -1690,6 +1691,74 @@ class AIAgent:
return None
def _classify_empty_content_response(
    self,
    assistant_message,
    *,
    finish_reason: Optional[str],
    approx_tokens: int,
    api_messages: List[Dict[str, Any]],
    conversation_history: Optional[List[Dict[str, Any]]],
) -> Dict[str, Any]:
    """Inspect a think-only/empty assistant reply and describe how to react.

    Structured-reasoning responses are deliberately not short-circuited,
    because some models recover on a plain retry. The returned flags let the
    caller instead:
    - compress right away when the reply looks like implicit context pressure
    - salvage the reasoning early when the same reasoning-only payload repeats
    - otherwise stay on the normal retry path

    Side effect: stores this response's signature on
    ``self._last_empty_content_signature`` so the next call can detect a
    repeat.

    Returns:
        Dict with keys ``reasoning_text``, ``has_structured_reasoning``,
        ``repeated_signature``, ``should_compress``, ``is_local_custom``,
        ``is_large_session`` and ``is_resumed``.
    """
    reasoning_text = self._extract_reasoning(assistant_message)

    # Any truthy structured-reasoning field on the message counts.
    has_structured_reasoning = False
    for field_name in ("reasoning", "reasoning_content", "reasoning_details"):
        if getattr(assistant_message, field_name, None):
            has_structured_reasoning = True
            break

    raw_content = getattr(assistant_message, "content", None) or ""
    visible_text = self._strip_think_blocks(raw_content).strip()

    # Fingerprint of this reply; equality with the previous one means the
    # model produced the same empty/think-only payload again.
    signature = (
        raw_content,
        reasoning_text or "",
        has_structured_reasoning,
        finish_reason or "",
    )
    previous_signature = getattr(self, "_last_empty_content_signature", None)
    repeated_signature = signature == previous_signature

    compressor = getattr(self, "context_compressor", None)
    context_length = getattr(compressor, "context_length", 0) or 0
    threshold_tokens = getattr(compressor, "threshold_tokens", 0) or 0

    # "Large" means the token estimate reached the greater of 40% of the
    # context length and the compressor threshold, or the request carries
    # more than 80 messages.
    if context_length and approx_tokens >= max(
        int(context_length * 0.4), threshold_tokens
    ):
        is_large_session = True
    else:
        is_large_session = len(api_messages) > 80

    is_local_custom = is_local_endpoint(getattr(self, "base_url", "") or "")
    is_resumed = bool(conversation_history)

    context_probed = getattr(compressor, "_context_probed", False)
    context_pressure_signals = (
        finish_reason == "length"
        or bool(context_probed)
        or is_large_session
        or is_resumed
    )

    # Compress only when compression is enabled, is_local_endpoint() accepted
    # the base URL, a pressure signal is present, and nothing visible remains
    # after stripping think blocks.
    should_compress = bool(
        self.compression_enabled
        and is_local_custom
        and context_pressure_signals
        and not visible_text
    )

    self._last_empty_content_signature = signature

    return {
        "reasoning_text": reasoning_text,
        "has_structured_reasoning": has_structured_reasoning,
        "repeated_signature": repeated_signature,
        "should_compress": should_compress,
        "is_local_custom": is_local_custom,
        "is_large_session": is_large_session,
        "is_resumed": is_resumed,
    }
def _cleanup_task_resources(self, task_id: str) -> None:
"""Clean up VM and browser resources for a given task."""
try:
@@ -2633,7 +2702,20 @@ class AIAgent:
if not _soul_loaded:
# Fallback to hardcoded identity
prompt_parts = [DEFAULT_AGENT_IDENTITY]
_ai_peer_name = (
None
if False
else None
)
if _ai_peer_name:
_identity = DEFAULT_AGENT_IDENTITY.replace(
"You are Hermes Agent",
f"You are {_ai_peer_name}",
1,
)
else:
_identity = DEFAULT_AGENT_IDENTITY
prompt_parts = [_identity]
# Tool-aware behavioral guidance: only inject when the tools are loaded
tool_guidance = []
@@ -3318,7 +3400,7 @@ class AIAgent:
elif "stream" in api_kwargs:
raise ValueError("Codex Responses stream flag is only allowed in fallback streaming requests.")
unexpected = sorted(key for key in api_kwargs if key not in allowed_keys)
unexpected = sorted(key for key in api_kwargs.keys() if key not in allowed_keys)
if unexpected:
raise ValueError(
f"Codex Responses request has unsupported field(s): {', '.join(unexpected)}."
@@ -5741,7 +5823,6 @@ class AIAgent:
api_msg.pop("reasoning", None)
api_msg.pop("finish_reason", None)
api_msg.pop("_flush_sentinel", None)
api_msg.pop("_thinking_prefill", None)
if _needs_sanitize:
self._sanitize_tool_calls_for_strict_api(api_msg)
api_messages.append(api_msg)
@@ -5827,7 +5908,7 @@ class AIAgent:
args = json.loads(tc.function.arguments)
flush_target = args.get("target", "memory")
from tools.memory_tool import memory_tool as _memory_tool
_memory_tool(
result = _memory_tool(
action=args.get("action"),
target=flush_target,
content=args.get("content"),
@@ -6665,7 +6746,7 @@ class AIAgent:
api_messages = []
for msg in messages:
api_msg = msg.copy()
for internal_field in ("reasoning", "finish_reason", "_thinking_prefill"):
for internal_field in ("reasoning", "finish_reason"):
api_msg.pop(internal_field, None)
if _needs_sanitize:
self._sanitize_tool_calls_for_strict_api(api_msg)
@@ -6857,7 +6938,6 @@ class AIAgent:
self._empty_content_retries = 0
self._incomplete_scratchpad_retries = 0
self._codex_incomplete_retries = 0
self._thinking_prefill_retries = 0
self._last_content_with_tools = None
self._mute_post_response = False
self._surrogate_sanitized = False
@@ -7203,8 +7283,6 @@ class AIAgent:
# Remove finish_reason - not accepted by strict APIs (e.g. Mistral)
if "finish_reason" in api_msg:
api_msg.pop("finish_reason")
# Strip internal thinking-prefill marker
api_msg.pop("_thinking_prefill", None)
# Strip Codex Responses API fields (call_id, response_item_id) for
# strict providers like Mistral, Fireworks, etc. that reject unknown fields.
# Uses new dicts so the internal messages list retains the fields
@@ -7390,7 +7468,7 @@ class AIAgent:
elif not isinstance(output_items, list):
response_invalid = True
error_details.append("response.output is not a list")
elif not output_items:
elif len(output_items) == 0:
# If we reach here, _run_codex_stream's backfill
# from output_item.done events and text-delta
# synthesis both failed to populate output.
@@ -7413,11 +7491,11 @@ class AIAgent:
elif not isinstance(content_blocks, list):
response_invalid = True
error_details.append("response.content is not a list")
elif not content_blocks:
elif len(content_blocks) == 0:
response_invalid = True
error_details.append("response.content is empty")
else:
if response is None or not hasattr(response, 'choices') or response.choices is None or not response.choices:
if response is None or not hasattr(response, 'choices') or response.choices is None or len(response.choices) == 0:
response_invalid = True
if response is None:
error_details.append("response is None")
@@ -8739,15 +8817,6 @@ class AIAgent:
if clean:
self._vprint(f" ┊ 💬 {clean}")
# Pop thinking-only prefill message(s) before appending
# (tool-call path — same rationale as the final-response path).
while (
messages
and isinstance(messages[-1], dict)
and messages[-1].get("_thinking_prefill")
):
messages.pop()
messages.append(assistant_msg)
# Close any open streaming display (response box, reasoning
@@ -8861,36 +8930,11 @@ class AIAgent:
self._response_was_previewed = True
break
# ── Thinking-only prefill continuation ──────────
# The model produced structured reasoning (via API
# fields) but no visible text content. Rather than
# giving up, append the assistant message as-is and
# continue — the model will see its own reasoning
# on the next turn and produce the text portion.
# Inspired by clawdbot's "incomplete-text" recovery.
_has_structured = bool(
getattr(assistant_message, "reasoning", None)
or getattr(assistant_message, "reasoning_content", None)
or getattr(assistant_message, "reasoning_details", None)
)
if _has_structured and self._thinking_prefill_retries < 2:
self._thinking_prefill_retries += 1
self._vprint(
f"{self.log_prefix}↻ Thinking-only response — "
f"prefilling to continue "
f"({self._thinking_prefill_retries}/2)"
)
interim_msg = self._build_assistant_message(
assistant_message, "incomplete"
)
interim_msg["_thinking_prefill"] = True
messages.append(interim_msg)
self._session_messages = messages
self._save_session_log(messages)
continue
# Exhausted prefill attempts or no structured
# reasoning — fall through to "(empty)" terminal.
# Reasoning-only response: the model produced thinking
# but no visible content. This is a valid response —
# keep reasoning in its own field and set content to
# "(empty)" so every provider accepts the message.
# No retries needed.
reasoning_text = self._extract_reasoning(assistant_message)
assistant_msg = self._build_assistant_message(assistant_message, finish_reason)
assistant_msg["content"] = "(empty)"
@@ -8909,7 +8953,6 @@ class AIAgent:
if hasattr(self, '_empty_content_retries'):
self._empty_content_retries = 0
self._last_empty_content_signature = None
self._thinking_prefill_retries = 0
if (
self.api_mode == "codex_responses"
@@ -8948,18 +8991,7 @@ class AIAgent:
final_response = self._strip_think_blocks(final_response).strip()
final_msg = self._build_assistant_message(assistant_message, finish_reason)
# Pop thinking-only prefill message(s) before appending
# the final response. This avoids consecutive assistant
# messages which break strict-alternation providers
# (Anthropic Messages API) and keeps history clean.
while (
messages
and isinstance(messages[-1], dict)
and messages[-1].get("_thinking_prefill")
):
messages.pop()
messages.append(final_msg)
if not self.quiet_mode:
@@ -9001,6 +9033,7 @@ class AIAgent:
"content": f"Error executing tool: {error_msg}",
}
messages.append(err_msg)
pending_handled = True
break
# Non-tool errors don't need a synthetic message injected.
+2
View File
@@ -21,6 +21,8 @@ Usage:
"""
import argparse
import json
import os
import re
import shutil
import subprocess
+2
View File
@@ -17,6 +17,7 @@ Usage:
import json
import random
import os
from pathlib import Path
from typing import List, Dict, Any, Tuple
import fire
@@ -137,6 +138,7 @@ def sample_from_datasets(
List of sampled trajectory entries
"""
from multiprocessing import Pool
from functools import partial
random.seed(seed)
+1 -1
View File
@@ -24,7 +24,7 @@ This is educational cinema. Every frame teaches. Every animation reveals structu
## Prerequisites
Run `scripts/setup.sh` to verify all dependencies. Requires: Python 3.10+, Manim Community Edition v0.20+ (`pip install manim`), LaTeX (`texlive-full` on Linux, `mactex` on macOS), and ffmpeg. Reference docs tested against Manim CE v0.20.1.
Run `scripts/setup.sh` to verify all dependencies. Requires: Python 3.10+, Manim Community Edition (`pip install manim`), LaTeX (`texlive-full` on Linux, `mactex` on macOS), and ffmpeg.
## Modes
@@ -50,31 +50,6 @@ self.play(circle.animate.set_color(RED))
self.play(circle.animate.shift(RIGHT * 2).scale(0.5)) # chain multiple
```
## Additional Creation Animations
```python
self.play(GrowFromPoint(circle, LEFT * 3)) # scale 0 -> 1 from a specific point
self.play(GrowFromEdge(rect, DOWN)) # grow from one edge
self.play(SpinInFromNothing(square)) # scale up while rotating (default PI/2)
self.play(GrowArrow(arrow)) # grows arrow from start to tip
```
## Movement Animations
```python
# Move a mobject along an arbitrary path
path = Arc(radius=2, angle=PI)
self.play(MoveAlongPath(dot, path), run_time=2)
# Rotate (as a Transform, not .animate — supports about_point)
self.play(Rotate(square, angle=PI / 2, about_point=ORIGIN), run_time=1.5)
# Rotating (continuous rotation, updater-style — good for spinning objects)
self.play(Rotating(gear, angle=TAU, run_time=4, rate_func=linear))
```
`MoveAlongPath` takes any `VMobject` as the path — use `Arc`, `CubicBezier`, `Line`, or a custom `VMobject`. Position is computed via `path.point_from_proportion()`.
## Emphasis Animations
```python
@@ -65,57 +65,6 @@ MathTex(r"\vec{v}") # vector
MathTex(r"\lim_{x \to \infty} f(x)") # limit
```
## Matrices
`MathTex` supports standard LaTeX matrix environments via `amsmath` (loaded by default):
```python
# Bracketed matrix
MathTex(r"\begin{bmatrix} 1 & 0 \\ 0 & 1 \end{bmatrix}")
# Parenthesized matrix
MathTex(r"\begin{pmatrix} a & b \\ c & d \end{pmatrix}")
# Determinant (vertical bars)
MathTex(r"\begin{vmatrix} a & b \\ c & d \end{vmatrix}")
# Plain (no delimiters)
MathTex(r"\begin{matrix} x_1 \\ x_2 \\ x_3 \end{matrix}")
```
For matrices you need to animate element-by-element or color individual entries, use the `IntegerMatrix`, `DecimalMatrix`, or `MobjectMatrix` mobjects instead — see `mobjects.md`.
## Cases and Piecewise Functions
```python
MathTex(r"""
f(x) = \begin{cases}
x^2 & \text{if } x \geq 0 \\
-x^2 & \text{if } x < 0
\end{cases}
""")
```
## Aligned Environments
For multi-line derivations with alignment, use `aligned` inside `MathTex`:
```python
MathTex(r"""
\begin{aligned}
\nabla \cdot \mathbf{E} &= \frac{\rho}{\epsilon_0} \\
\nabla \cdot \mathbf{B} &= 0 \\
\nabla \times \mathbf{E} &= -\frac{\partial \mathbf{B}}{\partial t} \\
\nabla \times \mathbf{B} &= \mu_0 \mathbf{J} + \mu_0 \epsilon_0 \frac{\partial \mathbf{E}}{\partial t}
\end{aligned}
""")
```
Note: `MathTex` wraps content in `align*` by default. Override with `tex_environment` if needed:
```python
MathTex(r"...", tex_environment="gather*")
```
## Derivation Pattern
```python
@@ -35,52 +35,6 @@ rrect = RoundedRectangle(corner_radius=0.3, width=4, height=2)
brace = Brace(rect, DOWN, color=YELLOW)
```
## Polygons and Arcs
```python
# Arbitrary polygon from vertices
poly = Polygon(LEFT, UP * 2, RIGHT, color=GREEN, fill_opacity=0.3)
# Regular n-sided polygon
hexagon = RegularPolygon(n=6, color=TEAL, fill_opacity=0.4)
# Triangle (shorthand for RegularPolygon(n=3))
tri = Triangle(color=YELLOW, fill_opacity=0.5)
# Arc (portion of a circle)
arc = Arc(radius=2, start_angle=0, angle=PI / 2, color=BLUE)
# Arc between two points
arc_between = ArcBetweenPoints(LEFT * 2, RIGHT * 2, angle=TAU / 4, color=RED)
# Curved arrow (arc with tip)
curved_arrow = CurvedArrow(LEFT * 2, RIGHT * 2, color=ORANGE)
```
## Sectors and Annuli
```python
# Sector (pie slice)
sector = Sector(outer_radius=2, start_angle=0, angle=PI / 3, fill_opacity=0.7, color=BLUE)
# Annulus (ring)
ring = Annulus(inner_radius=1, outer_radius=2, fill_opacity=0.5, color=GREEN)
# Annular sector (partial ring)
partial_ring = AnnularSector(
inner_radius=1, outer_radius=2,
angle=PI / 2, start_angle=0,
fill_opacity=0.7, color=TEAL
)
# Cutout (punch holes in a shape)
background = Square(side_length=4, fill_opacity=1, color=BLUE)
hole = Circle(radius=0.5)
cutout = Cutout(background, hole, fill_opacity=1, color=BLUE)
```
Use cases: pie charts, ring progress indicators, Venn diagrams with arcs, geometric proofs.
## Positioning
```python
@@ -145,29 +99,6 @@ class NetworkNode(Group):
self.add(self.circle, self.label)
```
## Matrix Mobjects
Display matrices as grids of numbers or mobjects:
```python
# Integer matrix
m = IntegerMatrix([[1, 2], [3, 4]])
# Decimal matrix (control decimal places)
m = DecimalMatrix([[1.5, 2.7], [3.1, 4.9]], element_to_mobject_config={"num_decimal_places": 2})
# Mobject matrix (any mobject in each cell)
m = MobjectMatrix([
[MathTex(r"\pi"), MathTex(r"e")],
[MathTex(r"\phi"), MathTex(r"\tau")]
])
# Bracket types: "(" "[" "|" or "\\{"
m = IntegerMatrix([[1, 0], [0, 1]], left_bracket="[", right_bracket="]")
```
Use cases: linear algebra, transformation matrices, system-of-equations coefficient display.
## Constants
Directions: `UP, DOWN, LEFT, RIGHT, ORIGIN, UL, UR, DL, DR`
@@ -12,7 +12,7 @@ Adapt this for your specific task by modifying:
import torch
import re
from datasets import load_dataset
from datasets import load_dataset, Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig
from trl import GRPOTrainer, GRPOConfig
@@ -16,10 +16,13 @@ Usage in execute_code:
"""
import os
import sys
import json
import time
import re
import yaml
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
try:
from openai import OpenAI
@@ -20,6 +20,7 @@ Usage in execute_code:
import os
import re
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -403,6 +404,7 @@ def race_godmode_classic(query, api_key=None, timeout=60):
Each combo uses a different model paired with its best-performing jailbreak prompt.
Returns the best result across all combos.
"""
from collections import namedtuple
HALL_OF_FAME = [
{
@@ -17,6 +17,7 @@ Usage:
import re
import base64
import sys
# ═══════════════════════════════════════════════════════════════════
# Trigger words that commonly trip safety classifiers
-289
View File
@@ -1,289 +0,0 @@
"""Tests for per-user memory scoping via user_id threading.
Verifies that gateway user_id flows from AIAgent -> MemoryManager -> plugins,
so each gateway user gets their own memory bucket instead of sharing a static one.
"""
import json
import os
import pytest
from unittest.mock import MagicMock, patch
from agent.memory_provider import MemoryProvider
from agent.memory_manager import MemoryManager
# ---------------------------------------------------------------------------
# Concrete test provider that records init kwargs
# ---------------------------------------------------------------------------
class RecordingProvider(MemoryProvider):
    """Stub memory provider that captures the arguments given to ``initialize()``.

    Every other provider hook is a no-op returning an empty value, so tests
    can concentrate on what reaches the provider at initialization time.
    """

    def __init__(self, name="recording"):
        self._name = name
        self._init_session_id = None
        self._init_kwargs = {}

    @property
    def name(self) -> str:
        return self._name

    def is_available(self) -> bool:
        return True

    def initialize(self, session_id: str, **kwargs) -> None:
        # Store a copy of the kwargs rather than the caller's own mapping.
        self._init_kwargs = {**kwargs}
        self._init_session_id = session_id

    def system_prompt_block(self) -> str:
        return ""

    def prefetch(self, query: str, *, session_id: str = "") -> str:
        return ""

    def sync_turn(self, user_content, assistant_content, *, session_id=""):
        return None

    def get_tool_schemas(self):
        return []

    def handle_tool_call(self, tool_name, args, **kwargs):
        return json.dumps({})

    def shutdown(self):
        return None
# ---------------------------------------------------------------------------
# MemoryManager user_id threading tests
# ---------------------------------------------------------------------------
class TestMemoryManagerUserIdThreading:
    """Check which identity kwargs ``initialize_all`` hands to registered providers."""

    def test_user_id_forwarded_to_provider(self):
        manager = MemoryManager()
        recorder = RecordingProvider()
        manager.add_provider(recorder)

        manager.initialize_all(
            session_id="sess-123",
            platform="telegram",
            user_id="tg_user_42",
        )

        received = recorder._init_kwargs
        assert received.get("user_id") == "tg_user_42"
        assert received.get("platform") == "telegram"
        assert recorder._init_session_id == "sess-123"

    def test_no_user_id_when_cli(self):
        """A CLI session passes no user_id, so providers must not see one."""
        manager = MemoryManager()
        recorder = RecordingProvider()
        manager.add_provider(recorder)

        manager.initialize_all(
            session_id="sess-456",
            platform="cli",
        )

        received = recorder._init_kwargs
        assert "user_id" not in received
        assert received.get("platform") == "cli"

    def test_user_id_none_not_forwarded(self):
        """Omitting user_id on a gateway platform keeps it out of provider kwargs.

        This mirrors the call the agent is expected to make when its user_id
        is None: per the original note, the agent only adds user_id to the
        kwargs when it is truthy, so the key is simply left out here.
        """
        manager = MemoryManager()
        recorder = RecordingProvider()
        manager.add_provider(recorder)

        manager.initialize_all(
            session_id="sess-789",
            platform="discord",
        )

        assert "user_id" not in recorder._init_kwargs

    def test_multiple_providers_all_receive_user_id(self):
        from agent.builtin_memory_provider import BuiltinMemoryProvider

        manager = MemoryManager()
        # Builtin plus a single external provider (MemoryManager only allows one external).
        external = RecordingProvider("external")
        manager.add_provider(BuiltinMemoryProvider())
        manager.add_provider(external)

        manager.initialize_all(
            session_id="sess-multi",
            platform="slack",
            user_id="slack_U12345",
        )

        received = external._init_kwargs
        assert received.get("user_id") == "slack_U12345"
        assert received.get("platform") == "slack"
# ---------------------------------------------------------------------------
# Mem0 provider user_id tests
# ---------------------------------------------------------------------------
class TestMem0UserIdScoping:
    """Check how the Mem0 provider chooses ``_user_id`` during ``initialize()``."""

    def test_gateway_user_id_overrides_default(self):
        """A ``user_id`` kwarg takes precedence over the configured default."""
        from plugins.memory.mem0 import Mem0MemoryProvider

        mem0 = Mem0MemoryProvider()
        # Stub the config loader so it reports a default user_id.
        stub_config = {
            "api_key": "test-key",
            "user_id": "hermes-user",
            "agent_id": "hermes",
            "rerank": True,
        }
        with patch("plugins.memory.mem0._load_config", return_value=stub_config):
            mem0.initialize(session_id="test-sess", user_id="tg_user_99")

        assert mem0._user_id == "tg_user_99"

    def test_no_user_id_falls_back_to_config(self):
        """With no ``user_id`` kwarg, the configured value is used."""
        from plugins.memory.mem0 import Mem0MemoryProvider

        mem0 = Mem0MemoryProvider()
        stub_config = {
            "api_key": "test-key",
            "user_id": "custom-default",
            "agent_id": "hermes",
            "rerank": True,
        }
        with patch("plugins.memory.mem0._load_config", return_value=stub_config):
            mem0.initialize(session_id="test-sess")

        assert mem0._user_id == "custom-default"

    def test_no_user_id_no_config_uses_hermes_user(self):
        """With neither a kwarg nor a config entry, the id is 'hermes-user'."""
        from plugins.memory.mem0 import Mem0MemoryProvider

        mem0 = Mem0MemoryProvider()
        stub_config = {
            "api_key": "test-key",
            "agent_id": "hermes",
            "rerank": True,
        }
        with patch("plugins.memory.mem0._load_config", return_value=stub_config):
            mem0.initialize(session_id="test-sess")

        assert mem0._user_id == "hermes-user"

    def test_different_users_get_different_ids(self):
        """Providers initialized for different users keep distinct ids."""
        from plugins.memory.mem0 import Mem0MemoryProvider

        alice = Mem0MemoryProvider()
        bob = Mem0MemoryProvider()
        stub_config = {
            "api_key": "test-key",
            "user_id": "hermes-user",
            "agent_id": "hermes",
            "rerank": True,
        }
        with patch("plugins.memory.mem0._load_config", return_value=stub_config):
            alice.initialize(session_id="sess-1", user_id="alice_123")
            bob.initialize(session_id="sess-2", user_id="bob_456")

        assert alice._user_id == "alice_123"
        assert bob._user_id == "bob_456"
        assert alice._user_id != bob._user_id
# ---------------------------------------------------------------------------
# Honcho provider user_id tests
# ---------------------------------------------------------------------------
class TestHonchoUserIdScoping:
    """Check whether the Honcho provider rewrites ``peer_name`` from a gateway user_id."""

    def test_gateway_user_id_overrides_peer_name(self):
        """A ``user_id`` kwarg replaces the statically configured peer_name."""
        from plugins.memory.honcho import HonchoMemoryProvider

        honcho = HonchoMemoryProvider()

        # Fake client config carrying a static peer_name; "tools" recall mode
        # is used to defer session init.
        fake_cfg = MagicMock(
            enabled=True,
            api_key="test-key",
            base_url=None,
            peer_name="static-user",
            recall_mode="tools",
        )

        config_loader = "plugins.memory.honcho.client.HonchoClientConfig.from_global_config"
        with patch(config_loader, return_value=fake_cfg):
            honcho.initialize(
                session_id="test-sess",
                user_id="discord_user_789",
                platform="discord",
            )

        # The provider is expected to have written the user_id onto the config.
        assert fake_cfg.peer_name == "discord_user_789"

    def test_no_user_id_preserves_config_peer_name(self):
        """Without a ``user_id`` kwarg the configured peer_name stays untouched."""
        from plugins.memory.honcho import HonchoMemoryProvider

        honcho = HonchoMemoryProvider()

        fake_cfg = MagicMock(
            enabled=True,
            api_key="test-key",
            base_url=None,
            peer_name="my-custom-peer",
            recall_mode="tools",
        )

        config_loader = "plugins.memory.honcho.client.HonchoClientConfig.from_global_config"
        with patch(config_loader, return_value=fake_cfg):
            honcho.initialize(
                session_id="test-sess",
                platform="cli",
            )

        assert fake_cfg.peer_name == "my-custom-peer"
# ---------------------------------------------------------------------------
# AIAgent user_id propagation test
# ---------------------------------------------------------------------------
class TestAIAgentUserIdPropagation:
    """Check that an AIAgent instance can carry a ``_user_id`` attribute.

    Both tests build the agent with ``object.__new__`` (skipping ``__init__``)
    and assign ``_user_id`` by hand, so they only exercise attribute storage.
    """

    def test_user_id_stored_on_agent(self):
        """A user_id assigned to the agent is readable back unchanged."""
        env_override = {"HERMES_HOME": "/tmp/test_hermes"}
        with patch.dict(os.environ, env_override):
            from run_agent import AIAgent

            bare_agent = object.__new__(AIAgent)
            # Assign the attribute by hand, standing in for what __init__ does.
            bare_agent._user_id = "test_user_42"
            assert bare_agent._user_id == "test_user_42"

    def test_user_id_none_by_default(self):
        """A None user_id (the CLI case) is stored as None."""
        env_override = {"HERMES_HOME": "/tmp/test_hermes"}
        with patch.dict(os.environ, env_override):
            from run_agent import AIAgent

            bare_agent = object.__new__(AIAgent)
            bare_agent._user_id = None
            assert bare_agent._user_id is None
View File
+1 -237
View File
@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, _send_media_via_adapter, run_job, SILENT_MARKER, _build_job_prompt
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt
class TestResolveOrigin:
@@ -277,188 +277,6 @@ class TestDeliverResultWrapping:
# Media files should be forwarded separately
assert kwargs["media_files"] == [("/tmp/test-voice.ogg", False)]
def test_live_adapter_sends_media_as_attachments(self):
    """With a live adapter, MEDIA files go out as native platform attachments.

    The text message must be sent without the literal ``MEDIA:/path`` tag and
    the audio file must be delivered through ``send_voice``.
    """
    from gateway.config import Platform
    from concurrent.futures import Future

    live_adapter = AsyncMock()
    live_adapter.send.return_value = MagicMock(success=True)
    live_adapter.send_voice.return_value = MagicMock(success=True)

    platform_cfg = MagicMock()
    platform_cfg.enabled = True
    gateway_cfg = MagicMock()
    gateway_cfg.platforms = {Platform.DISCORD: platform_cfg}

    running_loop = MagicMock()
    running_loop.is_running.return_value = True

    def fake_run_coro(pending, _target_loop):
        # Stand-in for run_coroutine_threadsafe: return an already-resolved
        # concurrent.futures.Future (which accepts a timeout on result()) and
        # close the coroutine so it is never left un-awaited.
        resolved = Future()
        resolved.set_result(MagicMock(success=True))
        pending.close()
        return resolved

    job = {
        "id": "tts-job",
        "deliver": "origin",
        "origin": {"platform": "discord", "chat_id": "9876"},
    }

    with patch("gateway.config.load_gateway_config", return_value=gateway_cfg):
        with patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
            with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
                _deliver_result(
                    job,
                    "Here is TTS\nMEDIA:/tmp/cron-voice.mp3",
                    adapters={Platform.DISCORD: live_adapter},
                    loop=running_loop,
                )

    # The text goes out once, with the MEDIA tag stripped.
    live_adapter.send.assert_called_once()
    text_sent = live_adapter.send.call_args.args[1]
    assert "MEDIA:" not in text_sent
    assert "Here is TTS" in text_sent

    # The audio file goes out once, as a voice attachment.
    live_adapter.send_voice.assert_called_once()
    assert live_adapter.send_voice.call_args.kwargs["audio_path"] == "/tmp/cron-voice.mp3"
def test_live_adapter_routes_image_to_send_image_file(self):
    """An image MEDIA file is delivered via ``send_image_file``, never ``send_voice``."""
    from gateway.config import Platform
    from concurrent.futures import Future

    live_adapter = AsyncMock()
    live_adapter.send.return_value = MagicMock(success=True)
    live_adapter.send_image_file.return_value = MagicMock(success=True)

    platform_cfg = MagicMock()
    platform_cfg.enabled = True
    gateway_cfg = MagicMock()
    gateway_cfg.platforms = {Platform.DISCORD: platform_cfg}

    running_loop = MagicMock()
    running_loop.is_running.return_value = True

    def fake_run_coro(pending, _target_loop):
        # Resolve immediately and close the coroutine so it is never awaited.
        resolved = Future()
        resolved.set_result(MagicMock(success=True))
        pending.close()
        return resolved

    job = {
        "id": "img-job",
        "deliver": "origin",
        "origin": {"platform": "discord", "chat_id": "1234"},
    }

    with patch("gateway.config.load_gateway_config", return_value=gateway_cfg):
        with patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
            with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
                _deliver_result(
                    job,
                    "Chart attached\nMEDIA:/tmp/chart.png",
                    adapters={Platform.DISCORD: live_adapter},
                    loop=running_loop,
                )

    live_adapter.send_image_file.assert_called_once()
    assert live_adapter.send_image_file.call_args.kwargs["image_path"] == "/tmp/chart.png"
    live_adapter.send_voice.assert_not_called()
def test_live_adapter_media_only_no_text(self):
    """Content consisting solely of a MEDIA tag still delivers the media file."""
    from gateway.config import Platform
    from concurrent.futures import Future

    live_adapter = AsyncMock()
    live_adapter.send_voice.return_value = MagicMock(success=True)

    platform_cfg = MagicMock()
    platform_cfg.enabled = True
    gateway_cfg = MagicMock()
    gateway_cfg.platforms = {Platform.TELEGRAM: platform_cfg}

    running_loop = MagicMock()
    running_loop.is_running.return_value = True

    def fake_run_coro(pending, _target_loop):
        # Resolve immediately and close the coroutine so it is never awaited.
        resolved = Future()
        resolved.set_result(MagicMock(success=True))
        pending.close()
        return resolved

    job = {
        "id": "voice-only",
        "deliver": "origin",
        "origin": {"platform": "telegram", "chat_id": "999"},
    }

    with patch("gateway.config.load_gateway_config", return_value=gateway_cfg):
        with patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
            with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
                _deliver_result(
                    job,
                    "MEDIA:/tmp/voice.ogg",
                    adapters={Platform.TELEGRAM: live_adapter},
                    loop=running_loop,
                )

    # Nothing is left once the MEDIA tag is stripped, so no text send is expected.
    live_adapter.send.assert_not_called()
    # The audio itself must still be delivered.
    live_adapter.send_voice.assert_called_once()
def test_live_adapter_sends_cleaned_text_not_raw(self):
    """The live adapter path sends text with MEDIA tags stripped.

    The raw delivery content, with its embedded ``MEDIA:`` tag, must not be
    what reaches ``adapter.send``.
    """
    from gateway.config import Platform
    from concurrent.futures import Future

    live_adapter = AsyncMock()
    live_adapter.send.return_value = MagicMock(success=True)

    platform_cfg = MagicMock()
    platform_cfg.enabled = True
    gateway_cfg = MagicMock()
    gateway_cfg.platforms = {Platform.TELEGRAM: platform_cfg}

    running_loop = MagicMock()
    running_loop.is_running.return_value = True

    def fake_run_coro(pending, _target_loop):
        # Resolve immediately and close the coroutine so it is never awaited.
        resolved = Future()
        resolved.set_result(MagicMock(success=True))
        pending.close()
        return resolved

    job = {
        "id": "img-job",
        "deliver": "origin",
        "origin": {"platform": "telegram", "chat_id": "555"},
    }

    with patch("gateway.config.load_gateway_config", return_value=gateway_cfg):
        with patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}):
            with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
                _deliver_result(
                    job,
                    "Report\nMEDIA:/tmp/chart.png",
                    adapters={Platform.TELEGRAM: live_adapter},
                    loop=running_loop,
                )

    text_sent = live_adapter.send.call_args.args[1]
    assert "MEDIA:" not in text_sent
    assert "Report" in text_sent
def test_no_mirror_to_session_call(self):
"""Cron deliveries should NOT mirror into the gateway session."""
from gateway.config import Platform
@@ -1044,57 +862,3 @@ class TestTickAdvanceBeforeRun:
adv_mock.assert_called_once_with("test-advance")
# advance must happen before run
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
class TestSendMediaViaAdapter:
    """Unit tests for ``_send_media_via_adapter``: each file goes to a typed adapter method."""

    @staticmethod
    def _run_with_loop(adapter, chat_id, media_files, metadata, job):
        """Call ``_send_media_via_adapter`` against an event loop running in a daemon thread."""
        import asyncio
        import threading

        event_loop = asyncio.new_event_loop()
        worker = threading.Thread(target=event_loop.run_forever, daemon=True)
        worker.start()
        try:
            _send_media_via_adapter(adapter, chat_id, media_files, metadata, event_loop, job)
        finally:
            # Stop the loop from its own thread, wait for the thread, then close.
            event_loop.call_soon_threadsafe(event_loop.stop)
            worker.join(timeout=5)
            event_loop.close()

    def test_video_dispatched_to_send_video(self):
        fake_adapter = MagicMock(send_video=AsyncMock())

        self._run_with_loop(fake_adapter, "123", [("/tmp/clip.mp4", False)], None, {"id": "j1"})

        fake_adapter.send_video.assert_called_once()
        assert fake_adapter.send_video.call_args.kwargs["video_path"] == "/tmp/clip.mp4"

    def test_unknown_ext_dispatched_to_send_document(self):
        fake_adapter = MagicMock(send_document=AsyncMock())

        self._run_with_loop(fake_adapter, "123", [("/tmp/report.pdf", False)], None, {"id": "j2"})

        fake_adapter.send_document.assert_called_once()
        assert fake_adapter.send_document.call_args.kwargs["file_path"] == "/tmp/report.pdf"

    def test_multiple_media_files_all_delivered(self):
        fake_adapter = MagicMock(send_voice=AsyncMock(), send_image_file=AsyncMock())
        batch = [("/tmp/voice.mp3", False), ("/tmp/photo.jpg", False)]

        self._run_with_loop(fake_adapter, "123", batch, None, {"id": "j3"})

        fake_adapter.send_voice.assert_called_once()
        fake_adapter.send_image_file.assert_called_once()

    def test_single_failure_does_not_block_others(self):
        # The voice send raises; the image send must still be attempted.
        fake_adapter = MagicMock(
            send_voice=AsyncMock(side_effect=RuntimeError("network error")),
            send_image_file=AsyncMock(),
        )
        batch = [("/tmp/voice.ogg", False), ("/tmp/photo.png", False)]

        self._run_with_loop(fake_adapter, "123", batch, None, {"id": "j4"})

        fake_adapter.send_voice.assert_called_once()
        fake_adapter.send_image_file.assert_called_once()
+28 -86
View File
@@ -2,54 +2,12 @@
import asyncio
import json
import re
import sys
import types
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from gateway.config import Platform, PlatformConfig
def _make_fake_nio():
    """Build a minimal stand-in ``nio`` module whose response types are real classes.

    Production code that does ``import nio`` and then
    ``isinstance(resp, nio.XxxResponse)`` needs genuine classes; MagicMock
    auto-attributes cannot satisfy ``isinstance``. Install the result with
    ``patch.dict("sys.modules", {"nio": _make_fake_nio()})``.
    """

    class RoomSendResponse:
        def __init__(self, event_id="$fake"):
            self.event_id = event_id

    class RoomRedactResponse:
        pass

    class RoomCreateResponse:
        def __init__(self, room_id="!fake:example.org"):
            self.room_id = room_id

    class RoomInviteResponse:
        pass

    class UploadResponse:
        def __init__(self, content_uri="mxc://example.org/fake"):
            self.content_uri = content_uri

    # Bare Api stub for callers that look up nio.Api.RoomPreset.
    class _Api:
        pass

    fake_module = types.ModuleType("nio")
    fake_module.Api = _Api
    # Each response class is published on the module under its own name.
    for response_cls in (
        RoomSendResponse,
        RoomRedactResponse,
        RoomCreateResponse,
        RoomInviteResponse,
        UploadResponse,
    ):
        setattr(fake_module, response_cls.__name__, response_cls)
    return fake_module
# ---------------------------------------------------------------------------
# Platform & Config
# ---------------------------------------------------------------------------
@@ -1492,10 +1450,7 @@ class TestMatrixEncryptedMedia:
@pytest.mark.asyncio
async def test_on_room_message_media_decrypts_encrypted_image_and_passes_local_path(self):
try:
from nio.crypto.attachments import encrypt_attachment
except (ImportError, ModuleNotFoundError):
pytest.skip("matrix-nio[e2e] required for encryption tests")
from nio.crypto.attachments import encrypt_attachment
adapter = _make_adapter()
adapter._user_id = "@bot:example.org"
@@ -1563,10 +1518,7 @@ class TestMatrixEncryptedMedia:
@pytest.mark.asyncio
async def test_on_room_message_media_decrypts_encrypted_voice_and_caches_audio(self):
try:
from nio.crypto.attachments import encrypt_attachment
except (ImportError, ModuleNotFoundError):
pytest.skip("matrix-nio[e2e] required for encryption tests")
from nio.crypto.attachments import encrypt_attachment
adapter = _make_adapter()
adapter._user_id = "@bot:example.org"
@@ -1635,10 +1587,7 @@ class TestMatrixEncryptedMedia:
@pytest.mark.asyncio
async def test_on_room_message_media_decrypts_encrypted_file_and_caches_document(self):
try:
from nio.crypto.attachments import encrypt_attachment
except (ImportError, ModuleNotFoundError):
pytest.skip("matrix-nio[e2e] required for encryption tests")
from nio.crypto.attachments import encrypt_attachment
adapter = _make_adapter()
adapter._user_id = "@bot:example.org"
@@ -1934,15 +1883,14 @@ class TestMatrixReactions:
@pytest.mark.asyncio
async def test_send_reaction(self):
"""_send_reaction should call room_send with m.reaction."""
fake_nio = _make_fake_nio()
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_client.room_send = AsyncMock(
return_value=fake_nio.RoomSendResponse("$reaction1")
return_value=MagicMock(spec=nio.RoomSendResponse)
)
self.adapter._client = mock_client
with patch.dict("sys.modules", {"nio": fake_nio}):
result = await self.adapter._send_reaction("!room:ex", "$event1", "👍")
result = await self.adapter._send_reaction("!room:ex", "$event1", "👍")
assert result is True
mock_client.room_send.assert_called_once()
args = mock_client.room_send.call_args
@@ -1954,8 +1902,7 @@ class TestMatrixReactions:
@pytest.mark.asyncio
async def test_send_reaction_no_client(self):
self.adapter._client = None
with patch.dict("sys.modules", {"nio": _make_fake_nio()}):
result = await self.adapter._send_reaction("!room:ex", "$ev", "👍")
result = await self.adapter._send_reaction("!room:ex", "$ev", "👍")
assert result is False
@pytest.mark.asyncio
@@ -2052,23 +1999,21 @@ class TestMatrixRedaction:
@pytest.mark.asyncio
async def test_redact_message(self):
fake_nio = _make_fake_nio()
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_client.room_redact = AsyncMock(
return_value=fake_nio.RoomRedactResponse()
return_value=MagicMock(spec=nio.RoomRedactResponse)
)
self.adapter._client = mock_client
with patch.dict("sys.modules", {"nio": fake_nio}):
result = await self.adapter.redact_message("!room:ex", "$ev1", "oops")
result = await self.adapter.redact_message("!room:ex", "$ev1", "oops")
assert result is True
mock_client.room_redact.assert_called_once()
@pytest.mark.asyncio
async def test_redact_no_client(self):
self.adapter._client = None
with patch.dict("sys.modules", {"nio": _make_fake_nio()}):
result = await self.adapter.redact_message("!room:ex", "$ev1")
result = await self.adapter.redact_message("!room:ex", "$ev1")
assert result is False
@@ -2082,35 +2027,33 @@ class TestMatrixRoomManagement:
@pytest.mark.asyncio
async def test_create_room(self):
fake_nio = _make_fake_nio()
mock_resp = fake_nio.RoomCreateResponse(room_id="!new:example.org")
nio = pytest.importorskip("nio")
mock_resp = MagicMock(spec=nio.RoomCreateResponse)
mock_resp.room_id = "!new:example.org"
mock_client = MagicMock()
mock_client.room_create = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
with patch.dict("sys.modules", {"nio": fake_nio}):
room_id = await self.adapter.create_room(name="Test Room", topic="A test")
room_id = await self.adapter.create_room(name="Test Room", topic="A test")
assert room_id == "!new:example.org"
assert "!new:example.org" in self.adapter._joined_rooms
@pytest.mark.asyncio
async def test_invite_user(self):
fake_nio = _make_fake_nio()
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_client.room_invite = AsyncMock(
return_value=fake_nio.RoomInviteResponse()
return_value=MagicMock(spec=nio.RoomInviteResponse)
)
self.adapter._client = mock_client
with patch.dict("sys.modules", {"nio": fake_nio}):
result = await self.adapter.invite_user("!room:ex", "@user:ex")
result = await self.adapter.invite_user("!room:ex", "@user:ex")
assert result is True
@pytest.mark.asyncio
async def test_create_room_no_client(self):
self.adapter._client = None
with patch.dict("sys.modules", {"nio": _make_fake_nio()}):
result = await self.adapter.create_room()
result = await self.adapter.create_room()
assert result is None
@@ -2156,28 +2099,28 @@ class TestMatrixMessageTypes:
@pytest.mark.asyncio
async def test_send_emote(self):
fake_nio = _make_fake_nio()
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_resp = fake_nio.RoomSendResponse(event_id="$emote1")
mock_resp = MagicMock(spec=nio.RoomSendResponse)
mock_resp.event_id = "$emote1"
mock_client.room_send = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
with patch.dict("sys.modules", {"nio": fake_nio}):
result = await self.adapter.send_emote("!room:ex", "waves hello")
result = await self.adapter.send_emote("!room:ex", "waves hello")
assert result.success is True
call_args = mock_client.room_send.call_args[0]
assert call_args[2]["msgtype"] == "m.emote"
@pytest.mark.asyncio
async def test_send_notice(self):
fake_nio = _make_fake_nio()
nio = pytest.importorskip("nio")
mock_client = MagicMock()
mock_resp = fake_nio.RoomSendResponse(event_id="$notice1")
mock_resp = MagicMock(spec=nio.RoomSendResponse)
mock_resp.event_id = "$notice1"
mock_client.room_send = AsyncMock(return_value=mock_resp)
self.adapter._client = mock_client
with patch.dict("sys.modules", {"nio": fake_nio}):
result = await self.adapter.send_notice("!room:ex", "System message")
result = await self.adapter.send_notice("!room:ex", "System message")
assert result.success is True
call_args = mock_client.room_send.call_args[0]
assert call_args[2]["msgtype"] == "m.notice"
@@ -2185,6 +2128,5 @@ class TestMatrixMessageTypes:
@pytest.mark.asyncio
async def test_send_emote_empty_text(self):
self.adapter._client = MagicMock()
with patch.dict("sys.modules", {"nio": _make_fake_nio()}):
result = await self.adapter.send_emote("!room:ex", "")
result = await self.adapter.send_emote("!room:ex", "")
assert result.success is False
+2 -10
View File
@@ -1,18 +1,10 @@
"""Tests for Matrix voice message support (MSC3245)."""
import io
import types
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import AsyncMock, MagicMock
# Try importing real nio; skip entire file if not available.
# A MagicMock in sys.modules (from another test) is not the real package.
try:
import nio as _nio_probe
if not isinstance(_nio_probe, types.ModuleType) or not hasattr(_nio_probe, "__file__"):
pytest.skip("nio in sys.modules is a mock, not the real package", allow_module_level=True)
except ImportError:
pytest.skip("matrix-nio not installed", allow_module_level=True)
nio = pytest.importorskip("nio", reason="matrix-nio not installed")
from gateway.platforms.base import MessageType
-1
View File
@@ -59,7 +59,6 @@ def _make_runner():
runner._honcho_managers = {}
runner._honcho_configs = {}
runner._shutdown_all_gateway_honcho = lambda: None
runner.session_store = MagicMock()
return runner
-5
View File
@@ -36,16 +36,11 @@ def _make_runner():
)
runner.adapters = {Platform.TELEGRAM: _FakeAdapter()}
runner._running_agents = {}
runner._running_agents_ts = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._voice_mode = {}
runner._background_tasks = set()
runner._is_user_authorized = lambda _source: True
runner.hooks = MagicMock()
runner.hooks.emit = AsyncMock()
runner.session_store = MagicMock()
runner.delivery_router = MagicMock()
return runner
@@ -1,426 +0,0 @@
"""Tests for Slack Block Kit approval buttons and thread context fetching."""
import asyncio
import os
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Ensure the repo root is importable
# ---------------------------------------------------------------------------
_repo = str(Path(__file__).resolve().parents[2])
if _repo not in sys.path:
sys.path.insert(0, _repo)
# ---------------------------------------------------------------------------
# Minimal Slack SDK mock so SlackAdapter can be imported
# ---------------------------------------------------------------------------
def _ensure_slack_mock():
    """Register stand-in Slack SDK modules so ``SlackAdapter`` can be imported.

    Does nothing when ``slack_bolt`` is already present in ``sys.modules``;
    otherwise MagicMock-based ``slack_bolt`` and ``slack_sdk`` module entries
    are installed.
    """
    if "slack_bolt" in sys.modules:
        return

    bolt_pkg = MagicMock()
    bolt_pkg.async_app.AsyncApp = MagicMock

    socket_handler_mod = MagicMock()
    socket_handler_mod.AsyncSocketModeHandler = MagicMock

    sdk_pkg = MagicMock()
    sdk_pkg.web = MagicMock()
    sdk_pkg.web.async_client = MagicMock()
    sdk_pkg.web.async_client.AsyncWebClient = MagicMock

    # Every dotted path needs its own entry in sys.modules.
    sys.modules.update({
        "slack_bolt": bolt_pkg,
        "slack_bolt.async_app": bolt_pkg.async_app,
        "slack_bolt.adapter": MagicMock(),
        "slack_bolt.adapter.socket_mode": MagicMock(),
        "slack_bolt.adapter.socket_mode.async_handler": socket_handler_mod,
        "slack_sdk": sdk_pkg,
        "slack_sdk.web": sdk_pkg.web,
        "slack_sdk.web.async_client": sdk_pkg.web.async_client,
    })
_ensure_slack_mock()
from gateway.platforms.slack import SlackAdapter
from gateway.config import Platform, PlatformConfig
def _make_adapter():
    """Build a SlackAdapter whose internals are replaced with test doubles.

    The adapter gets a mock app, bot user ``U_BOT``, a single team ``T1``
    backed by an ``AsyncMock`` client, and channel ``C1`` mapped to ``T1``.
    """
    slack = SlackAdapter(PlatformConfig(enabled=True, token="xoxb-test-token"))
    overrides = {
        "_app": MagicMock(),
        "_bot_user_id": "U_BOT",
        "_team_clients": {"T1": AsyncMock()},
        "_team_bot_user_ids": {"T1": "U_BOT"},
        "_channel_team": {"C1": "T1"},
    }
    for attr_name, attr_value in overrides.items():
        setattr(slack, attr_name, attr_value)
    return slack
# ===========================================================================
# send_exec_approval — Block Kit buttons
# ===========================================================================
class TestSlackExecApproval:
    """send_exec_approval should post a Block Kit prompt with approval buttons."""

    @pytest.mark.asyncio
    async def test_sends_blocks_with_buttons(self):
        """A section block plus four buttons, each carrying the session key."""
        session_key = "agent:main:slack:group:C1:1111"
        adapter = _make_adapter()
        client = adapter._team_clients["T1"]
        client.chat_postMessage = AsyncMock(return_value={"ts": "1234.5678"})

        outcome = await adapter.send_exec_approval(
            chat_id="C1",
            command="rm -rf /important",
            session_key=session_key,
            description="dangerous deletion",
        )

        assert outcome.success is True
        assert outcome.message_id == "1234.5678"

        # The prompt must have been posted exactly once, as Block Kit blocks.
        client.chat_postMessage.assert_called_once()
        sent = client.chat_postMessage.call_args.kwargs
        assert "blocks" in sent
        blocks = sent["blocks"]
        assert len(blocks) == 2

        section_text = blocks[0]["text"]["text"]
        assert blocks[0]["type"] == "section"
        assert "rm -rf /important" in section_text
        assert "dangerous deletion" in section_text

        assert blocks[1]["type"] == "actions"
        buttons = blocks[1]["elements"]
        assert len(buttons) == 4
        action_ids = [button["action_id"] for button in buttons]
        for expected_id in (
            "hermes_approve_once",
            "hermes_approve_session",
            "hermes_approve_always",
            "hermes_deny",
        ):
            assert expected_id in action_ids

        # Every button value is the session key the click handler resolves.
        for button in buttons:
            assert button["value"] == session_key

    @pytest.mark.asyncio
    async def test_sends_in_thread(self):
        """metadata["thread_id"] is forwarded to Slack as thread_ts."""
        adapter = _make_adapter()
        client = adapter._team_clients["T1"]
        client.chat_postMessage = AsyncMock(return_value={"ts": "1234.5678"})

        await adapter.send_exec_approval(
            chat_id="C1",
            command="echo test",
            session_key="test-session",
            metadata={"thread_id": "9999.0000"},
        )

        sent = client.chat_postMessage.call_args.kwargs
        assert sent.get("thread_ts") == "9999.0000"

    @pytest.mark.asyncio
    async def test_not_connected(self):
        """With no app attached, the call reports failure."""
        adapter = _make_adapter()
        adapter._app = None

        outcome = await adapter.send_exec_approval(
            chat_id="C1", command="ls", session_key="s"
        )

        assert outcome.success is False

    @pytest.mark.asyncio
    async def test_truncates_long_command(self):
        """A 5000-character command is shortened and marked with an ellipsis."""
        adapter = _make_adapter()
        client = adapter._team_clients["T1"]
        client.chat_postMessage = AsyncMock(return_value={"ts": "1.2"})
        oversized_command = "x" * 5000

        await adapter.send_exec_approval(
            chat_id="C1", command=oversized_command, session_key="s"
        )

        sent = client.chat_postMessage.call_args.kwargs
        rendered = sent["blocks"][0]["text"]["text"]
        assert "..." in rendered
        assert len(rendered) < 5000
# ===========================================================================
# _handle_approval_action — button click handler
# ===========================================================================
class TestSlackApprovalAction:
    """Exercises _handle_approval_action, the approval button click handler."""

    @pytest.mark.asyncio
    async def test_resolves_approval(self):
        """An "approve once" click resolves the approval and edits the message."""
        session_key = "agent:main:slack:group:C1:1111"
        adapter = _make_adapter()
        adapter._approval_resolved["1234.5678"] = False
        ack = AsyncMock()
        original_blocks = [
            {"type": "section", "text": {"type": "mrkdwn", "text": "original text"}},
            {"type": "actions", "elements": []},
        ]
        body = {
            "message": {"ts": "1234.5678", "blocks": original_blocks},
            "channel": {"id": "C1"},
            "user": {"name": "norbert"},
        }
        action = {"action_id": "hermes_approve_once", "value": session_key}
        client = adapter._team_clients["T1"]
        client.chat_update = AsyncMock()

        with patch(
            "tools.approval.resolve_gateway_approval", return_value=1
        ) as resolve_mock:
            await adapter._handle_approval_action(ack, body, action)

        ack.assert_called_once()
        resolve_mock.assert_called_once_with(session_key, "once")

        # The prompt message is rewritten to show who decided what.
        client.chat_update.assert_called_once()
        updated = client.chat_update.call_args.kwargs
        assert "Approved once by norbert" in updated["text"]

    @pytest.mark.asyncio
    async def test_prevents_double_click(self):
        """A click on an already-resolved prompt is acked but not resolved again."""
        adapter = _make_adapter()
        adapter._approval_resolved["1234.5678"] = True  # resolved earlier
        ack = AsyncMock()
        body = {
            "message": {"ts": "1234.5678", "blocks": []},
            "channel": {"id": "C1"},
            "user": {"name": "norbert"},
        }
        action = {"action_id": "hermes_approve_once", "value": "some-session"}

        with patch("tools.approval.resolve_gateway_approval") as resolve_mock:
            await adapter._handle_approval_action(ack, body, action)

        ack.assert_called_once()
        resolve_mock.assert_not_called()

    @pytest.mark.asyncio
    async def test_deny_action(self):
        """A deny click resolves with "deny" and names the user in the update."""
        adapter = _make_adapter()
        adapter._approval_resolved["1.2"] = False
        ack = AsyncMock()
        section = {"type": "section", "text": {"type": "mrkdwn", "text": "cmd"}}
        body = {
            "message": {"ts": "1.2", "blocks": [section]},
            "channel": {"id": "C1"},
            "user": {"name": "alice"},
        }
        action = {"action_id": "hermes_deny", "value": "session-key"}
        client = adapter._team_clients["T1"]
        client.chat_update = AsyncMock()

        with patch(
            "tools.approval.resolve_gateway_approval", return_value=1
        ) as resolve_mock:
            await adapter._handle_approval_action(ack, body, action)

        resolve_mock.assert_called_once_with("session-key", "deny")
        updated = client.chat_update.call_args.kwargs
        assert "Denied by alice" in updated["text"]
# ===========================================================================
# _fetch_thread_context
# ===========================================================================
class TestSlackThreadContext:
    """Exercises _fetch_thread_context against a mocked conversations_replies."""

    @pytest.mark.asyncio
    async def test_fetches_and_formats_context(self):
        """Earlier thread messages are rendered; the triggering one is left out."""
        adapter = _make_adapter()
        thread_messages = [
            {"ts": "1000.0", "user": "U1", "text": "This is the parent message"},
            {"ts": "1000.1", "user": "U2", "text": "I think we should refactor"},
            {"ts": "1000.2", "user": "U1", "text": "Good idea, <@U_BOT> what do you think?"},
        ]
        adapter._team_clients["T1"].conversations_replies = AsyncMock(
            return_value={"messages": thread_messages}
        )
        # Pre-seed the name cache so user IDs render as display names.
        adapter._user_name_cache = {"U1": "Alice", "U2": "Bob"}

        rendered = await adapter._fetch_thread_context(
            channel_id="C1",
            thread_ts="1000.0",
            current_ts="1000.2",  # the message that triggered the fetch
            team_id="T1",
        )

        assert "[Thread context" in rendered
        assert "[thread parent] Alice: This is the parent message" in rendered
        assert "Bob: I think we should refactor" in rendered
        # The triggering message itself must not appear in the context.
        assert "what do you think" not in rendered
        # No raw bot mention may survive in the context.
        assert "<@U_BOT>" not in rendered

    @pytest.mark.asyncio
    async def test_skips_bot_messages(self):
        """Messages carrying a bot_id are dropped from the context."""
        adapter = _make_adapter()
        thread_messages = [
            {"ts": "1000.0", "user": "U1", "text": "Parent"},
            {"ts": "1000.1", "bot_id": "B1", "text": "Bot reply (should be skipped)"},
            {"ts": "1000.2", "user": "U1", "text": "Current"},
        ]
        adapter._team_clients["T1"].conversations_replies = AsyncMock(
            return_value={"messages": thread_messages}
        )
        adapter._user_name_cache = {"U1": "Alice"}

        rendered = await adapter._fetch_thread_context(
            channel_id="C1", thread_ts="1000.0", current_ts="1000.2", team_id="T1"
        )

        assert "Bot reply" not in rendered
        assert "Alice: Parent" in rendered

    @pytest.mark.asyncio
    async def test_empty_thread(self):
        """An empty message list yields an empty string."""
        adapter = _make_adapter()
        adapter._team_clients["T1"].conversations_replies = AsyncMock(
            return_value={"messages": []}
        )

        rendered = await adapter._fetch_thread_context(
            channel_id="C1", thread_ts="1000.0", current_ts="1000.1", team_id="T1"
        )

        assert rendered == ""

    @pytest.mark.asyncio
    async def test_api_failure_returns_empty(self):
        """An exception from the Slack client yields an empty string."""
        adapter = _make_adapter()
        adapter._team_clients["T1"].conversations_replies = AsyncMock(
            side_effect=Exception("API error")
        )

        rendered = await adapter._fetch_thread_context(
            channel_id="C1", thread_ts="1000.0", current_ts="1000.1", team_id="T1"
        )

        assert rendered == ""
# ===========================================================================
# _has_active_session_for_thread — session key fix (#5833)
# ===========================================================================
class TestSessionKeyFix:
    """Checks the session lookup done by _has_active_session_for_thread."""

    @staticmethod
    def _fake_store(entries, group_sessions_per_user):
        """Return a MagicMock session store holding *entries*."""
        store = MagicMock()
        store._entries = entries
        store._ensure_loaded = MagicMock()
        store.config = MagicMock()
        store.config.group_sessions_per_user = group_sessions_per_user
        store.config.thread_sessions_per_user = False
        return store

    def test_uses_build_session_key(self):
        """A stored key without a user suffix is found when per-user sessions are off."""
        adapter = _make_adapter()
        # The stored key has no user_id component.
        adapter._session_store = self._fake_store(
            {"agent:main:slack:group:C1:1000.0": MagicMock()},
            group_sessions_per_user=False,
        )

        found = adapter._has_active_session_for_thread(
            channel_id="C1", thread_ts="1000.0", user_id="U123"
        )

        # The lookup must match the entry even though a user_id was passed.
        assert found is True

    def test_no_session_returns_false(self):
        """An empty store means no active session."""
        adapter = _make_adapter()
        adapter._session_store = self._fake_store({}, group_sessions_per_user=True)

        found = adapter._has_active_session_for_thread(
            channel_id="C1", thread_ts="1000.0", user_id="U123"
        )

        assert found is False

    def test_no_session_store(self):
        """Without a _session_store assigned by the test, the lookup is False."""
        adapter = _make_adapter()

        found = adapter._has_active_session_for_thread(
            channel_id="C1", thread_ts="1000.0", user_id="U123"
        )

        assert found is False
# ===========================================================================
# Thread engagement — bot-started threads & mentioned threads
# ===========================================================================
class TestThreadEngagement:
    """Covers the _bot_message_ts and _mentioned_threads bookkeeping."""

    @pytest.mark.asyncio
    async def test_send_tracks_bot_message_ts(self):
        """send() records both the new message ts and the thread root ts."""
        adapter = _make_adapter()
        adapter._team_clients["T1"].chat_postMessage = AsyncMock(
            return_value={"ts": "9000.1"}
        )

        await adapter.send(
            chat_id="C1", content="Hello!", metadata={"thread_id": "8000.0"}
        )

        tracked = adapter._bot_message_ts
        assert "9000.1" in tracked
        # The thread root is tracked alongside the reply itself.
        assert "8000.0" in tracked

    @pytest.mark.asyncio
    async def test_bot_message_ts_cap(self):
        """After 20 sends with a cap of 10, at most 10 timestamps are kept."""
        adapter = _make_adapter()
        adapter._BOT_TS_MAX = 10  # low cap for testing
        client = adapter._team_clients["T1"]

        for index in range(20):
            client.chat_postMessage = AsyncMock(return_value={"ts": f"{index}.0"})
            await adapter.send(chat_id="C1", content=f"msg {index}")

        assert len(adapter._bot_message_ts) <= 10

    def test_mentioned_threads_populated_on_mention(self):
        """A thread ts added to _mentioned_threads is reported as a member."""
        adapter = _make_adapter()
        # NOTE(review): this adds the entry by hand rather than going through
        # the adapter's message handler.
        adapter._mentioned_threads.add("1000.0")

        assert "1000.0" in adapter._mentioned_threads

    def test_mentioned_threads_cap(self):
        """Evicting half the cap whenever it is exceeded keeps the size bounded."""
        adapter = _make_adapter()
        adapter._MENTIONED_THREADS_MAX = 10

        # NOTE(review): the eviction below is performed by the test itself.
        for index in range(15):
            threads = adapter._mentioned_threads
            threads.add(f"{index}.0")
            if len(threads) <= adapter._MENTIONED_THREADS_MAX:
                continue
            evict_count = adapter._MENTIONED_THREADS_MAX // 2
            for stale in list(threads)[:evict_count]:
                threads.discard(stale)

        assert len(adapter._mentioned_threads) <= 10
@@ -1,291 +0,0 @@
"""Tests for Telegram inline keyboard approval buttons."""
import asyncio
import os
import sys
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Ensure the repo root is importable
# ---------------------------------------------------------------------------
_repo = str(Path(__file__).resolve().parents[2])
if _repo not in sys.path:
sys.path.insert(0, _repo)
# ---------------------------------------------------------------------------
# Minimal Telegram mock so TelegramAdapter can be imported
# ---------------------------------------------------------------------------
def _ensure_telegram_mock():
    """Register a stand-in ``telegram`` package in ``sys.modules``.

    Skipped when the registered ``telegram`` module has a ``__file__``
    attribute. Entries already present in ``sys.modules`` are never
    overwritten (``setdefault``).
    """
    registered = sys.modules.get("telegram")
    if registered is not None and hasattr(registered, "__file__"):
        return

    fake = MagicMock()
    fake.ext.ContextTypes.DEFAULT_TYPE = type(None)

    parse_modes = (
        ("MARKDOWN", "Markdown"),
        ("MARKDOWN_V2", "MarkdownV2"),
        ("HTML", "HTML"),
    )
    for attr_name, attr_value in parse_modes:
        setattr(fake.constants.ParseMode, attr_name, attr_value)

    for chat_type in ("private", "group", "supergroup", "channel"):
        setattr(fake.constants.ChatType, chat_type.upper(), chat_type)

    # Real exception classes keep ``except (NetworkError, ...)`` clauses in
    # connect() valid under xdist when this mock leaks.
    error_bases = (
        ("NetworkError", OSError),
        ("TimedOut", OSError),
        ("BadRequest", Exception),
    )
    for exc_name, exc_base in error_bases:
        setattr(fake.error, exc_name, type(exc_name, (exc_base,), {}))

    for module_name in (
        "telegram",
        "telegram.ext",
        "telegram.constants",
        "telegram.request",
    ):
        sys.modules.setdefault(module_name, fake)
    sys.modules.setdefault("telegram.error", fake.error)


_ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter
from gateway.config import Platform, PlatformConfig
def _make_adapter():
    """Build a TelegramAdapter with its bot and app replaced by mocks."""
    telegram_adapter = TelegramAdapter(
        PlatformConfig(enabled=True, token="test-token")
    )
    telegram_adapter._bot, telegram_adapter._app = AsyncMock(), MagicMock()
    return telegram_adapter
# ===========================================================================
# send_exec_approval — inline keyboard buttons
# ===========================================================================
class TestTelegramExecApproval:
    """send_exec_approval should send a message with an inline keyboard."""

    @pytest.mark.asyncio
    async def test_sends_inline_keyboard(self):
        """The prompt carries the command, the description and a reply markup."""
        adapter = _make_adapter()
        adapter._bot.send_message = AsyncMock(
            return_value=MagicMock(message_id=42)
        )

        outcome = await adapter.send_exec_approval(
            chat_id="12345",
            command="rm -rf /important",
            session_key="agent:main:telegram:group:12345:99",
            description="dangerous deletion",
        )

        assert outcome.success is True
        assert outcome.message_id == "42"

        adapter._bot.send_message.assert_called_once()
        sent = adapter._bot.send_message.call_args.kwargs
        assert sent["chat_id"] == 12345
        assert "rm -rf /important" in sent["text"]
        assert "dangerous deletion" in sent["text"]
        assert sent["reply_markup"] is not None  # InlineKeyboardMarkup

    @pytest.mark.asyncio
    async def test_stores_approval_state(self):
        """Exactly one approval id is recorded and it maps to the session key."""
        adapter = _make_adapter()
        adapter._bot.send_message = AsyncMock(
            return_value=MagicMock(message_id=42)
        )

        await adapter.send_exec_approval(
            chat_id="12345",
            command="echo test",
            session_key="my-session-key",
        )

        pending = adapter._approval_state
        assert len(pending) == 1
        only_id = next(iter(pending.keys()))
        assert pending[only_id] == "my-session-key"

    @pytest.mark.asyncio
    async def test_sends_in_thread(self):
        """metadata["thread_id"] is forwarded as an integer message_thread_id."""
        adapter = _make_adapter()
        adapter._bot.send_message = AsyncMock(
            return_value=MagicMock(message_id=42)
        )

        await adapter.send_exec_approval(
            chat_id="12345",
            command="ls",
            session_key="s",
            metadata={"thread_id": "999"},
        )

        sent = adapter._bot.send_message.call_args.kwargs
        assert sent.get("message_thread_id") == 999

    @pytest.mark.asyncio
    async def test_not_connected(self):
        """With no bot attached, the call reports failure."""
        adapter = _make_adapter()
        adapter._bot = None

        outcome = await adapter.send_exec_approval(
            chat_id="12345", command="ls", session_key="s"
        )

        assert outcome.success is False

    @pytest.mark.asyncio
    async def test_truncates_long_command(self):
        """A 5000-character command is shortened and marked with an ellipsis."""
        adapter = _make_adapter()
        adapter._bot.send_message = AsyncMock(
            return_value=MagicMock(message_id=1)
        )
        oversized_command = "x" * 5000

        await adapter.send_exec_approval(
            chat_id="12345", command=oversized_command, session_key="s"
        )

        sent = adapter._bot.send_message.call_args.kwargs
        assert "..." in sent["text"]
        assert len(sent["text"]) < 5000
# ===========================================================================
# _handle_callback_query — approval button clicks
# ===========================================================================
class TestTelegramApprovalCallback:
    """Exercises the approval branch of _handle_callback_query."""

    @pytest.mark.asyncio
    async def test_resolves_approval_on_click(self):
        """An "ea:once:<id>" click resolves the stored session and clears state."""
        session_key = "agent:main:telegram:group:12345:99"
        adapter = _make_adapter()
        adapter._approval_state[1] = session_key
        query = AsyncMock(
            data="ea:once:1",
            message=MagicMock(chat_id=12345),
            from_user=MagicMock(first_name="Norbert"),
            answer=AsyncMock(),
            edit_message_text=AsyncMock(),
        )
        update = MagicMock(callback_query=query)
        context = MagicMock()

        with patch(
            "tools.approval.resolve_gateway_approval", return_value=1
        ) as resolve_mock:
            await adapter._handle_callback_query(update, context)

        resolve_mock.assert_called_once_with(session_key, "once")
        query.answer.assert_called_once()
        query.edit_message_text.assert_called_once()
        # The handled approval id must no longer be pending.
        assert 1 not in adapter._approval_state

    @pytest.mark.asyncio
    async def test_deny_button(self):
        """An "ea:deny:<id>" click resolves with "deny" and edits in "Denied"."""
        adapter = _make_adapter()
        adapter._approval_state[2] = "some-session"
        query = AsyncMock(
            data="ea:deny:2",
            message=MagicMock(chat_id=12345),
            from_user=MagicMock(first_name="Alice"),
            answer=AsyncMock(),
            edit_message_text=AsyncMock(),
        )
        update = MagicMock(callback_query=query)
        context = MagicMock()

        with patch(
            "tools.approval.resolve_gateway_approval", return_value=1
        ) as resolve_mock:
            await adapter._handle_callback_query(update, context)

        resolve_mock.assert_called_once_with("some-session", "deny")
        edited = query.edit_message_text.call_args.kwargs
        assert "Denied" in edited["text"]

    @pytest.mark.asyncio
    async def test_already_resolved(self):
        """A click for an unknown approval id is answered but never resolved."""
        adapter = _make_adapter()
        # Approval id 99 has no pending state, as if it was handled earlier.
        query = AsyncMock(
            data="ea:once:99",
            message=MagicMock(chat_id=12345),
            from_user=MagicMock(first_name="Bob"),
            answer=AsyncMock(),
        )
        update = MagicMock(callback_query=query)
        context = MagicMock()

        with patch("tools.approval.resolve_gateway_approval") as resolve_mock:
            await adapter._handle_callback_query(update, context)

        resolve_mock.assert_not_called()
        # The user still gets an acknowledgement explaining why nothing happened.
        query.answer.assert_called_once()
        assert "already been resolved" in query.answer.call_args.kwargs["text"]

    @pytest.mark.asyncio
    async def test_model_picker_callback_not_affected(self):
        """An "mp:" callback does not reach approval resolution."""
        adapter = _make_adapter()
        query = AsyncMock(
            data="mp:some_provider",
            message=MagicMock(chat_id=12345),
            from_user=MagicMock(),
        )
        update = MagicMock(callback_query=query)
        context = MagicMock()

        # The model picker handler is stubbed; only the routing matters here.
        with patch(
            "tools.approval.resolve_gateway_approval"
        ) as resolve_mock, patch.object(
            adapter, "_handle_model_picker_callback", new_callable=AsyncMock
        ):
            await adapter._handle_callback_query(update, context)

        resolve_mock.assert_not_called()

    @pytest.mark.asyncio
    async def test_update_prompt_callback_not_affected(self):
        """An "update_prompt:" callback does not reach approval resolution."""
        adapter = _make_adapter()
        requester = MagicMock()
        requester.id = 123
        query = AsyncMock(
            data="update_prompt:y",
            message=MagicMock(chat_id=12345),
            from_user=requester,
            answer=AsyncMock(),
            edit_message_text=AsyncMock(),
        )
        update = MagicMock(callback_query=query)
        context = MagicMock()

        with patch(
            "tools.approval.resolve_gateway_approval"
        ) as resolve_mock, patch(
            "hermes_constants.get_hermes_home", return_value=Path("/tmp/test")
        ):
            try:
                await adapter._handle_callback_query(update, context)
            except Exception:
                pass  # May fail on file write, that's fine

        # Whatever the handler did, it must not have resolved an approval.
        resolve_mock.assert_not_called()
-8
View File
@@ -20,16 +20,8 @@ def _ensure_telegram_mock():
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
# Provide real exception classes so ``except (NetworkError, ...)`` in
# connect() doesn't blow up with "catching classes that do not inherit
# from BaseException" when another xdist worker pollutes sys.modules.
telegram_mod.error.NetworkError = type("NetworkError", (OSError,), {})
telegram_mod.error.TimedOut = type("TimedOut", (OSError,), {})
telegram_mod.error.BadRequest = type("BadRequest", (Exception,), {})
for name in ("telegram", "telegram.ext", "telegram.constants", "telegram.request"):
sys.modules.setdefault(name, telegram_mod)
sys.modules.setdefault("telegram.error", telegram_mod.error)
_ensure_telegram_mock()
+1 -233
View File
@@ -1,15 +1,6 @@
"""Tests for the hermes_cli models module."""
from unittest.mock import patch, MagicMock
from hermes_cli.models import (
OPENROUTER_MODELS, menu_labels, model_ids, detect_provider_for_model,
filter_nous_free_models, _NOUS_ALLOWED_FREE_MODELS,
is_nous_free_tier, partition_nous_models_by_tier,
check_nous_free_tier, clear_nous_free_tier_cache,
_FREE_TIER_CACHE_TTL,
)
import hermes_cli.models as _models_mod
from hermes_cli.models import OPENROUTER_MODELS, menu_labels, model_ids, detect_provider_for_model
class TestModelIds:
@@ -133,226 +124,3 @@ class TestDetectProviderForModel:
result = detect_provider_for_model("claude-opus-4-6", "openai-codex")
assert result is not None
assert result[0] not in ("nous",) # nous has claude models but shouldn't be suggested
class TestFilterNousFreeModels:
    """Checks filter_nous_free_models against paid, free and allowlisted models."""

    _PAID = {"prompt": "0.000003", "completion": "0.000015"}
    _FREE = {"prompt": "0", "completion": "0"}

    def test_paid_models_kept(self):
        """Paid models that are not on the allowlist come back unchanged."""
        models = ["anthropic/claude-opus-4.6", "openai/gpt-5.4"]
        pricing = dict.fromkeys(models, self._PAID)

        assert filter_nous_free_models(models, pricing) == models

    def test_free_non_allowlist_models_removed(self):
        """A free model outside the allowlist is dropped."""
        pricing = {
            "anthropic/claude-opus-4.6": self._PAID,
            "arcee-ai/trinity-large-preview:free": self._FREE,
        }
        models = ["anthropic/claude-opus-4.6", "arcee-ai/trinity-large-preview:free"]

        kept = filter_nous_free_models(models, pricing)

        assert kept == ["anthropic/claude-opus-4.6"]

    def test_allowlist_model_kept_when_free(self):
        """An allowlisted model stays when its pricing is free."""
        pricing = {
            "anthropic/claude-opus-4.6": self._PAID,
            "xiaomi/mimo-v2-pro": self._FREE,
        }
        models = ["anthropic/claude-opus-4.6", "xiaomi/mimo-v2-pro"]

        kept = filter_nous_free_models(models, pricing)

        assert kept == ["anthropic/claude-opus-4.6", "xiaomi/mimo-v2-pro"]

    def test_allowlist_model_removed_when_paid(self):
        """An allowlisted model is dropped when its pricing is not free."""
        pricing = {
            "anthropic/claude-opus-4.6": self._PAID,
            "xiaomi/mimo-v2-pro": self._PAID,
        }
        models = ["anthropic/claude-opus-4.6", "xiaomi/mimo-v2-pro"]

        kept = filter_nous_free_models(models, pricing)

        assert kept == ["anthropic/claude-opus-4.6"]

    def test_no_pricing_returns_all(self):
        """With an empty pricing dict, every model passes through."""
        models = ["anthropic/claude-opus-4.6", "nvidia/nemotron-3-super-120b-a12b:free"]

        assert filter_nous_free_models(models, {}) == models

    def test_model_with_no_pricing_entry_treated_as_paid(self):
        """A model absent from the pricing dict is kept."""
        models = ["anthropic/claude-opus-4.6", "openai/gpt-5.4"]
        # gpt-5.4 deliberately has no pricing entry.
        pricing = {"anthropic/claude-opus-4.6": self._PAID}

        kept = filter_nous_free_models(models, pricing)

        assert kept == models

    def test_mixed_scenario(self):
        """Paid, free-allowlisted, free-other and paid-allowlisted models together."""
        catalogue = [
            ("anthropic/claude-opus-4.6", self._PAID),  # paid, not allowlist: keep
            ("nvidia/nemotron-3-super-120b-a12b:free", self._FREE),  # free, not allowlist: drop
            ("xiaomi/mimo-v2-pro", self._FREE),  # free, allowlist: keep
            ("xiaomi/mimo-v2-omni", self._PAID),  # paid, allowlist: drop
            ("openai/gpt-5.4", self._PAID),  # paid, not allowlist: keep
        ]
        models = [model_id for model_id, _ in catalogue]
        pricing = dict(catalogue)

        kept = filter_nous_free_models(models, pricing)

        assert kept == [
            "anthropic/claude-opus-4.6",
            "xiaomi/mimo-v2-pro",
            "openai/gpt-5.4",
        ]

    def test_allowlist_contains_expected_models(self):
        """Both xiaomi models are members of the allowlist."""
        for allowlisted in ("xiaomi/mimo-v2-pro", "xiaomi/mimo-v2-omni"):
            assert allowlisted in _NOUS_ALLOWED_FREE_MODELS
class TestIsNousFreeTier:
    """Checks is_nous_free_tier against a range of account payloads."""

    def test_paid_plus_tier(self):
        """A subscription with a monthly charge of 20 is not free tier."""
        account = {"subscription": {"plan": "Plus", "tier": 2, "monthly_charge": 20}}
        assert is_nous_free_tier(account) is False

    def test_free_tier_by_charge(self):
        """A subscription with a monthly charge of 0 is free tier."""
        account = {"subscription": {"plan": "Free", "tier": 0, "monthly_charge": 0}}
        assert is_nous_free_tier(account) is True

    def test_no_charge_field_not_free(self):
        """Without monthly_charge the account is treated as not free."""
        account = {"subscription": {"plan": "Free", "tier": 0}}
        assert is_nous_free_tier(account) is False

    def test_plan_name_alone_not_free(self):
        """A plan name of "free" alone does not make the account free tier."""
        account = {"subscription": {"plan": "free"}}
        assert is_nous_free_tier(account) is False

    def test_empty_subscription_not_free(self):
        """An empty subscription dict is treated as not free."""
        account = {"subscription": {}}
        assert is_nous_free_tier(account) is False

    def test_no_subscription_not_free(self):
        """A payload with no subscription key is treated as not free."""
        account = {}
        assert is_nous_free_tier(account) is False

    def test_empty_response_not_free(self):
        """A completely empty payload is treated as not free."""
        # NOTE(review): same input as test_no_subscription_not_free.
        account = {}
        assert is_nous_free_tier(account) is False
class TestPartitionNousModelsByTier:
    """Checks the (selectable, unavailable) split from partition_nous_models_by_tier."""

    _PAID = {"prompt": "0.000003", "completion": "0.000015"}
    _FREE = {"prompt": "0", "completion": "0"}

    def test_paid_tier_all_selectable(self):
        """With free_tier=False every model is selectable."""
        pricing = {
            "anthropic/claude-opus-4.6": self._PAID,
            "xiaomi/mimo-v2-pro": self._FREE,
        }
        models = ["anthropic/claude-opus-4.6", "xiaomi/mimo-v2-pro"]

        selectable, unavailable = partition_nous_models_by_tier(
            models, pricing, free_tier=False
        )

        assert selectable == models
        assert unavailable == []

    def test_free_tier_splits_correctly(self):
        """With free_tier=True only free models are selectable."""
        pricing = {
            "anthropic/claude-opus-4.6": self._PAID,
            "xiaomi/mimo-v2-pro": self._FREE,
            "openai/gpt-5.4": self._PAID,
        }
        models = ["anthropic/claude-opus-4.6", "xiaomi/mimo-v2-pro", "openai/gpt-5.4"]

        selectable, unavailable = partition_nous_models_by_tier(
            models, pricing, free_tier=True
        )

        assert selectable == ["xiaomi/mimo-v2-pro"]
        assert unavailable == ["anthropic/claude-opus-4.6", "openai/gpt-5.4"]

    def test_no_pricing_returns_all(self):
        """With an empty pricing dict every model is selectable."""
        models = ["anthropic/claude-opus-4.6", "openai/gpt-5.4"]

        selectable, unavailable = partition_nous_models_by_tier(
            models, {}, free_tier=True
        )

        assert selectable == models
        assert unavailable == []

    def test_all_free_models(self):
        """When every model is free, a free-tier user can select all of them."""
        models = ["xiaomi/mimo-v2-pro", "xiaomi/mimo-v2-omni"]
        pricing = dict.fromkeys(models, self._FREE)

        selectable, unavailable = partition_nous_models_by_tier(
            models, pricing, free_tier=True
        )

        assert selectable == models
        assert unavailable == []

    def test_all_paid_models(self):
        """When every model is paid, a free-tier user can select none of them."""
        models = ["anthropic/claude-opus-4.6", "openai/gpt-5.4"]
        pricing = dict.fromkeys(models, self._PAID)

        selectable, unavailable = partition_nous_models_by_tier(
            models, pricing, free_tier=True
        )

        assert selectable == []
        assert unavailable == models
class TestCheckNousFreeTierCache:
    """Checks the TTL cache that sits in front of check_nous_free_tier()."""

    def setup_method(self):
        """Start every test with an empty cache."""
        clear_nous_free_tier_cache()

    def teardown_method(self):
        """Leave an empty cache behind for the next test."""
        clear_nous_free_tier_cache()

    @patch("hermes_cli.models.fetch_nous_account_tier")
    @patch("hermes_cli.models.is_nous_free_tier", return_value=True)
    def test_result_is_cached(self, mock_is_free, mock_fetch):
        """Two calls in a row fetch the account tier only once."""
        mock_fetch.return_value = {"subscription": {"monthly_charge": 0}}
        auth_state = patch(
            "hermes_cli.auth.get_provider_auth_state",
            return_value={"access_token": "tok"},
        )
        runtime_creds = patch("hermes_cli.auth.resolve_nous_runtime_credentials")

        with auth_state, runtime_creds:
            first = check_nous_free_tier()
            second = check_nous_free_tier()

        assert first is True
        assert second is True
        # The second call must have been answered from the cache.
        assert mock_fetch.call_count == 1

    @patch("hermes_cli.models.fetch_nous_account_tier")
    @patch("hermes_cli.models.is_nous_free_tier", return_value=False)
    def test_cache_expires_after_ttl(self, mock_is_free, mock_fetch):
        """A cache entry older than the TTL triggers a second fetch."""
        mock_fetch.return_value = {"subscription": {"monthly_charge": 20}}
        auth_state = patch(
            "hermes_cli.auth.get_provider_auth_state",
            return_value={"access_token": "tok"},
        )
        runtime_creds = patch("hermes_cli.auth.resolve_nous_runtime_credentials")

        with auth_state, runtime_creds:
            first = check_nous_free_tier()
            assert mock_fetch.call_count == 1

            # Backdate the stored timestamp so the entry looks expired.
            value, stored_at = _models_mod._free_tier_cache
            expired_at = stored_at - _FREE_TIER_CACHE_TTL - 1
            _models_mod._free_tier_cache = (value, expired_at)

            second = check_nous_free_tier()
            assert mock_fetch.call_count == 2

        assert first is False
        assert second is False

    def test_clear_cache_forces_refresh(self):
        """clear_nous_free_tier_cache() resets the module cache to None."""
        import time

        # Seed the cache by hand, then clear it.
        _models_mod._free_tier_cache = (True, time.monotonic())

        clear_nous_free_tier_cache()

        assert _models_mod._free_tier_cache is None

    def test_cache_ttl_is_short(self):
        """The TTL is at most 300 seconds (five minutes)."""
        assert _FREE_TIER_CACHE_TTL <= 300
@@ -184,8 +184,6 @@ class TestSetupWizardOpenclawIntegration:
patch("hermes_cli.auth.get_active_provider", return_value=None),
# User presses Enter to start
patch("builtins.input", return_value=""),
# Select "Full setup" (index 1) so we exercise the full path
patch.object(setup_mod, "prompt_choice", return_value=1),
# Mock the migration offer
patch.object(
setup_mod, "_offer_openclaw_migration", return_value=False
@@ -198,7 +196,6 @@ class TestSetupWizardOpenclawIntegration:
patch.object(setup_mod, "setup_tools"),
patch.object(setup_mod, "save_config"),
patch.object(setup_mod, "_print_setup_summary"),
patch.object(setup_mod, "_offer_launch_chat"),
):
setup_mod.run_setup_wizard(args)
@@ -221,7 +218,6 @@ class TestSetupWizardOpenclawIntegration:
patch.object(setup_mod, "is_interactive_stdin", return_value=True),
patch("hermes_cli.auth.get_active_provider", return_value=None),
patch("builtins.input", return_value=""),
patch.object(setup_mod, "prompt_choice", return_value=1),
patch.object(setup_mod, "_offer_openclaw_migration", return_value=True),
patch.object(setup_mod, "setup_model_provider"),
patch.object(setup_mod, "setup_terminal_backend"),
@@ -230,7 +226,6 @@ class TestSetupWizardOpenclawIntegration:
patch.object(setup_mod, "setup_tools"),
patch.object(setup_mod, "save_config"),
patch.object(setup_mod, "_print_setup_summary"),
patch.object(setup_mod, "_offer_launch_chat"),
):
setup_mod.run_setup_wizard(args)
@@ -254,7 +249,6 @@ class TestSetupWizardOpenclawIntegration:
patch.object(setup_mod, "is_interactive_stdin", return_value=True),
patch("hermes_cli.auth.get_active_provider", return_value=None),
patch("builtins.input", return_value=""),
patch.object(setup_mod, "prompt_choice", return_value=1),
patch.object(setup_mod, "_offer_openclaw_migration", return_value=True),
patch.object(setup_mod, "setup_model_provider") as setup_model_provider,
patch.object(setup_mod, "setup_terminal_backend"),
@@ -263,7 +257,6 @@ class TestSetupWizardOpenclawIntegration:
patch.object(setup_mod, "setup_tools"),
patch.object(setup_mod, "save_config"),
patch.object(setup_mod, "_print_setup_summary"),
patch.object(setup_mod, "_offer_launch_chat"),
):
setup_mod.run_setup_wizard(args)
@@ -445,7 +438,6 @@ class TestSetupWizardSkipsConfiguredSections:
patch.object(setup_mod, "is_interactive_stdin", return_value=True),
patch("hermes_cli.auth.get_active_provider", return_value=None),
patch("builtins.input", return_value=""),
patch.object(setup_mod, "prompt_choice", return_value=1),
# Migration succeeds and flips the env_side flag
patch.object(
setup_mod, "_offer_openclaw_migration",
+28 -22
View File
@@ -15,7 +15,7 @@ def test_version_string_no_v_prefix():
assert not __version__.startswith("v"), f"__version__ should not start with 'v', got {__version__!r}"
def test_check_for_updates_uses_cache(tmp_path, monkeypatch):
def test_check_for_updates_uses_cache(tmp_path):
"""When cache is fresh, check_for_updates should return cached value without calling git."""
from hermes_cli.banner import check_for_updates
@@ -27,15 +27,15 @@ def test_check_for_updates_uses_cache(tmp_path, monkeypatch):
cache_file = tmp_path / ".update_check"
cache_file.write_text(json.dumps({"ts": time.time(), "behind": 3}))
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
with patch("hermes_cli.banner.subprocess.run") as mock_run:
result = check_for_updates()
with patch("hermes_cli.banner.os.getenv", return_value=str(tmp_path)):
with patch("hermes_cli.banner.subprocess.run") as mock_run:
result = check_for_updates()
assert result == 3
mock_run.assert_not_called()
def test_check_for_updates_expired_cache(tmp_path, monkeypatch):
def test_check_for_updates_expired_cache(tmp_path):
"""When cache is expired, check_for_updates should call git fetch."""
from hermes_cli.banner import check_for_updates
@@ -49,15 +49,15 @@ def test_check_for_updates_expired_cache(tmp_path, monkeypatch):
mock_result = MagicMock(returncode=0, stdout="5\n")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
with patch("hermes_cli.banner.subprocess.run", return_value=mock_result) as mock_run:
result = check_for_updates()
with patch("hermes_cli.banner.os.getenv", return_value=str(tmp_path)):
with patch("hermes_cli.banner.subprocess.run", return_value=mock_result) as mock_run:
result = check_for_updates()
assert result == 5
assert mock_run.call_count == 2 # git fetch + git rev-list
def test_check_for_updates_no_git_dir(tmp_path, monkeypatch):
def test_check_for_updates_no_git_dir(tmp_path):
"""Returns None when .git directory doesn't exist anywhere."""
import hermes_cli.banner as banner
@@ -66,15 +66,19 @@ def test_check_for_updates_no_git_dir(tmp_path, monkeypatch):
fake_banner.parent.mkdir(parents=True, exist_ok=True)
fake_banner.touch()
monkeypatch.setattr(banner, "__file__", str(fake_banner))
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
with patch("hermes_cli.banner.subprocess.run") as mock_run:
result = banner.check_for_updates()
assert result is None
mock_run.assert_not_called()
original = banner.__file__
try:
banner.__file__ = str(fake_banner)
with patch("hermes_cli.banner.os.getenv", return_value=str(tmp_path)):
with patch("hermes_cli.banner.subprocess.run") as mock_run:
result = banner.check_for_updates()
assert result is None
mock_run.assert_not_called()
finally:
banner.__file__ = original
def test_check_for_updates_fallback_to_project_root(tmp_path, monkeypatch):
def test_check_for_updates_fallback_to_project_root():
"""Dev install: falls back to Path(__file__).parent.parent when HERMES_HOME has no git repo."""
import hermes_cli.banner as banner
@@ -83,12 +87,14 @@ def test_check_for_updates_fallback_to_project_root(tmp_path, monkeypatch):
pytest.skip("Not running from a git checkout")
# Point HERMES_HOME at a temp dir with no hermes-agent/.git
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
with patch("hermes_cli.banner.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="0\n")
result = banner.check_for_updates()
# Should have fallen back to project root and run git commands
assert mock_run.call_count >= 1
import tempfile
with tempfile.TemporaryDirectory() as td:
with patch("hermes_cli.banner.os.getenv", return_value=td):
with patch("hermes_cli.banner.subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="0\n")
result = banner.check_for_updates()
# Should have fallen back to project root and run git commands
assert mock_run.call_count >= 1
def test_prefetch_non_blocking():
View File
@@ -16,7 +16,7 @@ from unittest.mock import MagicMock
import pytest
# Ensure repo root is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
try:
from environments.agent_loop import (
@@ -31,7 +31,7 @@ import pytest
# pytestmark removed — tests skip gracefully via OPENROUTER_API_KEY check on line 59
# Ensure repo root is importable
_repo_root = Path(__file__).resolve().parent.parent.parent
_repo_root = Path(__file__).resolve().parent.parent
if str(_repo_root) not in sys.path:
sys.path.insert(0, str(_repo_root))

Some files were not shown because too many files have changed in this diff Show More