Compare commits

..

3 Commits

Author SHA1 Message Date
Teknium 3cd61bf7bf docs: add Private Chat Topics section to Telegram docs
Documents DM topic configuration, skill binding, session isolation,
and how topic creation/persistence works. Updates the Recent Bot API
Features section to include Bot API 9.4.
2026-03-26 02:03:34 -07:00
Teknium 99b1343892 feat: make DM topic skill binding functional
- Add auto_skill field to MessageEvent for topic-skill bindings
- Gateway auto-loads the bound skill on new sessions via _load_skill_payload
- Skill content is injected into the first message (same as /skill commands)
- Subsequent messages in the session see it in conversation history
- Clean chat_topic (no [skill: ...] suffix) — skill flows via auto_skill field
- Add 3 tests for _build_message_event auto_skill behavior
2026-03-26 01:59:14 -07:00
Teknium 602dd4f2fa feat(telegram): Private Chat Topics support (Bot API 9.4)
Cherry-picked from PR #3005 by web3blind.
Adds DM topic creation, persistence, and session isolation via Bot API 9.4.
Closes #2598
2026-03-26 01:46:26 -07:00
61 changed files with 308 additions and 2025 deletions
-1
View File
@@ -173,7 +173,6 @@ if canonical == "mycommand":
- `args_hint` — argument placeholder shown in help (e.g. `"<prompt>"`, `"[name]"`)
- `cli_only` — only available in the interactive CLI
- `gateway_only` — only available in messaging platforms
- `gateway_config_gate` — config dotpath (e.g. `"display.tool_progress_command"`); when set on a `cli_only` command, the command becomes available in the gateway if the config value is truthy. `GATEWAY_KNOWN_COMMANDS` always includes config-gated commands so the gateway can dispatch them; help/menus only show them when the gate is open.
**Adding an alias** requires only adding it to the `aliases` tuple on the existing `CommandDef`. No other file changes needed — dispatch, help text, Telegram menu, Slack mapping, and autocomplete all update automatically.
+26 -33
View File
@@ -210,12 +210,9 @@ def _refresh_oauth_token(creds: Dict[str, Any]) -> Optional[str]:
Only works for credentials that have a refresh token (from claude /login
or claude setup-token with OAuth flow).
Tries the new platform.claude.com endpoint first (Claude Code >=2.1.81),
then falls back to console.anthropic.com for older tokens.
Returns the new access token, or None if refresh fails.
"""
import time
import urllib.parse
import urllib.request
refresh_token = creds.get("refreshToken", "")
@@ -226,42 +223,38 @@ def _refresh_oauth_token(creds: Dict[str, Any]) -> Optional[str]:
# Client ID used by Claude Code's OAuth flow
CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e"
# Anthropic migrated OAuth from console.anthropic.com to platform.claude.com
# (Claude Code v2.1.81+). Try new endpoint first, fall back to old.
token_endpoints = [
"https://platform.claude.com/v1/oauth/token",
"https://console.anthropic.com/v1/oauth/token",
]
payload = json.dumps({
data = urllib.parse.urlencode({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CLIENT_ID,
}).encode()
headers = {
"Content-Type": "application/json",
"User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)",
}
req = urllib.request.Request(
"https://console.anthropic.com/v1/oauth/token",
data=data,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)",
},
method="POST",
)
for endpoint in token_endpoints:
req = urllib.request.Request(
endpoint, data=payload, headers=headers, method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read().decode())
new_access = result.get("access_token", "")
new_refresh = result.get("refresh_token", refresh_token)
expires_in = result.get("expires_in", 3600)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read().decode())
new_access = result.get("access_token", "")
new_refresh = result.get("refresh_token", refresh_token)
expires_in = result.get("expires_in", 3600) # seconds
if new_access:
new_expires_ms = int(time.time() * 1000) + (expires_in * 1000)
_write_claude_code_credentials(new_access, new_refresh, new_expires_ms)
logger.debug("Refreshed Claude Code OAuth token via %s", endpoint)
return new_access
except Exception as e:
logger.debug("Token refresh failed at %s: %s", endpoint, e)
if new_access:
import time
new_expires_ms = int(time.time() * 1000) + (expires_in * 1000)
# Write refreshed credentials back to ~/.claude/.credentials.json
_write_claude_code_credentials(new_access, new_refresh, new_expires_ms)
logger.debug("Successfully refreshed Claude Code OAuth token")
return new_access
except Exception as e:
logger.debug("Failed to refresh Claude Code token: %s", e)
return None
+1 -1
View File
@@ -82,7 +82,7 @@ auxiliary_is_nous: bool = False
# Default auxiliary models per provider
_OPENROUTER_MODEL = "google/gemini-3-flash-preview"
_NOUS_MODEL = "google/gemini-3-flash-preview"
_NOUS_MODEL = "gemini-3-flash"
_NOUS_DEFAULT_BASE_URL = "https://inference-api.nousresearch.com/v1"
_ANTHROPIC_DEFAULT_BASE_URL = "https://api.anthropic.com"
_AUTH_JSON_PATH = get_hermes_home() / "auth.json"
-23
View File
@@ -895,26 +895,3 @@ def estimate_messages_tokens_rough(messages: List[Dict[str, Any]]) -> int:
"""Rough token estimate for a message list (pre-flight only)."""
total_chars = sum(len(str(msg)) for msg in messages)
return total_chars // 4
def estimate_request_tokens_rough(
messages: List[Dict[str, Any]],
*,
system_prompt: str = "",
tools: Optional[List[Dict[str, Any]]] = None,
) -> int:
"""Rough token estimate for a full chat-completions request.
Includes the major payload buckets Hermes sends to providers:
system prompt, conversation messages, and tool schemas. With 50+
tools enabled, schemas alone can add 20-30K tokens — a significant
blind spot when only counting messages.
"""
total_chars = 0
if system_prompt:
total_chars += len(system_prompt)
if messages:
total_chars += sum(len(str(msg)) for msg in messages)
if tools:
total_chars += len(str(tools))
return total_chars // 4
+9 -6
View File
@@ -2916,7 +2916,7 @@ class HermesCLI:
try:
self._session_db.create_session(
session_id=self.session_id,
source=os.environ.get("HERMES_SESSION_SOURCE", "cli"),
source="cli",
model=self.model,
model_config={
"max_iterations": self.max_turns,
@@ -7163,13 +7163,13 @@ class HermesCLI:
if self.agent and getattr(self.agent, '_honcho', None):
try:
self.agent._honcho.shutdown()
except (Exception, KeyboardInterrupt):
except Exception:
pass
# Close session in SQLite
if hasattr(self, '_session_db') and self._session_db and self.agent:
try:
self._session_db.end_session(self.agent.session_id, "cli_close")
except (Exception, KeyboardInterrupt) as e:
except Exception as e:
logger.debug("Could not close session in DB: %s", e)
_run_cleanup()
self._print_exit_summary()
@@ -7288,9 +7288,12 @@ def main(
else:
toolsets_list.append(str(t))
else:
# Use the shared resolver so MCP servers are included at runtime
from hermes_cli.tools_config import _get_platform_tools
toolsets_list = sorted(_get_platform_tools(CLI_CONFIG, "cli"))
# Check config for CLI toolsets, fallback to hermes-cli
config_cli_toolsets = CLI_CONFIG.get("platform_toolsets", {}).get("cli")
if config_cli_toolsets and isinstance(config_cli_toolsets, list):
toolsets_list = config_cli_toolsets
else:
toolsets_list = ["hermes-cli"]
parsed_skills = _parse_skills_argument(skills)
+2 -2
View File
@@ -474,11 +474,11 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
if _session_db:
try:
_session_db.end_session(_cron_session_id, "cron_complete")
except (Exception, KeyboardInterrupt) as e:
except Exception as e:
logger.debug("Job '%s': failed to end session: %s", job_id, e)
try:
_session_db.close()
except (Exception, KeyboardInterrupt) as e:
except Exception as e:
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
-1
View File
@@ -383,7 +383,6 @@ class APIServerAdapter(BasePlatformAdapter):
quiet_mode=True,
verbose_logging=False,
ephemeral_system_prompt=ephemeral_system_prompt or None,
enabled_toolsets=["hermes-api-server"],
session_id=session_id,
platform="api_server",
stream_delta_callback=stream_delta_callback,
+1 -2
View File
@@ -446,7 +446,6 @@ class DiscordAdapter(BasePlatformAdapter):
# Persistent typing indicator loops per channel (DMs don't reliably
# show the standard typing gateway event for bots)
self._typing_tasks: Dict[str, asyncio.Task] = {}
self._bot_task: Optional[asyncio.Task] = None
# Cap to prevent unbounded growth (Discord threads get archived).
self._MAX_TRACKED_THREADS = 500
@@ -589,7 +588,7 @@ class DiscordAdapter(BasePlatformAdapter):
self._register_slash_commands()
# Start the bot in background
self._bot_task = asyncio.create_task(self._client.start(self.config.token))
asyncio.create_task(self._client.start(self.config.token))
# Wait for ready
await asyncio.wait_for(self._ready_event.wait(), timeout=30)
+5 -5
View File
@@ -224,7 +224,7 @@ class EmailAdapter(BasePlatformAdapter):
"""Connect to the IMAP server and start polling for new messages."""
try:
# Test IMAP connection
imap = imaplib.IMAP4_SSL(self._imap_host, self._imap_port, timeout=30)
imap = imaplib.IMAP4_SSL(self._imap_host, self._imap_port)
imap.login(self._address, self._password)
# Mark all existing messages as seen so we only process new ones
imap.select("INBOX")
@@ -240,7 +240,7 @@ class EmailAdapter(BasePlatformAdapter):
try:
# Test SMTP connection
smtp = smtplib.SMTP(self._smtp_host, self._smtp_port, timeout=30)
smtp = smtplib.SMTP(self._smtp_host, self._smtp_port)
smtp.starttls(context=ssl.create_default_context())
smtp.login(self._address, self._password)
smtp.quit()
@@ -289,7 +289,7 @@ class EmailAdapter(BasePlatformAdapter):
"""Fetch new (unseen) messages from IMAP. Runs in executor thread."""
results = []
try:
imap = imaplib.IMAP4_SSL(self._imap_host, self._imap_port, timeout=30)
imap = imaplib.IMAP4_SSL(self._imap_host, self._imap_port)
imap.login(self._address, self._password)
imap.select("INBOX")
@@ -442,7 +442,7 @@ class EmailAdapter(BasePlatformAdapter):
msg.attach(MIMEText(body, "plain", "utf-8"))
smtp = smtplib.SMTP(self._smtp_host, self._smtp_port, timeout=30)
smtp = smtplib.SMTP(self._smtp_host, self._smtp_port)
smtp.starttls(context=ssl.create_default_context())
smtp.login(self._address, self._password)
smtp.send_message(msg)
@@ -529,7 +529,7 @@ class EmailAdapter(BasePlatformAdapter):
part.add_header("Content-Disposition", f"attachment; filename={fname}")
msg.attach(part)
smtp = smtplib.SMTP(self._smtp_host, self._smtp_port, timeout=30)
smtp = smtplib.SMTP(self._smtp_host, self._smtp_port)
smtp.starttls(context=ssl.create_default_context())
smtp.login(self._address, self._password)
smtp.send_message(msg)
+3 -7
View File
@@ -114,9 +114,7 @@ class HomeAssistantAdapter(BasePlatformAdapter):
return False
# Dedicated REST session for send() calls
self._rest_session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
self._rest_session = aiohttp.ClientSession()
# Warn if no event filters are configured
if not self._watch_domains and not self._watch_entities and not self._watch_all:
@@ -142,10 +140,8 @@ class HomeAssistantAdapter(BasePlatformAdapter):
ws_url = self._hass_url.replace("http://", "ws://").replace("https://", "wss://")
ws_url = f"{ws_url}/api/websocket"
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
self._ws = await self._session.ws_connect(ws_url, heartbeat=30, timeout=30)
self._session = aiohttp.ClientSession()
self._ws = await self._session.ws_connect(ws_url, heartbeat=30)
# Step 1: Receive auth_required
msg = await self._ws.receive_json()
+4 -7
View File
@@ -116,7 +116,7 @@ class MattermostAdapter(BasePlatformAdapter):
import aiohttp
url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
try:
async with self._session.get(url, headers=self._headers(), timeout=aiohttp.ClientTimeout(total=30)) as resp:
async with self._session.get(url, headers=self._headers()) as resp:
if resp.status >= 400:
body = await resp.text()
logger.error("MM API GET %s%s: %s", path, resp.status, body[:200])
@@ -134,8 +134,7 @@ class MattermostAdapter(BasePlatformAdapter):
url = f"{self._base_url}/api/v4/{path.lstrip('/')}"
try:
async with self._session.post(
url, headers=self._headers(), json=payload,
timeout=aiohttp.ClientTimeout(total=30)
url, headers=self._headers(), json=payload
) as resp:
if resp.status >= 400:
body = await resp.text()
@@ -181,7 +180,7 @@ class MattermostAdapter(BasePlatformAdapter):
content_type=content_type,
)
headers = {"Authorization": f"Bearer {self._token}"}
async with self._session.post(url, headers=headers, data=form, timeout=aiohttp.ClientTimeout(total=60)) as resp:
async with self._session.post(url, headers=headers, data=form) as resp:
if resp.status >= 400:
body = await resp.text()
logger.error("MM file upload → %s: %s", resp.status, body[:200])
@@ -202,9 +201,7 @@ class MattermostAdapter(BasePlatformAdapter):
logger.error("Mattermost: URL or token not configured")
return False
self._session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30)
)
self._session = aiohttp.ClientSession()
self._closing = False
# Verify credentials and fetch bot identity.
+1 -3
View File
@@ -344,9 +344,7 @@ class SignalAdapter(BasePlatformAdapter):
"""Force SSE reconnection by closing the current response."""
if self._sse_response and not self._sse_response.is_stream_consumed:
try:
task = asyncio.create_task(self._sse_response.aclose())
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
asyncio.create_task(self._sse_response.aclose())
except Exception:
pass
self._sse_response = None
+1 -2
View File
@@ -72,7 +72,6 @@ class SlackAdapter(BasePlatformAdapter):
self._handler: Optional[AsyncSocketModeHandler] = None
self._bot_user_id: Optional[str] = None
self._user_name_cache: Dict[str, str] = {} # user_id → display name
self._socket_mode_task: Optional[asyncio.Task] = None
async def connect(self) -> bool:
"""Connect to Slack via Socket Mode."""
@@ -120,7 +119,7 @@ class SlackAdapter(BasePlatformAdapter):
# Start Socket Mode handler in background
self._handler = AsyncSocketModeHandler(self._app, app_token)
self._socket_mode_task = asyncio.create_task(self._handler.start_async())
asyncio.create_task(self._handler.start_async())
self._running = True
logger.info("[Slack] Connected as @%s (Socket Mode)", bot_name)
+3 -9
View File
@@ -106,9 +106,7 @@ class SmsAdapter(BasePlatformAdapter):
await self._runner.setup()
site = web.TCPSite(self._runner, "0.0.0.0", self._webhook_port)
await site.start()
self._http_session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
)
self._http_session = aiohttp.ClientSession()
self._running = True
logger.info(
@@ -146,9 +144,7 @@ class SmsAdapter(BasePlatformAdapter):
"Authorization": self._basic_auth_header(),
}
session = self._http_session or aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=30),
)
session = self._http_session or aiohttp.ClientSession()
try:
for chunk in chunks:
form_data = aiohttp.FormData()
@@ -265,9 +261,7 @@ class SmsAdapter(BasePlatformAdapter):
)
# Non-blocking: Twilio expects a fast response
task = asyncio.create_task(self.handle_message(event))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
asyncio.create_task(self.handle_message(event))
# Return empty TwiML — we send replies via the REST API, not inline TwiML
return web.Response(
+1 -8
View File
@@ -219,14 +219,7 @@ class TelegramAdapter(BasePlatformAdapter):
self._polling_network_error_count = 0
except Exception as retry_err:
logger.warning("[%s] Telegram polling reconnect failed: %s", self.name, retry_err)
# start_polling failed — polling is dead and no further error
# callbacks will fire, so schedule the next retry ourselves.
if not self.has_fatal_error:
task = asyncio.ensure_future(
self._handle_polling_network_error(retry_err)
)
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
# The next network error will trigger another attempt.
async def _handle_polling_conflict(self, error: Exception) -> None:
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
+1 -3
View File
@@ -363,9 +363,7 @@ class WebhookAdapter(BasePlatformAdapter):
)
# Non-blocking — return 202 Accepted immediately
task = asyncio.create_task(self.handle_message(event))
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)
asyncio.create_task(self.handle_message(event))
return web.json_response(
{
+2 -3
View File
@@ -140,7 +140,6 @@ class WhatsAppAdapter(BasePlatformAdapter):
self._message_queue: asyncio.Queue = asyncio.Queue()
self._bridge_log_fh = None
self._bridge_log: Optional[Path] = None
self._poll_task: Optional[asyncio.Task] = None
async def connect(self) -> bool:
"""
@@ -199,7 +198,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
self._mark_connected()
self._bridge_process = None # Not managed by us
self._poll_task = asyncio.create_task(self._poll_messages())
asyncio.create_task(self._poll_messages())
return True
else:
print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting")
@@ -305,7 +304,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
print(f"[{self.name}] If session expired, re-pair: hermes whatsapp")
# Start message polling task
self._poll_task = asyncio.create_task(self._poll_messages())
asyncio.create_task(self._poll_messages())
self._mark_connected()
print(f"[{self.name}] Bridge started on port {self._bridge_port}")
+120 -127
View File
@@ -257,25 +257,7 @@ def _resolve_runtime_agent_kwargs() -> dict:
}
def _platform_config_key(platform: "Platform") -> str:
"""Map a Platform enum to its config.yaml key (LOCAL→"cli", rest→enum value)."""
return "cli" if platform == Platform.LOCAL else platform.value
def _load_gateway_config() -> dict:
"""Load and parse ~/.hermes/config.yaml, returning {} on any error."""
try:
config_path = _hermes_home / 'config.yaml'
if config_path.exists():
import yaml
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f) or {}
except Exception:
logger.debug("Could not load gateway config from %s", _hermes_home / 'config.yaml')
return {}
def _resolve_gateway_model(config: dict | None = None) -> str:
def _resolve_gateway_model() -> str:
"""Read model from env/config — mirrors the resolution in _run_agent_sync.
Without this, temporary AIAgent instances (memory flush, /compress) fall
@@ -283,12 +265,19 @@ def _resolve_gateway_model(config: dict | None = None) -> str:
when the active provider is openai-codex.
"""
model = os.getenv("HERMES_MODEL") or os.getenv("LLM_MODEL") or "anthropic/claude-opus-4.6"
cfg = config if config is not None else _load_gateway_config()
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, str):
model = model_cfg
elif isinstance(model_cfg, dict):
model = model_cfg.get("default", model)
try:
import yaml as _y
_cfg_path = _hermes_home / "config.yaml"
if _cfg_path.exists():
with open(_cfg_path, encoding="utf-8") as _f:
_cfg = _y.safe_load(_f) or {}
_model_cfg = _cfg.get("model", {})
if isinstance(_model_cfg, str):
model = _model_cfg
elif isinstance(_model_cfg, dict):
model = _model_cfg.get("default", model)
except Exception:
pass
return model
@@ -414,9 +403,6 @@ class GatewayRunner:
# Per-chat voice reply mode: "off" | "voice_only" | "all"
self._voice_mode: Dict[str, str] = self._load_voice_modes()
# Track background tasks to prevent garbage collection mid-execution
self._background_tasks: set = set()
def _get_or_create_gateway_honcho(self, session_key: str):
"""Return a persistent Honcho manager/config pair for this gateway session."""
if not hasattr(self, "_honcho_managers"):
@@ -1301,11 +1287,6 @@ class GatewayRunner:
except Exception as e:
logger.error("%s disconnect error: %s", platform.value, e)
# Cancel any pending background tasks
for _task in list(self._background_tasks):
_task.cancel()
self._background_tasks.clear()
self.adapters.clear()
self._running_agents.clear()
self._pending_messages.clear()
@@ -1716,9 +1697,6 @@ class GatewayRunner:
if canonical == "reasoning":
return await self._handle_reasoning_command(event)
if canonical == "verbose":
return await self._handle_verbose_command(event)
if canonical == "provider":
return await self._handle_provider_command(event)
@@ -2748,11 +2726,9 @@ class GatewayRunner:
try:
old_entry = self.session_store._entries.get(session_key)
if old_entry:
_flush_task = asyncio.create_task(
asyncio.create_task(
self._async_flush_memories(old_entry.session_id, session_key)
)
self._background_tasks.add(_flush_task)
_flush_task.add_done_callback(self._background_tasks.discard)
except Exception as e:
logger.debug("Gateway memory flush on reset failed: %s", e)
@@ -3565,11 +3541,9 @@ class GatewayRunner:
task_id = f"bg_{datetime.now().strftime('%H%M%S')}_{os.urandom(3).hex()}"
# Fire-and-forget the background task
_task = asyncio.create_task(
asyncio.create_task(
self._run_background_task(prompt, source, task_id)
)
self._background_tasks.add(_task)
_task.add_done_callback(self._background_tasks.discard)
preview = prompt[:60] + ("..." if len(prompt) > 60 else "")
return f'🔄 Background task started: "{preview}"\nTask ID: {task_id}\nYou can keep chatting — results will appear when done.'
@@ -3597,12 +3571,52 @@ class GatewayRunner:
)
return
user_config = _load_gateway_config()
model = _resolve_gateway_model(user_config)
platform_key = _platform_config_key(source.platform)
# Read model from config via shared helper
model = _resolve_gateway_model()
from hermes_cli.tools_config import _get_platform_tools
enabled_toolsets = sorted(_get_platform_tools(user_config, platform_key))
# Determine toolset (same logic as _run_agent)
default_toolset_map = {
Platform.LOCAL: "hermes-cli",
Platform.TELEGRAM: "hermes-telegram",
Platform.DISCORD: "hermes-discord",
Platform.WHATSAPP: "hermes-whatsapp",
Platform.SLACK: "hermes-slack",
Platform.SIGNAL: "hermes-signal",
Platform.HOMEASSISTANT: "hermes-homeassistant",
Platform.EMAIL: "hermes-email",
Platform.DINGTALK: "hermes-dingtalk",
}
platform_toolsets_config = {}
try:
config_path = _hermes_home / 'config.yaml'
if config_path.exists():
import yaml
with open(config_path, 'r', encoding="utf-8") as f:
user_config = yaml.safe_load(f) or {}
platform_toolsets_config = user_config.get("platform_toolsets", {})
except Exception:
pass
platform_config_key = {
Platform.LOCAL: "cli",
Platform.TELEGRAM: "telegram",
Platform.DISCORD: "discord",
Platform.WHATSAPP: "whatsapp",
Platform.SLACK: "slack",
Platform.SIGNAL: "signal",
Platform.HOMEASSISTANT: "homeassistant",
Platform.EMAIL: "email",
Platform.DINGTALK: "dingtalk",
}.get(source.platform, "telegram")
config_toolsets = platform_toolsets_config.get(platform_config_key)
if config_toolsets and isinstance(config_toolsets, list):
enabled_toolsets = config_toolsets
else:
default_toolset = default_toolset_map.get(source.platform, "hermes-telegram")
enabled_toolsets = [default_toolset]
platform_key = "cli" if source.platform == Platform.LOCAL else source.platform.value
pr = self._provider_routing
max_iterations = int(os.getenv("HERMES_MAX_ITERATIONS", "90"))
@@ -3787,68 +3801,6 @@ class GatewayRunner:
else:
return f"🧠 ✓ Reasoning effort set to `{effort}` (this session only)"
async def _handle_verbose_command(self, event: MessageEvent) -> str:
"""Handle /verbose command — cycle tool progress display mode.
Gated by ``display.tool_progress_command`` in config.yaml (default off).
When enabled, cycles the tool progress mode through off new all
verbose off, same as the CLI.
"""
import yaml
config_path = _hermes_home / "config.yaml"
# --- check config gate ------------------------------------------------
try:
user_config = {}
if config_path.exists():
with open(config_path, encoding="utf-8") as f:
user_config = yaml.safe_load(f) or {}
gate_enabled = user_config.get("display", {}).get("tool_progress_command", False)
except Exception:
gate_enabled = False
if not gate_enabled:
return (
"The `/verbose` command is not enabled for messaging platforms.\n\n"
"Enable it in `config.yaml`:\n```yaml\n"
"display:\n tool_progress_command: true\n```"
)
# --- cycle mode -------------------------------------------------------
cycle = ["off", "new", "all", "verbose"]
descriptions = {
"off": "⚙️ Tool progress: **OFF** — no tool activity shown.",
"new": "⚙️ Tool progress: **NEW** — shown when tool changes.",
"all": "⚙️ Tool progress: **ALL** — every tool call shown.",
"verbose": "⚙️ Tool progress: **VERBOSE** — full args and results.",
}
raw_progress = user_config.get("display", {}).get("tool_progress", "all")
# YAML 1.1 parses bare "off" as boolean False — normalise back
if raw_progress is False:
current = "off"
elif raw_progress is True:
current = "all"
else:
current = str(raw_progress).lower()
if current not in cycle:
current = "all"
idx = (cycle.index(current) + 1) % len(cycle)
new_mode = cycle[idx]
# Save to config.yaml
try:
if "display" not in user_config or not isinstance(user_config.get("display"), dict):
user_config["display"] = {}
user_config["display"]["tool_progress"] = new_mode
with open(config_path, "w", encoding="utf-8") as f:
yaml.dump(user_config, f, default_flow_style=False, sort_keys=False)
return f"{descriptions[new_mode]}\n_(saved to config — takes effect on next message)_"
except Exception as e:
logger.warning("Failed to save tool_progress mode: %s", e)
return f"{descriptions[new_mode]}\n_(could not save to config: {e})_"
async def _handle_compress_command(self, event: MessageEvent) -> str:
"""Handle /compress command -- manually compress conversation context."""
source = event.source
@@ -4006,11 +3958,9 @@ class GatewayRunner:
# Flush memories for current session before switching
try:
_flush_task = asyncio.create_task(
asyncio.create_task(
self._async_flush_memories(current_entry.session_id, session_key)
)
self._background_tasks.add(_flush_task)
_flush_task.add_done_callback(self._background_tasks.discard)
except Exception as e:
logger.debug("Memory flush on resume failed: %s", e)
@@ -4734,18 +4684,10 @@ class GatewayRunner:
prompt cache hits.
"""
import hashlib, json as _j
# Fingerprint the FULL credential string instead of using a short
# prefix. OAuth/JWT-style tokens frequently share a common prefix
# (e.g. "eyJhbGci"), which can cause false cache hits across auth
# switches if only the first few characters are considered.
_api_key = str(runtime.get("api_key", "") or "")
_api_key_fingerprint = hashlib.sha256(_api_key.encode()).hexdigest() if _api_key else ""
blob = _j.dumps(
[
model,
_api_key_fingerprint,
runtime.get("api_key", "")[:8], # first 8 chars only
runtime.get("base_url", ""),
runtime.get("provider", ""),
runtime.get("api_mode", ""),
@@ -4792,16 +4734,67 @@ class GatewayRunner:
from run_agent import AIAgent
import queue
user_config = _load_gateway_config()
platform_key = _platform_config_key(source.platform)
# Determine toolset based on platform.
# Check config.yaml for per-platform overrides, fallback to hardcoded defaults.
default_toolset_map = {
Platform.LOCAL: "hermes-cli",
Platform.TELEGRAM: "hermes-telegram",
Platform.DISCORD: "hermes-discord",
Platform.WHATSAPP: "hermes-whatsapp",
Platform.SLACK: "hermes-slack",
Platform.SIGNAL: "hermes-signal",
Platform.HOMEASSISTANT: "hermes-homeassistant",
Platform.EMAIL: "hermes-email",
Platform.DINGTALK: "hermes-dingtalk",
}
from hermes_cli.tools_config import _get_platform_tools
enabled_toolsets = sorted(_get_platform_tools(user_config, platform_key))
# Try to load platform_toolsets from config
platform_toolsets_config = {}
try:
config_path = _hermes_home / 'config.yaml'
if config_path.exists():
import yaml
with open(config_path, 'r', encoding="utf-8") as f:
user_config = yaml.safe_load(f) or {}
platform_toolsets_config = user_config.get("platform_toolsets", {})
except Exception as e:
logger.debug("Could not load platform_toolsets config: %s", e)
# Map platform enum to config key
platform_config_key = {
Platform.LOCAL: "cli",
Platform.TELEGRAM: "telegram",
Platform.DISCORD: "discord",
Platform.WHATSAPP: "whatsapp",
Platform.SLACK: "slack",
Platform.SIGNAL: "signal",
Platform.HOMEASSISTANT: "homeassistant",
Platform.EMAIL: "email",
Platform.DINGTALK: "dingtalk",
}.get(source.platform, "telegram")
# Use config override if present (list of toolsets), otherwise hardcoded default
config_toolsets = platform_toolsets_config.get(platform_config_key)
if config_toolsets and isinstance(config_toolsets, list):
enabled_toolsets = config_toolsets
else:
default_toolset = default_toolset_map.get(source.platform, "hermes-telegram")
enabled_toolsets = [default_toolset]
# Tool progress mode from config.yaml: "all", "new", "verbose", "off"
# Falls back to env vars for backward compatibility
_progress_cfg = {}
try:
_tp_cfg_path = _hermes_home / "config.yaml"
if _tp_cfg_path.exists():
import yaml as _tp_yaml
with open(_tp_cfg_path, encoding="utf-8") as _tp_f:
_tp_data = _tp_yaml.safe_load(_tp_f) or {}
_progress_cfg = _tp_data.get("display", {})
except Exception:
pass
progress_mode = (
user_config.get("display", {}).get("tool_progress")
_progress_cfg.get("tool_progress")
or os.getenv("HERMES_TOOL_PROGRESS_MODE")
or "all"
)
@@ -5024,7 +5017,7 @@ class GatewayRunner:
except Exception:
pass
model = _resolve_gateway_model(user_config)
model = _resolve_gateway_model()
try:
runtime_kwargs = _resolve_runtime_agent_kwargs()
+23 -39
View File
@@ -974,51 +974,35 @@ class SessionStore:
def load_transcript(self, session_id: str) -> List[Dict[str, Any]]:
"""Load all messages from a session's transcript."""
db_messages = []
# Try SQLite first
if self._db:
try:
db_messages = self._db.get_messages_as_conversation(session_id)
messages = self._db.get_messages_as_conversation(session_id)
if messages:
return messages
except Exception as e:
logger.debug("Could not load messages from DB: %s", e)
# Load legacy JSONL transcript (may contain more history than SQLite
# for sessions created before the DB layer was introduced).
# Fall back to legacy JSONL
transcript_path = self.get_transcript_path(session_id)
jsonl_messages = []
if transcript_path.exists():
with open(transcript_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
jsonl_messages.append(json.loads(line))
except json.JSONDecodeError:
logger.warning(
"Skipping corrupt line in transcript %s: %s",
session_id, line[:120],
)
# Prefer whichever source has more messages.
#
# Background: when a session pre-dates SQLite storage (or when the DB
# layer was added while a long-lived session was already active), the
# first post-migration turn writes only the *new* messages to SQLite
# (because _flush_messages_to_session_db skips messages already in
# conversation_history, assuming they're persisted). On the *next*
# turn load_transcript returns those few SQLite rows and ignores the
# full JSONL history — the model sees a context of 1-4 messages instead
# of hundreds. Using the longer source prevents this silent truncation.
if len(jsonl_messages) > len(db_messages):
if db_messages:
logger.debug(
"Session %s: JSONL has %d messages vs SQLite %d"
"using JSONL (legacy session not yet fully migrated)",
session_id, len(jsonl_messages), len(db_messages),
)
return jsonl_messages
return db_messages
if not transcript_path.exists():
return []
messages = []
with open(transcript_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
messages.append(json.loads(line))
except json.JSONDecodeError:
logger.warning(
"Skipping corrupt line in transcript %s: %s",
session_id, line[:120],
)
return messages
def build_session_context(
+7 -67
View File
@@ -36,7 +36,6 @@ class CommandDef:
subcommands: tuple[str, ...] = () # tab-completable subcommands
cli_only: bool = False # only available in CLI
gateway_only: bool = False # only available in gateway/messaging
gateway_config_gate: str | None = None # config dotpath; when truthy, overrides cli_only for gateway
# ---------------------------------------------------------------------------
@@ -88,8 +87,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
CommandDef("statusbar", "Toggle the context/model status bar", "Configuration",
cli_only=True, aliases=("sb",)),
CommandDef("verbose", "Cycle tool progress display: off -> new -> all -> verbose",
"Configuration", cli_only=True,
gateway_config_gate="display.tool_progress_command"),
"Configuration", cli_only=True),
CommandDef("reasoning", "Manage reasoning effort and display", "Configuration",
args_hint="[level|show|hide]",
subcommands=("none", "low", "minimal", "medium", "high", "xhigh", "show", "hide", "on", "off")),
@@ -207,7 +205,7 @@ def rebuild_lookups() -> None:
GATEWAY_KNOWN_COMMANDS = frozenset(
name
for cmd in COMMAND_REGISTRY
if not cmd.cli_only or cmd.gateway_config_gate
if not cmd.cli_only
for name in (cmd.name, *cmd.aliases)
)
@@ -261,76 +259,20 @@ for _cmd in COMMAND_REGISTRY:
# Gateway helpers
# ---------------------------------------------------------------------------
# Set of all command names + aliases recognized by the gateway.
# Includes config-gated commands so the gateway can dispatch them
# (the handler checks the config gate at runtime).
# Set of all command names + aliases recognized by the gateway
GATEWAY_KNOWN_COMMANDS: frozenset[str] = frozenset(
name
for cmd in COMMAND_REGISTRY
if not cmd.cli_only or cmd.gateway_config_gate
if not cmd.cli_only
for name in (cmd.name, *cmd.aliases)
)
def _resolve_config_gates() -> set[str]:
"""Return canonical names of commands whose ``gateway_config_gate`` is truthy.
Reads ``config.yaml`` and walks the dot-separated key path for each
config-gated command. Returns an empty set on any error so callers
degrade gracefully.
"""
gated = [c for c in COMMAND_REGISTRY if c.gateway_config_gate]
if not gated:
return set()
try:
import yaml
config_path = os.path.join(
os.getenv("HERMES_HOME", os.path.expanduser("~/.hermes")),
"config.yaml",
)
if os.path.exists(config_path):
with open(config_path, encoding="utf-8") as f:
cfg = yaml.safe_load(f) or {}
else:
cfg = {}
except Exception:
return set()
result: set[str] = set()
for cmd in gated:
val: Any = cfg
for key in cmd.gateway_config_gate.split("."):
if isinstance(val, dict):
val = val.get(key)
else:
val = None
break
if val:
result.add(cmd.name)
return result
def _is_gateway_available(cmd: CommandDef, config_overrides: set[str] | None = None) -> bool:
"""Check if *cmd* should appear in gateway surfaces (help, menus, mappings).
Unconditionally available when ``cli_only`` is False. When ``cli_only``
is True but ``gateway_config_gate`` is set, the command is available only
when the config value is truthy. Pass *config_overrides* (from
``_resolve_config_gates()``) to avoid re-reading config for every command.
"""
if not cmd.cli_only:
return True
if cmd.gateway_config_gate:
overrides = config_overrides if config_overrides is not None else _resolve_config_gates()
return cmd.name in overrides
return False
def gateway_help_lines() -> list[str]:
"""Generate gateway help text lines from the registry."""
overrides = _resolve_config_gates()
lines: list[str] = []
for cmd in COMMAND_REGISTRY:
if not _is_gateway_available(cmd, overrides):
if cmd.cli_only:
continue
args = f" {cmd.args_hint}" if cmd.args_hint else ""
alias_parts: list[str] = []
@@ -351,10 +293,9 @@ def telegram_bot_commands() -> list[tuple[str, str]]:
underscores. Aliases are skipped -- Telegram shows one menu entry per
canonical command.
"""
overrides = _resolve_config_gates()
result: list[tuple[str, str]] = []
for cmd in COMMAND_REGISTRY:
if not _is_gateway_available(cmd, overrides):
if cmd.cli_only:
continue
tg_name = cmd.name.replace("-", "_")
result.append((tg_name, cmd.description))
@@ -367,10 +308,9 @@ def slack_subcommand_map() -> dict[str, str]:
Maps both canonical names and aliases so /hermes bg do stuff works
the same as /hermes background do stuff.
"""
overrides = _resolve_config_gates()
mapping: dict[str, str] = {}
for cmd in COMMAND_REGISTRY:
if not _is_gateway_available(cmd, overrides):
if cmd.cli_only:
continue
mapping[cmd.name] = f"/{cmd.name}"
for alias in cmd.aliases:
-1
View File
@@ -269,7 +269,6 @@ DEFAULT_CONFIG = {
"streaming": False,
"show_cost": False, # Show $ cost in the status bar (off by default)
"skin": "default",
"tool_progress_command": False, # Enable /verbose command in messaging gateway
},
# Privacy settings
+2 -22
View File
@@ -513,10 +513,6 @@ def cmd_chat(args):
if getattr(args, "yolo", False):
os.environ["HERMES_YOLO_MODE"] = "1"
# --source: tag session source for filtering (e.g. 'tool' for third-party integrations)
if getattr(args, "source", None):
os.environ["HERMES_SESSION_SOURCE"] = args.source
# Import and run the CLI
from cli import main as cli_main
@@ -2387,12 +2383,6 @@ def _update_via_zip(args):
print("→ Extracting...")
with zipfile.ZipFile(zip_path, 'r') as zf:
# Validate paths to prevent zip-slip (path traversal)
tmp_dir_real = os.path.realpath(tmp_dir)
for member in zf.infolist():
member_path = os.path.realpath(os.path.join(tmp_dir, member.filename))
if not member_path.startswith(tmp_dir_real + os.sep) and member_path != tmp_dir_real:
raise ValueError(f"Zip-slip detected: {member.filename} escapes extraction directory")
zf.extractall(tmp_dir)
# GitHub ZIPs extract to hermes-agent-<branch>/
@@ -3174,11 +3164,6 @@ For more help on a command:
default=False,
help="Include the session ID in the agent's system prompt"
)
chat_parser.add_argument(
"--source",
default=None,
help="Session source tag for filtering (default: cli). Use 'tool' for third-party integrations that should not appear in user session lists."
)
chat_parser.set_defaults(func=cmd_chat)
# =========================================================================
@@ -3877,12 +3862,8 @@ For more help on a command:
action = args.sessions_action
# Hide third-party tool sessions by default, but honour explicit --source
_source = getattr(args, "source", None)
_exclude = None if _source else ["tool"]
if action == "list":
sessions = db.list_sessions_rich(source=args.source, exclude_sources=_exclude, limit=args.limit)
sessions = db.list_sessions_rich(source=args.source, limit=args.limit)
if not sessions:
print("No sessions found.")
return
@@ -3965,8 +3946,7 @@ For more help on a command:
elif action == "browse":
limit = getattr(args, "limit", 50) or 50
source = getattr(args, "source", None)
_browse_exclude = None if source else ["tool"]
sessions = db.list_sessions_rich(source=source, exclude_sources=_browse_exclude, limit=limit)
sessions = db.list_sessions_rich(source=source, limit=limit)
db.close()
if not sessions:
print("No sessions found.")
+6 -23
View File
@@ -53,29 +53,12 @@ OPENROUTER_MODELS: list[tuple[str, str]] = [
_PROVIDER_MODELS: dict[str, list[str]] = {
"nous": [
"anthropic/claude-opus-4.6",
"anthropic/claude-sonnet-4.5",
"anthropic/claude-haiku-4.5",
"openai/gpt-5.4",
"openai/gpt-5.4-mini",
"xiaomi/mimo-v2-pro",
"openai/gpt-5.3-codex",
"google/gemini-3-pro-preview",
"google/gemini-3-flash-preview",
"qwen/qwen3.5-plus-02-15",
"qwen/qwen3.5-35b-a3b",
"stepfun/step-3.5-flash",
"minimax/minimax-m2.7",
"minimax/minimax-m2.5",
"z-ai/glm-5",
"z-ai/glm-5-turbo",
"moonshotai/kimi-k2.5",
"x-ai/grok-4.20-beta",
"nvidia/nemotron-3-super-120b-a12b",
"nvidia/nemotron-3-super-120b-a12b:free",
"arcee-ai/trinity-large-preview:free",
"openai/gpt-5.4-pro",
"openai/gpt-5.4-nano",
"claude-opus-4-6",
"claude-sonnet-4-6",
"gpt-5.4",
"gemini-3-flash",
"gemini-3.0-pro-preview",
"deepseek-v3.2",
],
"openai-codex": [
"gpt-5.3-codex",
+1 -2
View File
@@ -357,8 +357,7 @@ def do_install(identifier: str, category: str = "", force: bool = False,
# Scan
c.print("[bold]Running security scan...[/]")
scan_source = getattr(bundle, "identifier", "") or getattr(meta, "identifier", "") or identifier
result = scan_skill(q_path, source=scan_source)
result = scan_skill(q_path, source=identifier)
c.print(format_scan_report(result))
# Check install policy
+10 -65
View File
@@ -131,10 +131,8 @@ PLATFORMS = {
"slack": {"label": "💼 Slack", "default_toolset": "hermes-slack"},
"whatsapp": {"label": "📱 WhatsApp", "default_toolset": "hermes-whatsapp"},
"signal": {"label": "📡 Signal", "default_toolset": "hermes-signal"},
"homeassistant": {"label": "🏠 Home Assistant", "default_toolset": "hermes-homeassistant"},
"email": {"label": "📧 Email", "default_toolset": "hermes-email"},
"dingtalk": {"label": "💬 DingTalk", "default_toolset": "hermes-dingtalk"},
"api_server": {"label": "🌐 API Server", "default_toolset": "hermes-api-server"},
}
@@ -380,29 +378,7 @@ def _platform_toolset_summary(config: dict, platforms: Optional[List[str]] = Non
return summary
def _parse_enabled_flag(value, default: bool = True) -> bool:
"""Parse bool-like config values used by tool/platform settings."""
if value is None:
return default
if isinstance(value, bool):
return value
if isinstance(value, int):
return value != 0
if isinstance(value, str):
lowered = value.strip().lower()
if lowered in {"true", "1", "yes", "on"}:
return True
if lowered in {"false", "0", "no", "off"}:
return False
return default
def _get_platform_tools(
config: dict,
platform: str,
*,
include_default_mcp_servers: bool = True,
) -> Set[str]:
def _get_platform_tools(config: dict, platform: str) -> Set[str]:
"""Resolve which individual toolset names are enabled for a platform."""
from toolsets import resolve_toolset
@@ -454,37 +430,6 @@ def _get_platform_tools(
enabled_toolsets.add(pts)
# else: known but not in config = user disabled it
# Preserve any explicit non-configurable toolset entries (for example,
# custom toolsets or MCP server names saved in platform_toolsets).
platform_default_keys = {p["default_toolset"] for p in PLATFORMS.values()}
explicit_passthrough = {
ts
for ts in toolset_names
if ts not in configurable_keys
and ts not in plugin_ts_keys
and ts not in platform_default_keys
}
# MCP servers are expected to be available on all platforms by default.
# If the platform explicitly lists one or more MCP server names, treat that
# as an allowlist. Otherwise include every globally enabled MCP server.
mcp_servers = config.get("mcp_servers", {})
enabled_mcp_servers = {
name
for name, server_cfg in mcp_servers.items()
if isinstance(server_cfg, dict)
and _parse_enabled_flag(server_cfg.get("enabled", True), default=True)
}
explicit_mcp_servers = explicit_passthrough & enabled_mcp_servers
enabled_toolsets.update(explicit_passthrough - enabled_mcp_servers)
if include_default_mcp_servers:
if explicit_mcp_servers:
enabled_toolsets.update(explicit_mcp_servers)
else:
enabled_toolsets.update(enabled_mcp_servers)
else:
enabled_toolsets.update(explicit_mcp_servers)
return enabled_toolsets
@@ -1077,7 +1022,7 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
if first_install:
for pkey in enabled_platforms:
pinfo = PLATFORMS[pkey]
current_enabled = _get_platform_tools(config, pkey, include_default_mcp_servers=False)
current_enabled = _get_platform_tools(config, pkey)
# Uncheck toolsets that should be off by default
checklist_preselected = current_enabled - _DEFAULT_OFF_TOOLSETS
@@ -1129,7 +1074,7 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
platform_keys = []
for pkey in enabled_platforms:
pinfo = PLATFORMS[pkey]
current = _get_platform_tools(config, pkey, include_default_mcp_servers=False)
current = _get_platform_tools(config, pkey)
count = len(current)
total = len(_get_effective_configurable_toolsets())
platform_choices.append(f"Configure {pinfo['label']} ({count}/{total} enabled)")
@@ -1176,11 +1121,11 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
# Use the union of all platforms' current tools as the starting state
all_current = set()
for pk in platform_keys:
all_current |= _get_platform_tools(config, pk, include_default_mcp_servers=False)
all_current |= _get_platform_tools(config, pk)
new_enabled = _prompt_toolset_checklist("All platforms", all_current)
if new_enabled != all_current:
for pk in platform_keys:
prev = _get_platform_tools(config, pk, include_default_mcp_servers=False)
prev = _get_platform_tools(config, pk)
added = new_enabled - prev
removed = prev - new_enabled
pinfo_inner = PLATFORMS[pk]
@@ -1202,7 +1147,7 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
print(color(" ✓ Saved configuration for all platforms", Colors.GREEN))
# Update choice labels
for ci, pk in enumerate(platform_keys):
new_count = len(_get_platform_tools(config, pk, include_default_mcp_servers=False))
new_count = len(_get_platform_tools(config, pk))
total = len(_get_effective_configurable_toolsets())
platform_choices[ci] = f"Configure {PLATFORMS[pk]['label']} ({new_count}/{total} enabled)"
else:
@@ -1214,7 +1159,7 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
pinfo = PLATFORMS[pkey]
# Get current enabled toolsets for this platform
current_enabled = _get_platform_tools(config, pkey, include_default_mcp_servers=False)
current_enabled = _get_platform_tools(config, pkey)
# Show checklist
new_enabled = _prompt_toolset_checklist(pinfo["label"], current_enabled)
@@ -1247,7 +1192,7 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
print()
# Update the choice label with new count
new_count = len(_get_platform_tools(config, pkey, include_default_mcp_servers=False))
new_count = len(_get_platform_tools(config, pkey))
total = len(_get_effective_configurable_toolsets())
platform_choices[idx] = f"Configure {pinfo['label']} ({new_count}/{total} enabled)"
@@ -1393,7 +1338,7 @@ def _configure_mcp_tools_interactive(config: dict):
def _apply_toolset_change(config: dict, platform: str, toolset_names: List[str], action: str):
"""Add or remove built-in toolsets for a platform."""
enabled = _get_platform_tools(config, platform, include_default_mcp_servers=False)
enabled = _get_platform_tools(config, platform)
if action == "disable":
updated = enabled - set(toolset_names)
else:
@@ -1479,7 +1424,7 @@ def tools_disable_enable_command(args):
return
if action == "list":
_print_tools_list(_get_platform_tools(config, platform, include_default_mcp_servers=False),
_print_tools_list(_get_platform_tools(config, platform),
config.get("mcp_servers") or {}, platform)
return
+11 -55
View File
@@ -124,10 +124,7 @@ class SessionDB:
self._conn = sqlite3.connect(
str(self.db_path),
check_same_thread=False,
# 30s gives the WAL writer (CLI or gateway) time to finish a batch
# flush before the concurrent reader/writer gives up. 10s was too
# short when the CLI is doing frequent memory flushes.
timeout=30.0,
timeout=10.0,
)
self._conn.row_factory = sqlite3.Row
self._conn.execute("PRAGMA journal_mode=WAL")
@@ -258,7 +255,7 @@ class SessionDB:
"""Create a new session record. Returns the session_id."""
with self._lock:
self._conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
"""INSERT INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
@@ -354,27 +351,6 @@ class SessionDB:
)
self._conn.commit()
def ensure_session(
self,
session_id: str,
source: str = "unknown",
model: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
"""
with self._lock:
self._conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
)
self._conn.commit()
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""Get a session by ID."""
with self._lock:
@@ -572,7 +548,6 @@ class SessionDB:
def list_sessions_rich(
self,
source: str = None,
exclude_sources: List[str] = None,
limit: int = 20,
offset: int = 0,
) -> List[Dict[str, Any]]:
@@ -584,18 +559,7 @@ class SessionDB:
Uses a single query with correlated subqueries instead of N+2 queries.
"""
where_clauses = []
params = []
if source:
where_clauses.append("s.source = ?")
params.append(source)
if exclude_sources:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
source_clause = "WHERE s.source = ?" if source else ""
query = f"""
SELECT s.*,
COALESCE(
@@ -610,11 +574,11 @@ class SessionDB:
s.started_at
) AS last_active
FROM sessions s
{where_sql}
{source_clause}
ORDER BY s.started_at DESC
LIMIT ? OFFSET ?
"""
params.extend([limit, offset])
params = (source, limit, offset) if source else (limit, offset)
with self._lock:
cursor = self._conn.execute(query, params)
rows = cursor.fetchall()
@@ -830,7 +794,6 @@ class SessionDB:
self,
query: str,
source_filter: List[str] = None,
exclude_sources: List[str] = None,
role_filter: List[str] = None,
limit: int = 20,
offset: int = 0,
@@ -863,11 +826,6 @@ class SessionDB:
where_clauses.append(f"s.source IN ({source_placeholders})")
params.extend(source_filter)
if exclude_sources is not None:
exclude_placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({exclude_placeholders})")
params.extend(exclude_sources)
if role_filter:
role_placeholders = ",".join("?" for _ in role_filter)
where_clauses.append(f"m.role IN ({role_placeholders})")
@@ -904,11 +862,9 @@ class SessionDB:
return []
matches = [dict(row) for row in cursor.fetchall()]
# Add surrounding context (1 message before + after each match).
# Done outside the lock so we don't hold it across N sequential queries.
for match in matches:
try:
with self._lock:
# Add surrounding context (1 message before + after each match)
for match in matches:
try:
ctx_cursor = self._conn.execute(
"""SELECT role, content FROM messages
WHERE session_id = ? AND id >= ? - 1 AND id <= ? + 1
@@ -919,9 +875,9 @@ class SessionDB:
{"role": r["role"], "content": (r["content"] or "")[:200]}
for r in ctx_cursor.fetchall()
]
match["context"] = context_msgs
except Exception:
match["context"] = []
match["context"] = context_msgs
except Exception:
match["context"] = []
# Remove full content from result (snippet is enough, saves tokens)
for match in matches:
+20 -62
View File
@@ -77,7 +77,7 @@ from agent.prompt_builder import (
)
from agent.model_metadata import (
fetch_model_metadata,
estimate_tokens_rough, estimate_messages_tokens_rough, estimate_request_tokens_rough,
estimate_tokens_rough, estimate_messages_tokens_rough,
get_next_probe_tier, parse_context_limit_from_error,
save_context_length,
)
@@ -883,7 +883,7 @@ class AIAgent:
try:
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
source=self.platform or "cli",
model=self.model,
model_config={
"max_iterations": self.max_iterations,
@@ -893,15 +893,8 @@ class AIAgent:
user_id=None,
)
except Exception as e:
# Transient SQLite lock contention (e.g. CLI and gateway writing
# concurrently) must NOT permanently disable session_search for
# this agent. Keep _session_db alive — subsequent message
# flushes and session_search calls will still work once the
# lock clears. The session row may be missing from the index
# for this run, but that is recoverable (flushes upsert rows).
logger.warning(
"Session DB create_session failed (session_search still available): %s", e
)
logger.warning("Session DB create_session failed — messages will NOT be indexed: %s", e)
self._session_db = None # prevent silent data loss on every subsequent flush
# In-memory todo list for task planning (one per agent/session)
from tools.todo_tool import TodoStore
@@ -1140,7 +1133,6 @@ class AIAgent:
self.context_compressor.last_total_tokens = 0
self.context_compressor.compression_count = 0
self.context_compressor._context_probed = False
self.context_compressor._context_probe_persistable = False
# Iterative summary from previous session must not bleed into new one (#2635)
self.context_compressor._previous_summary = None
@@ -1585,14 +1577,6 @@ class AIAgent:
return
self._apply_persist_user_message_override(messages)
try:
# If create_session() failed at startup (e.g. transient lock), the
# session row may not exist yet. ensure_session() uses INSERT OR
# IGNORE so it is a no-op when the row is already there.
self._session_db.ensure_session(
self.session_id,
source=self.platform or "cli",
model=self.model,
)
start_idx = len(conversation_history) if conversation_history else 0
flush_from = max(start_idx, self._last_flushed_db_idx)
for msg in messages[flush_from:]:
@@ -2274,7 +2258,7 @@ class AIAgent:
return
try:
manager.flush_all()
except (Exception, KeyboardInterrupt) as exc:
except Exception as exc:
logger.debug("Honcho flush on exit failed (non-fatal): %s", exc)
atexit.register(_flush_honcho_on_exit)
@@ -4859,7 +4843,7 @@ class AIAgent:
self.session_id = f"{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uuid.uuid4().hex[:6]}"
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
source=self.platform or "cli",
model=self.model,
parent_session_id=old_session_id,
)
@@ -5836,13 +5820,9 @@ class AIAgent:
and len(messages) > self.context_compressor.protect_first_n
+ self.context_compressor.protect_last_n + 1
):
# Include tool schema tokens — with many tools these can add
# 20-30K+ tokens that the old sys+msg estimate missed entirely.
_preflight_tokens = estimate_request_tokens_rough(
messages,
system_prompt=active_system_prompt or "",
tools=self.tools or None,
)
_sys_tok_est = estimate_tokens_rough(active_system_prompt or "")
_msg_tok_est = estimate_messages_tokens_rough(messages)
_preflight_tokens = _sys_tok_est + _msg_tok_est
if _preflight_tokens >= self.context_compressor.threshold_tokens:
logger.info(
@@ -5868,11 +5848,9 @@ class AIAgent:
if len(messages) >= _orig_len:
break # Cannot compress further
# Re-estimate after compression
_preflight_tokens = estimate_request_tokens_rough(
messages,
system_prompt=active_system_prompt or "",
tools=self.tools or None,
)
_sys_tok_est = estimate_tokens_rough(active_system_prompt or "")
_msg_tok_est = estimate_messages_tokens_rough(messages)
_preflight_tokens = _sys_tok_est + _msg_tok_est
if _preflight_tokens < self.context_compressor.threshold_tokens:
break # Under threshold
@@ -6335,16 +6313,12 @@ class AIAgent:
}
self.context_compressor.update_from_response(usage_dict)
# Cache discovered context length after successful call.
# Only persist limits confirmed by the provider (parsed
# from the error message), not guessed probe tiers.
# Cache discovered context length after successful call
if self.context_compressor._context_probed:
ctx = self.context_compressor.context_length
if getattr(self.context_compressor, "_context_probe_persistable", False):
save_context_length(self.model, self.base_url, ctx)
self._safe_print(f"{self.log_prefix}💾 Cached context length: {ctx:,} tokens for {self.model}")
save_context_length(self.model, self.base_url, ctx)
self._safe_print(f"{self.log_prefix}💾 Cached context length: {ctx:,} tokens for {self.model}")
self.context_compressor._context_probed = False
self.context_compressor._context_probe_persistable = False
self.session_prompt_tokens += prompt_tokens
self.session_completion_tokens += completion_tokens
@@ -6645,14 +6619,6 @@ class AIAgent:
compressor.context_length = new_ctx
compressor.threshold_tokens = int(new_ctx * compressor.threshold_percent)
compressor._context_probed = True
# Only persist limits parsed from the provider's
# error message (a real number). Guessed fallback
# tiers from get_next_probe_tier() should stay
# in-memory only — persisting them pollutes the
# cache with wrong values.
compressor._context_probe_persistable = bool(
parsed_limit and parsed_limit == new_ctx
)
self._vprint(f"{self.log_prefix}⚠️ Context length exceeded — stepping down: {old_ctx:,}{new_ctx:,} tokens", force=True)
else:
self._vprint(f"{self.log_prefix}⚠️ Context length exceeded at minimum tier — attempting compression...", force=True)
@@ -7131,19 +7097,11 @@ class AIAgent:
turn_content = assistant_message.content or ""
if turn_content and self._has_content_after_think_block(turn_content):
self._last_content_with_tools = turn_content
# Only mute subsequent output when EVERY tool call in
# this turn is post-response housekeeping (memory, todo,
# skill_manage, etc.). If any substantive tool is present
# (search_files, read_file, write_file, terminal, ...),
# keep output visible so the user sees progress.
_HOUSEKEEPING_TOOLS = frozenset({
"memory", "todo", "skill_manage", "session_search",
})
_all_housekeeping = all(
tc.function.name in _HOUSEKEEPING_TOOLS
for tc in assistant_message.tool_calls
)
if _all_housekeeping and self._has_stream_consumers():
# The response was already streamed to the user in the
# response box. The remaining tool calls (memory, skill,
# todo, etc.) are post-response housekeeping — mute all
# subsequent CLI output so they run invisibly.
if self._has_stream_consumers():
self._mute_post_response = True
elif self.quiet_mode:
clean = self._strip_think_blocks(turn_content).strip()
+3 -3
View File
@@ -461,7 +461,7 @@ class TestGetTextAuxiliaryClient:
patch("agent.auxiliary_client.OpenAI") as mock_openai:
mock_nous.return_value = {"access_token": "nous-tok"}
client, model = get_text_auxiliary_client()
assert model == "google/gemini-3-flash-preview"
assert model == "gemini-3-flash"
def test_custom_endpoint_over_codex(self, monkeypatch, codex_auth_dir):
monkeypatch.setenv("OPENAI_BASE_URL", "http://localhost:1234/v1")
@@ -693,7 +693,7 @@ class TestVisionClientFallback:
patch("agent.auxiliary_client.OpenAI"):
mock_nous.return_value = {"access_token": "nous-tok"}
client, model = get_vision_auxiliary_client()
assert model == "google/gemini-3-flash-preview"
assert model == "gemini-3-flash"
assert client is not None
def test_vision_forced_main_uses_custom_endpoint(self, monkeypatch):
@@ -789,7 +789,7 @@ class TestResolveForcedProvider:
patch("agent.auxiliary_client.OpenAI"):
mock_nous.return_value = {"access_token": "nous-tok"}
client, model = _resolve_forced_provider("nous")
assert model == "google/gemini-3-flash-preview"
assert model == "gemini-3-flash"
assert client is not None
def test_forced_nous_not_configured(self, monkeypatch):
-22
View File
@@ -48,28 +48,6 @@ class TestAgentConfigSignature:
sig2 = GatewayRunner._agent_config_signature("claude-opus-4.6", runtime, ["hermes-telegram"], "")
assert sig1 != sig2
def test_same_token_prefix_different_full_token_changes_signature(self):
"""Tokens sharing a JWT-style prefix must not collide."""
from gateway.run import GatewayRunner
rt1 = {
"api_key": "eyJhbGci.token-for-account-a",
"base_url": "https://chatgpt.com/backend-api/codex",
"provider": "openai-codex",
"api_mode": "codex_responses",
}
rt2 = {
"api_key": "eyJhbGci.token-for-account-b",
"base_url": "https://chatgpt.com/backend-api/codex",
"provider": "openai-codex",
"api_mode": "codex_responses",
}
assert rt1["api_key"][:8] == rt2["api_key"][:8]
sig1 = GatewayRunner._agent_config_signature("gpt-5.3-codex", rt1, ["hermes-telegram"], "")
sig2 = GatewayRunner._agent_config_signature("gpt-5.3-codex", rt2, ["hermes-telegram"], "")
assert sig1 != sig2
def test_provider_change_different_signature(self):
from gateway.run import GatewayRunner
-93
View File
@@ -1,93 +0,0 @@
"""Tests for hermes-api-server toolset and API server tool availability."""
import os
import json
from unittest.mock import patch, MagicMock
import pytest
from toolsets import resolve_toolset, get_toolset, validate_toolset
class TestHermesApiServerToolset:
"""Tests for the hermes-api-server toolset definition."""
def test_toolset_exists(self):
ts = get_toolset("hermes-api-server")
assert ts is not None
def test_toolset_validates(self):
assert validate_toolset("hermes-api-server")
def test_toolset_includes_web_tools(self):
tools = resolve_toolset("hermes-api-server")
assert "web_search" in tools
assert "web_extract" in tools
def test_toolset_includes_core_tools(self):
tools = resolve_toolset("hermes-api-server")
expected = [
"terminal", "process",
"read_file", "write_file", "patch", "search_files",
"vision_analyze", "image_generate",
"execute_code", "delegate_task",
"todo", "memory", "session_search", "cronjob",
]
for tool in expected:
assert tool in tools, f"Missing expected tool: {tool}"
def test_toolset_includes_browser_tools(self):
tools = resolve_toolset("hermes-api-server")
for tool in ["browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close"]:
assert tool in tools, f"Missing browser tool: {tool}"
def test_toolset_includes_homeassistant_tools(self):
tools = resolve_toolset("hermes-api-server")
for tool in ["ha_list_entities", "ha_get_state", "ha_list_services", "ha_call_service"]:
assert tool in tools, f"Missing HA tool: {tool}"
def test_toolset_excludes_clarify(self):
tools = resolve_toolset("hermes-api-server")
assert "clarify" not in tools
def test_toolset_excludes_send_message(self):
tools = resolve_toolset("hermes-api-server")
assert "send_message" not in tools
def test_toolset_excludes_text_to_speech(self):
tools = resolve_toolset("hermes-api-server")
assert "text_to_speech" not in tools
class TestApiServerPlatformConfig:
def test_platforms_dict_includes_api_server(self):
from hermes_cli.tools_config import PLATFORMS
assert "api_server" in PLATFORMS
assert PLATFORMS["api_server"]["default_toolset"] == "hermes-api-server"
class TestApiServerAdapterToolset:
@patch("gateway.platforms.api_server.AIOHTTP_AVAILABLE", True)
def test_create_agent_uses_api_server_toolset(self):
from gateway.platforms.api_server import APIServerAdapter
from gateway.config import PlatformConfig
adapter = APIServerAdapter(PlatformConfig())
with patch("gateway.run._resolve_runtime_agent_kwargs") as mock_kwargs, \
patch("gateway.run._resolve_gateway_model") as mock_model, \
patch("run_agent.AIAgent") as mock_agent_cls:
mock_kwargs.return_value = {"api_key": "test-key", "base_url": None,
"provider": None, "api_mode": None,
"command": None, "args": []}
mock_model.return_value = "test/model"
mock_agent_cls.return_value = MagicMock()
adapter._create_agent()
mock_agent_cls.assert_called_once()
call_kwargs = mock_agent_cls.call_args
assert call_kwargs.kwargs.get("enabled_toolsets") == ["hermes-api-server"]
assert call_kwargs.kwargs.get("platform") == "api_server"
-1
View File
@@ -38,7 +38,6 @@ def _make_runner():
runner._provider_routing = {}
runner._fallback_model = None
runner._running_agents = {}
runner._background_tasks = set()
mock_store = MagicMock()
runner.session_store = mock_store
-1
View File
@@ -72,7 +72,6 @@ async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks(
runner._exit_reason = None
runner._pending_messages = {"session": "pending text"}
runner._pending_approvals = {"session": {"command": "rm -rf /tmp/x"}}
runner._background_tasks = set()
runner._shutdown_all_gateway_honcho = lambda: None
adapter = StubAdapter()
-109
View File
@@ -218,112 +218,3 @@ class TestReasoningCommand:
assert result["final_response"] == "ok"
assert _CapturingAgent.last_init is not None
assert _CapturingAgent.last_init["reasoning_config"] == {"enabled": False}
def test_run_agent_includes_enabled_mcp_servers_in_gateway_toolsets(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text(
"platform_toolsets:\n"
" cli: [web, memory]\n"
"mcp_servers:\n"
" exa:\n"
" url: https://mcp.exa.ai/mcp\n"
" web-search-prime:\n"
" url: https://api.z.ai/api/mcp/web_search_prime/mcp\n",
encoding="utf-8",
)
monkeypatch.setattr(gateway_run, "_hermes_home", hermes_home)
monkeypatch.setattr(gateway_run, "_env_path", hermes_home / ".env")
monkeypatch.setattr(gateway_run, "load_dotenv", lambda *args, **kwargs: None)
monkeypatch.setattr(
gateway_run,
"_resolve_runtime_agent_kwargs",
lambda: {
"provider": "openrouter",
"api_mode": "chat_completions",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "test-key",
},
)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = _CapturingAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
_CapturingAgent.last_init = None
runner = _make_runner()
source = SessionSource(
platform=Platform.LOCAL,
chat_id="cli",
chat_name="CLI",
chat_type="dm",
user_id="user-1",
)
result = asyncio.run(
runner._run_agent(
message="ping",
context_prompt="",
history=[],
source=source,
session_id="session-1",
session_key="agent:main:local:dm",
)
)
assert result["final_response"] == "ok"
assert _CapturingAgent.last_init is not None
enabled_toolsets = set(_CapturingAgent.last_init["enabled_toolsets"])
assert "web" in enabled_toolsets
assert "memory" in enabled_toolsets
assert "exa" in enabled_toolsets
assert "web-search-prime" in enabled_toolsets
def test_run_agent_homeassistant_uses_default_platform_toolset(self, tmp_path, monkeypatch):
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
(hermes_home / "config.yaml").write_text("", encoding="utf-8")
monkeypatch.setattr(gateway_run, "_hermes_home", hermes_home)
monkeypatch.setattr(gateway_run, "_env_path", hermes_home / ".env")
monkeypatch.setattr(gateway_run, "load_dotenv", lambda *args, **kwargs: None)
monkeypatch.setattr(
gateway_run,
"_resolve_runtime_agent_kwargs",
lambda: {
"provider": "openrouter",
"api_mode": "chat_completions",
"base_url": "https://openrouter.ai/api/v1",
"api_key": "test-key",
},
)
fake_run_agent = types.ModuleType("run_agent")
fake_run_agent.AIAgent = _CapturingAgent
monkeypatch.setitem(sys.modules, "run_agent", fake_run_agent)
_CapturingAgent.last_init = None
runner = _make_runner()
source = SessionSource(
platform=Platform.HOMEASSISTANT,
chat_id="ha",
chat_name="Home Assistant",
chat_type="dm",
user_id="user-1",
)
result = asyncio.run(
runner._run_agent(
message="ping",
context_prompt="",
history=[],
source=source,
session_id="session-1",
session_key="agent:main:homeassistant:dm",
)
)
assert result["final_response"] == "ok"
assert _CapturingAgent.last_init is not None
assert "homeassistant" in set(_CapturingAgent.last_init["enabled_toolsets"])
-94
View File
@@ -386,100 +386,6 @@ class TestLoadTranscriptCorruptLines:
assert messages[1]["content"] == "b"
class TestLoadTranscriptPreferLongerSource:
"""Regression: load_transcript must return whichever source (SQLite or JSONL)
has more messages to prevent silent truncation. GH-3212."""
@pytest.fixture()
def store_with_db(self, tmp_path):
"""SessionStore with both SQLite and JSONL active."""
from hermes_state import SessionDB
config = GatewayConfig()
with patch("gateway.session.SessionStore._ensure_loaded"):
s = SessionStore(sessions_dir=tmp_path, config=config)
s._db = SessionDB(db_path=tmp_path / "state.db")
s._loaded = True
return s
def test_jsonl_longer_than_sqlite_returns_jsonl(self, store_with_db):
"""Legacy session: JSONL has full history, SQLite has only recent turn."""
sid = "legacy_session"
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
# JSONL has 10 messages (legacy history — written before SQLite existed)
for i in range(10):
role = "user" if i % 2 == 0 else "assistant"
store_with_db.append_to_transcript(
sid, {"role": role, "content": f"msg-{i}"}, skip_db=True,
)
# SQLite has only 2 messages (recent turn after migration)
store_with_db._db.append_message(session_id=sid, role="user", content="new-q")
store_with_db._db.append_message(session_id=sid, role="assistant", content="new-a")
result = store_with_db.load_transcript(sid)
assert len(result) == 10
assert result[0]["content"] == "msg-0"
def test_sqlite_longer_than_jsonl_returns_sqlite(self, store_with_db):
"""Fully migrated session: SQLite has more (JSONL stopped growing)."""
sid = "migrated_session"
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
# JSONL has 2 old messages
store_with_db.append_to_transcript(
sid, {"role": "user", "content": "old-q"}, skip_db=True,
)
store_with_db.append_to_transcript(
sid, {"role": "assistant", "content": "old-a"}, skip_db=True,
)
# SQLite has 4 messages (superset after migration)
for i in range(4):
role = "user" if i % 2 == 0 else "assistant"
store_with_db._db.append_message(session_id=sid, role=role, content=f"db-{i}")
result = store_with_db.load_transcript(sid)
assert len(result) == 4
assert result[0]["content"] == "db-0"
def test_sqlite_empty_falls_back_to_jsonl(self, store_with_db):
"""No SQLite rows — falls back to JSONL (original behavior preserved)."""
sid = "no_db_rows"
store_with_db.append_to_transcript(
sid, {"role": "user", "content": "hello"}, skip_db=True,
)
store_with_db.append_to_transcript(
sid, {"role": "assistant", "content": "hi"}, skip_db=True,
)
result = store_with_db.load_transcript(sid)
assert len(result) == 2
assert result[0]["content"] == "hello"
def test_both_empty_returns_empty(self, store_with_db):
"""Neither source has data — returns empty list."""
result = store_with_db.load_transcript("nonexistent")
assert result == []
def test_equal_length_prefers_sqlite(self, store_with_db):
"""When both have same count, SQLite wins (has richer fields like reasoning)."""
sid = "equal_session"
store_with_db._db.create_session(session_id=sid, source="gateway", model="m")
# Write 2 messages to JSONL only
store_with_db.append_to_transcript(
sid, {"role": "user", "content": "jsonl-q"}, skip_db=True,
)
store_with_db.append_to_transcript(
sid, {"role": "assistant", "content": "jsonl-a"}, skip_db=True,
)
# Write 2 different messages to SQLite only
store_with_db._db.append_message(session_id=sid, role="user", content="db-q")
store_with_db._db.append_message(session_id=sid, role="assistant", content="db-a")
result = store_with_db.load_transcript(sid)
assert len(result) == 2
# Should be the SQLite version (equal count → prefers SQLite)
assert result[0]["content"] == "db-q"
class TestWhatsAppDMSessionKeyConsistency:
"""Regression: all session-key construction must go through build_session_key
so DMs are isolated by chat_id across platforms."""
-1
View File
@@ -39,7 +39,6 @@ def _make_runner():
runner._pending_messages = {}
runner._pending_approvals = {}
runner._voice_mode = {}
runner._background_tasks = set()
runner._is_user_authorized = lambda _source: True
return runner
@@ -1,154 +0,0 @@
"""
Tests for Telegram polling network error recovery.
Specifically tests the fix for #3173 — when start_polling() fails after a
network error, the adapter must self-reschedule the next reconnect attempt
rather than silently leaving polling dead.
"""
import asyncio
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import PlatformConfig
def _ensure_telegram_mock():
if "telegram" in sys.modules and hasattr(sys.modules["telegram"], "__file__"):
return
telegram_mod = MagicMock()
telegram_mod.ext.ContextTypes.DEFAULT_TYPE = type(None)
telegram_mod.constants.ParseMode.MARKDOWN_V2 = "MarkdownV2"
telegram_mod.constants.ChatType.GROUP = "group"
telegram_mod.constants.ChatType.SUPERGROUP = "supergroup"
telegram_mod.constants.ChatType.CHANNEL = "channel"
telegram_mod.constants.ChatType.PRIVATE = "private"
for name in ("telegram", "telegram.ext", "telegram.constants"):
sys.modules.setdefault(name, telegram_mod)
_ensure_telegram_mock()
from gateway.platforms.telegram import TelegramAdapter # noqa: E402
def _make_adapter() -> TelegramAdapter:
return TelegramAdapter(PlatformConfig(enabled=True, token="test-token"))
@pytest.mark.asyncio
async def test_reconnect_self_schedules_on_start_polling_failure():
"""
When start_polling() raises during a network error retry, the adapter must
schedule a new _handle_polling_network_error task otherwise polling stays
dead with no further error callbacks to trigger recovery.
Regression test for #3173: gateway becomes unresponsive after Telegram 502.
"""
adapter = _make_adapter()
adapter._polling_network_error_count = 1
mock_updater = MagicMock()
mock_updater.running = True
mock_updater.stop = AsyncMock()
mock_updater.start_polling = AsyncMock(side_effect=Exception("Timed out"))
mock_app = MagicMock()
mock_app.updater = mock_updater
adapter._app = mock_app
with patch("asyncio.sleep", new_callable=AsyncMock):
await adapter._handle_polling_network_error(Exception("Bad Gateway"))
# A retry task must have been added to _background_tasks
pending = [t for t in adapter._background_tasks if not t.done()]
assert len(pending) >= 1, (
"Expected at least one self-rescheduled retry task in _background_tasks "
f"after start_polling failure, got {len(pending)}"
)
# Clean up — cancel the pending retry so it doesn't run after the test
for t in pending:
t.cancel()
try:
await t
except (asyncio.CancelledError, Exception):
pass
@pytest.mark.asyncio
async def test_reconnect_does_not_self_schedule_when_fatal_error_set():
"""
When a fatal error is already set, the failed reconnect should NOT create
another retry task the gateway is already shutting down this adapter.
"""
adapter = _make_adapter()
adapter._polling_network_error_count = 1
adapter._set_fatal_error("telegram_network_error", "already fatal", retryable=True)
mock_updater = MagicMock()
mock_updater.running = True
mock_updater.stop = AsyncMock()
mock_updater.start_polling = AsyncMock(side_effect=Exception("Timed out"))
mock_app = MagicMock()
mock_app.updater = mock_updater
adapter._app = mock_app
initial_count = len(adapter._background_tasks)
with patch("asyncio.sleep", new_callable=AsyncMock):
await adapter._handle_polling_network_error(Exception("Timed out"))
assert len(adapter._background_tasks) == initial_count, (
"Should not schedule a retry when a fatal error is already set"
)
@pytest.mark.asyncio
async def test_reconnect_success_resets_error_count():
"""
When start_polling() succeeds, _polling_network_error_count should reset to 0.
"""
adapter = _make_adapter()
adapter._polling_network_error_count = 3
mock_updater = MagicMock()
mock_updater.running = True
mock_updater.stop = AsyncMock()
mock_updater.start_polling = AsyncMock() # succeeds
mock_app = MagicMock()
mock_app.updater = mock_updater
adapter._app = mock_app
with patch("asyncio.sleep", new_callable=AsyncMock):
await adapter._handle_polling_network_error(Exception("Bad Gateway"))
assert adapter._polling_network_error_count == 0
@pytest.mark.asyncio
async def test_reconnect_triggers_fatal_after_max_retries():
"""
After MAX_NETWORK_RETRIES attempts, the adapter should set a fatal error
rather than retrying forever.
"""
adapter = _make_adapter()
adapter._polling_network_error_count = 10 # MAX_NETWORK_RETRIES
fatal_handler = AsyncMock()
adapter.set_fatal_error_handler(fatal_handler)
mock_app = MagicMock()
adapter._app = mock_app
await adapter._handle_polling_network_error(Exception("still failing"))
assert adapter.has_fatal_error
assert adapter.fatal_error_code == "telegram_network_error"
fatal_handler.assert_called_once()
-146
View File
@@ -1,146 +0,0 @@
"""Tests for gateway /verbose command (config-gated tool progress cycling)."""
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
import yaml
import gateway.run as gateway_run
from gateway.config import Platform
from gateway.platforms.base import MessageEvent
from gateway.session import SessionSource
def _make_event(text="/verbose", platform=Platform.TELEGRAM, user_id="12345", chat_id="67890"):
"""Build a MessageEvent for testing."""
source = SessionSource(
platform=platform,
user_id=user_id,
chat_id=chat_id,
user_name="testuser",
)
return MessageEvent(text=text, source=source)
def _make_runner():
"""Create a bare GatewayRunner without calling __init__."""
runner = object.__new__(gateway_run.GatewayRunner)
runner.adapters = {}
runner._ephemeral_system_prompt = ""
runner._prefill_messages = []
runner._reasoning_config = None
runner._show_reasoning = False
runner._provider_routing = {}
runner._fallback_model = None
runner._running_agents = {}
runner.hooks = MagicMock()
runner.hooks.emit = AsyncMock()
runner.hooks.loaded_hooks = []
runner._session_db = None
runner._get_or_create_gateway_honcho = lambda session_key: (None, None)
return runner
class TestVerboseCommand:
"""Tests for _handle_verbose_command in the gateway."""
@pytest.mark.asyncio
async def test_disabled_by_default(self, tmp_path, monkeypatch):
"""When tool_progress_command is false, /verbose returns an info message."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
config_path.write_text("display:\n tool_progress: all\n", encoding="utf-8")
monkeypatch.setattr(gateway_run, "_hermes_home", hermes_home)
runner = _make_runner()
result = await runner._handle_verbose_command(_make_event())
assert "not enabled" in result.lower()
assert "tool_progress_command" in result
@pytest.mark.asyncio
async def test_enabled_cycles_mode(self, tmp_path, monkeypatch):
"""When enabled, /verbose cycles tool_progress mode."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
config_path.write_text(
"display:\n tool_progress_command: true\n tool_progress: all\n",
encoding="utf-8",
)
monkeypatch.setattr(gateway_run, "_hermes_home", hermes_home)
runner = _make_runner()
result = await runner._handle_verbose_command(_make_event())
# all -> verbose
assert "VERBOSE" in result
# Verify config was saved
saved = yaml.safe_load(config_path.read_text(encoding="utf-8"))
assert saved["display"]["tool_progress"] == "verbose"
@pytest.mark.asyncio
async def test_cycles_through_all_modes(self, tmp_path, monkeypatch):
"""Calling /verbose repeatedly cycles through all four modes."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
config_path.write_text(
"display:\n tool_progress_command: true\n tool_progress: 'off'\n",
encoding="utf-8",
)
monkeypatch.setattr(gateway_run, "_hermes_home", hermes_home)
runner = _make_runner()
# off -> new -> all -> verbose -> off
expected = ["new", "all", "verbose", "off"]
for mode in expected:
result = await runner._handle_verbose_command(_make_event())
saved = yaml.safe_load(config_path.read_text(encoding="utf-8"))
assert saved["display"]["tool_progress"] == mode, \
f"Expected {mode}, got {saved['display']['tool_progress']}"
@pytest.mark.asyncio
async def test_defaults_to_all_when_no_tool_progress_set(self, tmp_path, monkeypatch):
"""When tool_progress is not in config, defaults to 'all' then cycles to verbose."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
config_path.write_text(
"display:\n tool_progress_command: true\n",
encoding="utf-8",
)
monkeypatch.setattr(gateway_run, "_hermes_home", hermes_home)
runner = _make_runner()
result = await runner._handle_verbose_command(_make_event())
# default "all" -> verbose
assert "VERBOSE" in result
saved = yaml.safe_load(config_path.read_text(encoding="utf-8"))
assert saved["display"]["tool_progress"] == "verbose"
@pytest.mark.asyncio
async def test_no_config_file_returns_disabled(self, tmp_path, monkeypatch):
"""When config.yaml doesn't exist, command reports disabled."""
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
# No config.yaml
monkeypatch.setattr(gateway_run, "_hermes_home", hermes_home)
runner = _make_runner()
result = await runner._handle_verbose_command(_make_event())
assert "not enabled" in result.lower()
def test_verbose_is_in_gateway_known_commands(self):
"""The /verbose command is recognized by the gateway dispatch."""
from hermes_cli.commands import GATEWAY_KNOWN_COMMANDS
assert "verbose" in GATEWAY_KNOWN_COMMANDS
+8 -86
View File
@@ -134,19 +134,12 @@ class TestDerivedDicts:
# ---------------------------------------------------------------------------
class TestGatewayKnownCommands:
def test_excludes_cli_only_without_config_gate(self):
def test_excludes_cli_only(self):
for cmd in COMMAND_REGISTRY:
if cmd.cli_only and not cmd.gateway_config_gate:
if cmd.cli_only:
assert cmd.name not in GATEWAY_KNOWN_COMMANDS, \
f"cli_only command '{cmd.name}' should not be in GATEWAY_KNOWN_COMMANDS"
def test_includes_config_gated_cli_only(self):
"""Commands with gateway_config_gate are always in GATEWAY_KNOWN_COMMANDS."""
for cmd in COMMAND_REGISTRY:
if cmd.gateway_config_gate:
assert cmd.name in GATEWAY_KNOWN_COMMANDS, \
f"config-gated command '{cmd.name}' should be in GATEWAY_KNOWN_COMMANDS"
def test_includes_gateway_commands(self):
for cmd in COMMAND_REGISTRY:
if not cmd.cli_only:
@@ -167,11 +160,11 @@ class TestGatewayHelpLines:
lines = gateway_help_lines()
assert len(lines) > 10
def test_excludes_cli_only_commands_without_config_gate(self):
def test_excludes_cli_only_commands(self):
lines = gateway_help_lines()
joined = "\n".join(lines)
for cmd in COMMAND_REGISTRY:
if cmd.cli_only and not cmd.gateway_config_gate:
if cmd.cli_only:
assert f"`/{cmd.name}" not in joined, \
f"cli_only command /{cmd.name} should not be in gateway help"
@@ -195,10 +188,10 @@ class TestTelegramBotCommands:
for name, _ in telegram_bot_commands():
assert "-" not in name, f"Telegram command '{name}' contains a hyphen"
def test_excludes_cli_only_without_config_gate(self):
def test_excludes_cli_only(self):
names = {name for name, _ in telegram_bot_commands()}
for cmd in COMMAND_REGISTRY:
if cmd.cli_only and not cmd.gateway_config_gate:
if cmd.cli_only:
tg_name = cmd.name.replace("-", "_")
assert tg_name not in names
@@ -218,84 +211,13 @@ class TestSlackSubcommandMap:
assert "bg" in mapping
assert "reset" in mapping
def test_excludes_cli_only_without_config_gate(self):
def test_excludes_cli_only(self):
mapping = slack_subcommand_map()
for cmd in COMMAND_REGISTRY:
if cmd.cli_only and not cmd.gateway_config_gate:
if cmd.cli_only:
assert cmd.name not in mapping
# ---------------------------------------------------------------------------
# Config-gated gateway commands
# ---------------------------------------------------------------------------
class TestGatewayConfigGate:
"""Tests for the gateway_config_gate mechanism on CommandDef."""
def test_verbose_has_config_gate(self):
cmd = resolve_command("verbose")
assert cmd is not None
assert cmd.cli_only is True
assert cmd.gateway_config_gate == "display.tool_progress_command"
def test_verbose_in_gateway_known_commands(self):
"""Config-gated commands are always recognized by the gateway."""
assert "verbose" in GATEWAY_KNOWN_COMMANDS
def test_config_gate_excluded_from_help_when_off(self, tmp_path, monkeypatch):
"""When the config gate is falsy, the command should not appear in help."""
# Write a config with the gate off (default)
config_file = tmp_path / "config.yaml"
config_file.write_text("display:\n tool_progress_command: false\n")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
lines = gateway_help_lines()
joined = "\n".join(lines)
assert "`/verbose" not in joined
def test_config_gate_included_in_help_when_on(self, tmp_path, monkeypatch):
"""When the config gate is truthy, the command should appear in help."""
config_file = tmp_path / "config.yaml"
config_file.write_text("display:\n tool_progress_command: true\n")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
lines = gateway_help_lines()
joined = "\n".join(lines)
assert "`/verbose" in joined
def test_config_gate_excluded_from_telegram_when_off(self, tmp_path, monkeypatch):
config_file = tmp_path / "config.yaml"
config_file.write_text("display:\n tool_progress_command: false\n")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
names = {name for name, _ in telegram_bot_commands()}
assert "verbose" not in names
def test_config_gate_included_in_telegram_when_on(self, tmp_path, monkeypatch):
config_file = tmp_path / "config.yaml"
config_file.write_text("display:\n tool_progress_command: true\n")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
names = {name for name, _ in telegram_bot_commands()}
assert "verbose" in names
def test_config_gate_excluded_from_slack_when_off(self, tmp_path, monkeypatch):
config_file = tmp_path / "config.yaml"
config_file.write_text("display:\n tool_progress_command: false\n")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
mapping = slack_subcommand_map()
assert "verbose" not in mapping
def test_config_gate_included_in_slack_when_on(self, tmp_path, monkeypatch):
config_file = tmp_path / "config.yaml"
config_file.write_text("display:\n tool_progress_command: true\n")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
mapping = slack_subcommand_map()
assert "verbose" in mapping
# ---------------------------------------------------------------------------
# Autocomplete (SlashCommandCompleter)
# ---------------------------------------------------------------------------
+1 -55
View File
@@ -3,7 +3,7 @@ from io import StringIO
import pytest
from rich.console import Console
from hermes_cli.skills_hub import do_check, do_install, do_list, do_update, handle_skills_slash
from hermes_cli.skills_hub import do_check, do_list, do_update, handle_skills_slash
class _DummyLockFile:
@@ -177,57 +177,3 @@ def test_do_update_reinstalls_outdated_skills(monkeypatch):
assert installs == [("skills-sh/example/repo/hub-skill", "category", True)]
assert "Updated 1 skill" in output
def test_do_install_scans_with_resolved_identifier(monkeypatch, tmp_path, hub_env):
import tools.skills_guard as guard
import tools.skills_hub as hub
canonical_identifier = "skills-sh/anthropics/skills/frontend-design"
class _ResolvedSource:
def inspect(self, identifier):
return type("Meta", (), {
"extra": {},
"identifier": canonical_identifier,
})()
def fetch(self, identifier):
return type("Bundle", (), {
"name": "frontend-design",
"files": {"SKILL.md": "# Frontend Design"},
"source": "skills.sh",
"identifier": canonical_identifier,
"trust_level": "trusted",
"metadata": {},
})()
q_path = tmp_path / "skills" / ".hub" / "quarantine" / "frontend-design"
q_path.mkdir(parents=True)
(q_path / "SKILL.md").write_text("# Frontend Design")
scanned = {}
def _scan_skill(skill_path, source="community"):
scanned["source"] = source
return guard.ScanResult(
skill_name="frontend-design",
source=source,
trust_level="trusted",
verdict="safe",
)
monkeypatch.setattr(hub, "ensure_hub_dirs", lambda: None)
monkeypatch.setattr(hub, "create_source_router", lambda auth: [_ResolvedSource()])
monkeypatch.setattr(hub, "quarantine_bundle", lambda bundle: q_path)
monkeypatch.setattr(hub, "HubLockFile", lambda: type("Lock", (), {"get_installed": lambda self, name: None})())
monkeypatch.setattr(guard, "scan_skill", _scan_skill)
monkeypatch.setattr(guard, "format_scan_report", lambda result: "scan ok")
monkeypatch.setattr(guard, "should_allow_install", lambda result, force=False: (False, "stop after scan"))
sink = StringIO()
console = Console(file=sink, force_terminal=False, color_system=None)
do_install("skils-sh/anthropics/skills/frontend-design", console=console, skip_confirm=True)
assert scanned["source"] == canonical_identifier
-33
View File
@@ -35,39 +35,6 @@ def test_platform_toolset_summary_uses_explicit_platform_list():
assert summary["cli"] == _get_platform_tools(config, "cli")
def test_get_platform_tools_includes_enabled_mcp_servers_by_default():
config = {
"mcp_servers": {
"exa": {"url": "https://mcp.exa.ai/mcp"},
"web-search-prime": {"url": "https://api.z.ai/api/mcp/web_search_prime/mcp"},
"disabled-server": {"url": "https://example.com/mcp", "enabled": False},
}
}
enabled = _get_platform_tools(config, "cli")
assert "exa" in enabled
assert "web-search-prime" in enabled
assert "disabled-server" not in enabled
def test_get_platform_tools_keeps_enabled_mcp_servers_with_explicit_builtin_selection():
config = {
"platform_toolsets": {"cli": ["web", "memory"]},
"mcp_servers": {
"exa": {"url": "https://mcp.exa.ai/mcp"},
"web-search-prime": {"url": "https://api.z.ai/api/mcp/web_search_prime/mcp"},
},
}
enabled = _get_platform_tools(config, "cli")
assert "web" in enabled
assert "memory" in enabled
assert "exa" in enabled
assert "web-search-prime" in enabled
def test_toolset_has_keys_for_vision_accepts_codex_auth(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "auth.json").write_text(
@@ -134,23 +134,6 @@ class TestToolsMixedTargets:
assert "web" not in saved["platform_toolsets"]["cli"]
assert "create_issue" in saved["mcp_servers"]["github"]["tools"]["exclude"]
def test_builtin_toggle_does_not_persist_implicit_mcp_defaults(self):
config = {
"platform_toolsets": {"cli": ["web", "memory"]},
"mcp_servers": {"exa": {"url": "https://mcp.exa.ai/mcp"}},
}
with patch("hermes_cli.tools_config.load_config", return_value=config), \
patch("hermes_cli.tools_config.save_config") as mock_save:
tools_disable_enable_command(Namespace(
tools_action="disable",
names=["web"],
platform="cli",
))
saved = mock_save.call_args[0][0]
assert "web" not in saved["platform_toolsets"]["cli"]
assert "memory" in saved["platform_toolsets"]["cli"]
assert "exa" not in saved["platform_toolsets"]["cli"]
# ── List output ──────────────────────────────────────────────────────────────
-105
View File
@@ -1,105 +0,0 @@
"""Tests for KeyboardInterrupt handling in exit cleanup paths.
``except Exception`` does not catch ``KeyboardInterrupt`` (which inherits
from ``BaseException``). A second Ctrl+C during exit cleanup must not
abort remaining cleanup steps. These tests exercise the actual production
code paths not a copy of the try/except pattern.
"""
import atexit
import weakref
from unittest.mock import MagicMock, patch, call
import pytest
class TestHonchoAtexitFlush:
"""run_agent.py — _register_honcho_exit_hook atexit handler."""
def test_keyboard_interrupt_during_flush_does_not_propagate(self):
"""The atexit handler must swallow KeyboardInterrupt from flush_all()."""
mock_manager = MagicMock()
mock_manager.flush_all.side_effect = KeyboardInterrupt
# Capture functions passed to atexit.register
registered_fns = []
original_register = atexit.register
def capturing_register(fn, *args, **kwargs):
registered_fns.append(fn)
# Don't actually register — we don't want side effects
with patch("atexit.register", side_effect=capturing_register):
from run_agent import AIAgent
agent = object.__new__(AIAgent)
agent._honcho = mock_manager
agent._honcho_exit_hook_registered = False
agent._register_honcho_exit_hook()
# Our handler is the last one registered
assert len(registered_fns) >= 1, "atexit handler was not registered"
flush_handler = registered_fns[-1]
# Invoke the registered handler — must not raise
flush_handler()
mock_manager.flush_all.assert_called_once()
class TestCronJobCleanup:
"""cron/scheduler.py — end_session + close in the finally block."""
def test_keyboard_interrupt_in_end_session_does_not_skip_close(self):
"""If end_session raises KeyboardInterrupt, close() must still run."""
mock_db = MagicMock()
mock_db.end_session.side_effect = KeyboardInterrupt
from cron import scheduler
job = {
"id": "test-job-1",
"name": "test cleanup",
"prompt": "hello",
"schedule": "0 9 * * *",
"model": "test/model",
}
with patch("hermes_state.SessionDB", return_value=mock_db), \
patch.object(scheduler, "_build_job_prompt", return_value="hello"), \
patch.object(scheduler, "_resolve_origin", return_value=None), \
patch.object(scheduler, "_resolve_delivery_target", return_value=None), \
patch("dotenv.load_dotenv", return_value=None), \
patch("run_agent.AIAgent") as MockAgent:
# Make the agent raise immediately so we hit the finally block
MockAgent.return_value.run_conversation.side_effect = RuntimeError("boom")
scheduler.run_job(job)
mock_db.end_session.assert_called_once()
mock_db.close.assert_called_once()
def test_keyboard_interrupt_in_close_does_not_propagate(self):
"""If close() raises KeyboardInterrupt, it must not escape run_job."""
mock_db = MagicMock()
mock_db.close.side_effect = KeyboardInterrupt
from cron import scheduler
job = {
"id": "test-job-2",
"name": "test close interrupt",
"prompt": "hello",
"schedule": "0 9 * * *",
"model": "test/model",
}
with patch("hermes_state.SessionDB", return_value=mock_db), \
patch.object(scheduler, "_build_job_prompt", return_value="hello"), \
patch.object(scheduler, "_resolve_origin", return_value=None), \
patch.object(scheduler, "_resolve_delivery_target", return_value=None), \
patch("dotenv.load_dotenv", return_value=None), \
patch("run_agent.AIAgent") as MockAgent:
MockAgent.return_value.run_conversation.side_effect = RuntimeError("boom")
# Must not raise
scheduler.run_job(job)
mock_db.end_session.assert_called_once()
mock_db.close.assert_called_once()
-146
View File
@@ -1102,89 +1102,6 @@ class TestListSessionsRich:
assert "Line one Line two" in sessions[0]["preview"]
# =========================================================================
# Session source exclusion (--source flag for third-party isolation)
# =========================================================================
class TestExcludeSources:
"""Tests for exclude_sources on list_sessions_rich and search_messages."""
def test_list_sessions_rich_excludes_tool_source(self, db):
db.create_session("s1", "cli")
db.create_session("s2", "tool")
db.create_session("s3", "telegram")
sessions = db.list_sessions_rich(exclude_sources=["tool"])
ids = [s["id"] for s in sessions]
assert "s1" in ids
assert "s3" in ids
assert "s2" not in ids
def test_list_sessions_rich_no_exclusion_returns_all(self, db):
db.create_session("s1", "cli")
db.create_session("s2", "tool")
sessions = db.list_sessions_rich()
ids = [s["id"] for s in sessions]
assert "s1" in ids
assert "s2" in ids
def test_list_sessions_rich_source_and_exclude_combined(self, db):
"""When source= is explicit, exclude_sources should not conflict."""
db.create_session("s1", "cli")
db.create_session("s2", "tool")
db.create_session("s3", "telegram")
# Explicit source filter: only tool sessions, no exclusion
sessions = db.list_sessions_rich(source="tool")
ids = [s["id"] for s in sessions]
assert ids == ["s2"]
def test_list_sessions_rich_exclude_multiple_sources(self, db):
db.create_session("s1", "cli")
db.create_session("s2", "tool")
db.create_session("s3", "cron")
db.create_session("s4", "telegram")
sessions = db.list_sessions_rich(exclude_sources=["tool", "cron"])
ids = [s["id"] for s in sessions]
assert "s1" in ids
assert "s4" in ids
assert "s2" not in ids
assert "s3" not in ids
def test_search_messages_excludes_tool_source(self, db):
db.create_session("s1", "cli")
db.append_message("s1", "user", "Python deployment question")
db.create_session("s2", "tool")
db.append_message("s2", "user", "Python automated question")
results = db.search_messages("Python", exclude_sources=["tool"])
sources = [r["source"] for r in results]
assert "cli" in sources
assert "tool" not in sources
def test_search_messages_no_exclusion_returns_all_sources(self, db):
db.create_session("s1", "cli")
db.append_message("s1", "user", "Rust deployment question")
db.create_session("s2", "tool")
db.append_message("s2", "user", "Rust automated question")
results = db.search_messages("Rust")
sources = [r["source"] for r in results]
assert "cli" in sources
assert "tool" in sources
def test_search_messages_source_include_and_exclude(self, db):
"""source_filter (include) and exclude_sources can coexist."""
db.create_session("s1", "cli")
db.append_message("s1", "user", "Golang test")
db.create_session("s2", "telegram")
db.append_message("s2", "user", "Golang test")
db.create_session("s3", "tool")
db.append_message("s3", "user", "Golang test")
# Include cli+tool, but exclude tool → should only return cli
results = db.search_messages(
"Golang", source_filter=["cli", "tool"], exclude_sources=["tool"]
)
sources = [r["source"] for r in results]
assert sources == ["cli"]
class TestResolveSessionByNameOrId:
"""Tests for the main.py helper that resolves names or IDs."""
@@ -1199,66 +1116,3 @@ class TestResolveSessionByNameOrId:
db.set_session_title("s1", "my project")
result = db.resolve_session_by_title("my project")
assert result == "s1"
# =========================================================================
# Concurrent write safety / lock contention fixes (#3139)
# =========================================================================
class TestConcurrentWriteSafety:
def test_create_session_insert_or_ignore_is_idempotent(self, db):
"""create_session with the same ID twice must not raise (INSERT OR IGNORE)."""
db.create_session(session_id="dup-1", source="cli", model="m")
# Second call should be silent — no IntegrityError
db.create_session(session_id="dup-1", source="gateway", model="m2")
session = db.get_session("dup-1")
# Row should exist (first write wins with OR IGNORE)
assert session is not None
assert session["source"] == "cli"
def test_ensure_session_creates_missing_row(self, db):
"""ensure_session must create a minimal row when the session doesn't exist."""
assert db.get_session("orphan-session") is None
db.ensure_session("orphan-session", source="gateway", model="test-model")
row = db.get_session("orphan-session")
assert row is not None
assert row["source"] == "gateway"
assert row["model"] == "test-model"
def test_ensure_session_is_idempotent(self, db):
"""ensure_session on an existing row must be a no-op (no overwrite)."""
db.create_session(session_id="existing", source="cli", model="original-model")
db.ensure_session("existing", source="gateway", model="overwrite-model")
row = db.get_session("existing")
# First write wins — ensure_session must not overwrite
assert row["source"] == "cli"
assert row["model"] == "original-model"
def test_ensure_session_allows_append_message_after_failed_create(self, db):
"""Messages can be flushed even when create_session failed at startup.
Simulates the #3139 scenario: create_session raises (lock), then
ensure_session is called during flush, then append_message succeeds.
"""
# Simulate failed create_session — row absent
db.ensure_session("late-session", source="gateway", model="gpt-4")
db.append_message(
session_id="late-session",
role="user",
content="hello after lock",
)
msgs = db.get_messages("late-session")
assert len(msgs) == 1
assert msgs[0]["content"] == "hello after lock"
def test_sqlite_timeout_is_at_least_30s(self, db):
"""Connection timeout should be >= 30s to survive CLI/gateway contention."""
# Access the underlying connection timeout via sqlite3 introspection.
# There is no public API, so we check the kwarg via the module default.
import sqlite3
import inspect
from hermes_state import SessionDB as _SessionDB
src = inspect.getsource(_SessionDB.__init__)
assert "30" in src, (
"SQLite timeout should be at least 30s to handle CLI/gateway lock contention"
)
+1 -1
View File
@@ -556,7 +556,7 @@ class TestAuxiliaryClientProviderPriority:
with patch("agent.auxiliary_client._read_nous_auth", return_value={"access_token": "nous-tok"}), \
patch("agent.auxiliary_client.OpenAI") as mock:
client, model = get_text_auxiliary_client()
assert model == "google/gemini-3-flash-preview"
assert model == "gemini-3-flash"
def test_custom_endpoint_when_no_nous(self, monkeypatch):
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
-70
View File
@@ -512,73 +512,3 @@ class TestGatewayProtection:
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is False
class TestNormalizationBypass:
"""Obfuscation techniques must not bypass dangerous command detection."""
def test_fullwidth_unicode_rm(self):
"""Fullwidth Unicode 'rm -rf /' must be caught after NFKC normalization."""
cmd = "\uff52\uff4d -\uff52\uff46 /" # rm -rf /
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True, f"Fullwidth 'rm -rf /' was not detected: {cmd!r}"
def test_fullwidth_unicode_dd(self):
"""Fullwidth 'dd if=/dev/zero' must be caught."""
cmd = "\uff44\uff44 if=/dev/zero of=/dev/sda"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True
def test_fullwidth_unicode_chmod(self):
"""Fullwidth 'chmod 777' must be caught."""
cmd = "\uff43\uff48\uff4d\uff4f\uff44 777 /tmp/test"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True
def test_ansi_csi_wrapped_rm(self):
"""ANSI CSI color codes wrapping 'rm' must be stripped and caught."""
cmd = "\x1b[31mrm\x1b[0m -rf /"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True, f"ANSI-wrapped 'rm -rf /' was not detected"
def test_ansi_osc_embedded_rm(self):
"""ANSI OSC sequences embedded in command must be stripped."""
cmd = "\x1b]0;title\x07rm -rf /"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True
def test_ansi_8bit_c1_wrapped_rm(self):
"""8-bit C1 CSI (0x9b) wrapping 'rm' must be stripped and caught."""
cmd = "\x9b31mrm\x9b0m -rf /"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True, "8-bit C1 CSI bypass was not caught"
def test_null_byte_in_rm(self):
"""Null bytes injected into 'rm' must be stripped and caught."""
cmd = "r\x00m -rf /"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True, f"Null-byte 'rm' was not detected: {cmd!r}"
def test_null_byte_in_dd(self):
"""Null bytes in 'dd' must be stripped."""
cmd = "d\x00d if=/dev/sda"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True
def test_mixed_fullwidth_and_ansi(self):
"""Combined fullwidth + ANSI obfuscation must still be caught."""
cmd = "\x1b[1m\uff52\uff4d\x1b[0m -rf /"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is True
def test_safe_command_after_normalization(self):
"""Normal safe commands must not be flagged after normalization."""
cmd = "ls -la /tmp"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is False
def test_fullwidth_safe_command_not_flagged(self):
"""Fullwidth 'ls -la' is safe and must not be flagged."""
cmd = "\uff4c\uff53 -\uff4c\uff41 /tmp"
dangerous, key, desc = detect_dangerous_command(cmd)
assert dangerous is False
@@ -1,66 +0,0 @@
"""Tests for delegate_tool toolset scoping.
Verifies that subagents cannot gain tools that the parent does not have.
The LLM controls the `toolsets` parameter without intersection with the
parent's enabled_toolsets, it can escalate privileges by requesting
arbitrary toolsets.
"""
from unittest.mock import MagicMock, patch
from types import SimpleNamespace
from tools.delegate_tool import _strip_blocked_tools
class TestToolsetIntersection:
"""Subagent toolsets must be a subset of parent's enabled_toolsets."""
def test_requested_toolsets_intersected_with_parent(self):
"""LLM requests toolsets parent doesn't have — extras are dropped."""
parent = SimpleNamespace(enabled_toolsets=["terminal", "file"])
# Simulate the intersection logic from _build_child_agent
parent_toolsets = set(parent.enabled_toolsets)
requested = ["terminal", "file", "web", "browser", "rl"]
scoped = [t for t in requested if t in parent_toolsets]
assert sorted(scoped) == ["file", "terminal"]
assert "web" not in scoped
assert "browser" not in scoped
assert "rl" not in scoped
def test_all_requested_toolsets_available_on_parent(self):
"""LLM requests subset of parent tools — all pass through."""
parent = SimpleNamespace(enabled_toolsets=["terminal", "file", "web", "browser"])
parent_toolsets = set(parent.enabled_toolsets)
requested = ["terminal", "web"]
scoped = [t for t in requested if t in parent_toolsets]
assert sorted(scoped) == ["terminal", "web"]
def test_no_toolsets_requested_inherits_parent(self):
"""When toolsets is None/empty, child inherits parent's set."""
parent_toolsets = ["terminal", "file", "web"]
child = _strip_blocked_tools(parent_toolsets)
assert "terminal" in child
assert "file" in child
assert "web" in child
def test_strip_blocked_removes_delegation(self):
"""Blocked toolsets (delegation, clarify, etc.) are always removed."""
child = _strip_blocked_tools(["terminal", "delegation", "clarify", "memory"])
assert "delegation" not in child
assert "clarify" not in child
assert "memory" not in child
assert "terminal" in child
def test_empty_intersection_yields_empty_toolsets(self):
"""If parent has no overlap with requested, child gets nothing extra."""
parent = SimpleNamespace(enabled_toolsets=["terminal"])
parent_toolsets = set(parent.enabled_toolsets)
requested = ["web", "browser"]
scoped = [t for t in requested if t in parent_toolsets]
assert scoped == []
-12
View File
@@ -8,7 +8,6 @@ from tools.session_search_tool import (
_format_timestamp,
_format_conversation,
_truncate_around_matches,
_HIDDEN_SESSION_SOURCES,
MAX_SESSION_CHARS,
SESSION_SEARCH_SCHEMA,
)
@@ -18,17 +17,6 @@ from tools.session_search_tool import (
# Tool schema guidance
# =========================================================================
class TestHiddenSessionSources:
"""Verify the _HIDDEN_SESSION_SOURCES constant used for third-party isolation."""
def test_tool_source_is_hidden(self):
assert "tool" in _HIDDEN_SESSION_SOURCES
def test_standard_sources_not_hidden(self):
for src in ("cli", "telegram", "discord", "slack", "cron"):
assert src not in _HIDDEN_SESSION_SOURCES
class TestSessionSearchSchema:
def test_keeps_cross_session_recall_guidance_without_current_session_nudge(self):
description = SESSION_SEARCH_SCHEMA["description"]
-7
View File
@@ -55,13 +55,6 @@ class TestResolveTrustLevel:
assert _resolve_trust_level("anthropics/skills") == "trusted"
assert _resolve_trust_level("openai/skills/some-skill") == "trusted"
def test_skills_sh_wrapped_trusted_repos(self):
assert _resolve_trust_level("skills-sh/openai/skills/skill-creator") == "trusted"
assert _resolve_trust_level("skills-sh/anthropics/skills/frontend-design") == "trusted"
def test_common_skills_sh_prefix_typo_still_maps_to_trusted_repo(self):
assert _resolve_trust_level("skils-sh/anthropics/skills/frontend-design") == "trusted"
def test_community_default(self):
assert _resolve_trust_level("random-user/my-skill") == "community"
assert _resolve_trust_level("") == "community"
-101
View File
@@ -179,24 +179,6 @@ class TestSkillsShSource:
assert bundle.identifier == "skills-sh/vercel-labs/agent-skills/vercel-react-best-practices"
mock_fetch.assert_called_once_with("vercel-labs/agent-skills/vercel-react-best-practices")
@patch.object(GitHubSource, "fetch")
def test_fetch_accepts_common_skills_sh_prefix_typo(self, mock_fetch):
expected_identifier = "anthropics/skills/frontend-design"
mock_fetch.side_effect = lambda identifier: SkillBundle(
name="frontend-design",
files={"SKILL.md": "# Frontend Design"},
source="github",
identifier=expected_identifier,
trust_level="trusted",
) if identifier == expected_identifier else None
bundle = self._source().fetch("skils-sh/anthropics/skills/frontend-design")
assert bundle is not None
assert bundle.source == "skills.sh"
assert bundle.identifier == "skills-sh/anthropics/skills/frontend-design"
assert mock_fetch.call_args_list[0] == ((expected_identifier,), {})
@patch("tools.skills_hub._write_index_cache")
@patch("tools.skills_hub._read_index_cache", return_value=None)
@patch("tools.skills_hub.httpx.get")
@@ -231,26 +213,6 @@ class TestSkillsShSource:
assert meta.extra["security_audits"]["socket"] == "Pass"
mock_inspect.assert_called_once_with("vercel-labs/agent-skills/vercel-react-best-practices")
@patch.object(GitHubSource, "inspect")
def test_inspect_accepts_common_skills_sh_prefix_typo(self, mock_inspect):
expected_identifier = "anthropics/skills/frontend-design"
mock_inspect.side_effect = lambda identifier: SkillMeta(
name="frontend-design",
description="Distinctive frontend interfaces.",
source="github",
identifier=expected_identifier,
trust_level="trusted",
repo="anthropics/skills",
path="frontend-design",
) if identifier == expected_identifier else None
meta = self._source().inspect("skils-sh/anthropics/skills/frontend-design")
assert meta is not None
assert meta.source == "skills.sh"
assert meta.identifier == "skills-sh/anthropics/skills/frontend-design"
assert mock_inspect.call_args_list[0] == ((expected_identifier,), {})
@patch.object(GitHubSource, "_list_skills_in_repo")
@patch.object(GitHubSource, "inspect")
def test_inspect_falls_back_to_repo_skill_catalog_when_slug_differs(self, mock_inspect, mock_list_skills):
@@ -345,39 +307,6 @@ class TestSkillsShSource:
assert bundle.files["SKILL.md"] == "# react"
assert mock_get.called
@patch("tools.skills_hub._write_index_cache")
@patch("tools.skills_hub._read_index_cache", return_value=None)
@patch.object(SkillsShSource, "_discover_identifier")
@patch.object(SkillsShSource, "_fetch_detail_page")
@patch.object(GitHubSource, "fetch")
def test_fetch_downloads_only_the_resolved_identifier(
self,
mock_fetch,
mock_detail,
mock_discover,
_mock_read_cache,
_mock_write_cache,
):
resolved_identifier = "owner/repo/product-team/product-designer"
mock_detail.return_value = {"repo": "owner/repo", "install_skill": "product-designer"}
mock_discover.return_value = resolved_identifier
resolved_bundle = SkillBundle(
name="product-designer",
files={"SKILL.md": "# Product Designer"},
source="github",
identifier=resolved_identifier,
trust_level="community",
)
mock_fetch.side_effect = lambda identifier: resolved_bundle if identifier == resolved_identifier else None
bundle = self._source().fetch("skills-sh/owner/repo/product-designer")
assert bundle is not None
assert bundle.identifier == "skills-sh/owner/repo/product-designer"
# All candidate identifiers are tried before falling back to discovery
assert mock_fetch.call_args_list[-1] == ((resolved_identifier,), {})
assert mock_fetch.call_args_list[0] == (("owner/repo/product-designer",), {})
@patch("tools.skills_hub._write_index_cache")
@patch("tools.skills_hub._read_index_cache", return_value=None)
@patch("tools.skills_hub.httpx.get")
@@ -440,36 +369,6 @@ class TestSkillsShSource:
# Verify the tree-resolved identifier was used for the final GitHub fetch
mock_fetch.assert_any_call("owner/repo/cli-tool/components/skills/development/my-skill")
@patch.object(GitHubSource, "_find_skill_in_repo_tree")
@patch.object(GitHubSource, "_list_skills_in_repo")
@patch("tools.skills_hub.httpx.get")
def test_discover_identifier_uses_tree_search_before_root_scan(
self,
mock_get,
mock_list_skills,
mock_find_in_tree,
):
root_url = "https://api.github.com/repos/owner/repo/contents/"
mock_list_skills.return_value = []
mock_find_in_tree.return_value = "owner/repo/product-team/product-designer"
def _httpx_get_side_effect(url, **kwargs):
resp = MagicMock()
if url == root_url:
resp.status_code = 200
resp.json = lambda: []
return resp
resp.status_code = 404
return resp
mock_get.side_effect = _httpx_get_side_effect
result = self._source()._discover_identifier("owner/repo/product-designer")
assert result == "owner/repo/product-team/product-designer"
requested_urls = [call.args[0] for call in mock_get.call_args_list]
assert root_url not in requested_urls
class TestFindSkillInRepoTree:
"""Tests for GitHubSource._find_skill_in_repo_tree."""
+1 -20
View File
@@ -13,7 +13,6 @@ import os
import re
import sys
import threading
import unicodedata
from typing import Optional
logger = logging.getLogger(__name__)
@@ -83,31 +82,13 @@ def _approval_key_aliases(pattern_key: str) -> set[str]:
# Detection
# =========================================================================
def _normalize_command_for_detection(command: str) -> str:
"""Normalize a command string before dangerous-pattern matching.
Strips ANSI escape sequences (full ECMA-48 via tools.ansi_strip),
null bytes, and normalizes Unicode fullwidth characters so that
obfuscation techniques cannot bypass the pattern-based detection.
"""
from tools.ansi_strip import strip_ansi
# Strip all ANSI escape sequences (CSI, OSC, DCS, 8-bit C1, etc.)
command = strip_ansi(command)
# Strip null bytes
command = command.replace('\x00', '')
# Normalize Unicode (fullwidth Latin, halfwidth Katakana, etc.)
command = unicodedata.normalize('NFKC', command)
return command
def detect_dangerous_command(command: str) -> tuple:
"""Check if a command matches any dangerous patterns.
Returns:
(is_dangerous, pattern_key, description) or (False, None, None)
"""
command_lower = _normalize_command_for_detection(command).lower()
command_lower = command.lower()
for pattern, description in DANGEROUS_PATTERNS:
if re.search(pattern, command_lower, re.IGNORECASE | re.DOTALL):
pattern_key = description
+1 -3
View File
@@ -174,10 +174,8 @@ def _build_child_agent(
# When no explicit toolsets given, inherit from parent's enabled toolsets
# so disabled tools (e.g. web) don't leak to subagents.
parent_toolsets = set(getattr(parent_agent, "enabled_toolsets", None) or DEFAULT_TOOLSETS)
if toolsets:
# Intersect with parent — subagent must not gain tools the parent lacks
child_toolsets = _strip_blocked_tools([t for t in toolsets if t in parent_toolsets])
child_toolsets = _strip_blocked_tools(toolsets)
elif parent_agent and getattr(parent_agent, "enabled_toolsets", None):
child_toolsets = _strip_blocked_tools(parent_agent.enabled_toolsets)
else:
+3 -3
View File
@@ -490,7 +490,7 @@ async def _send_discord(token, chat_id, message):
try:
url = f"https://discord.com/api/v10/channels/{chat_id}/messages"
headers = {"Authorization": f"Bot {token}", "Content-Type": "application/json"}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json={"content": message}) as resp:
if resp.status not in (200, 201):
body = await resp.text()
@@ -510,7 +510,7 @@ async def _send_slack(token, chat_id, message):
try:
url = "https://slack.com/api/chat.postMessage"
headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
async with aiohttp.ClientSession() as session:
async with session.post(url, headers=headers, json={"channel": chat_id, "text": message}) as resp:
data = await resp.json()
if data.get("ok"):
@@ -649,7 +649,7 @@ async def _send_sms(auth_token, chat_id, message):
url = f"https://api.twilio.com/2010-04-01/Accounts/{account_sid}/Messages.json"
headers = {"Authorization": f"Basic {encoded}"}
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30)) as session:
async with aiohttp.ClientSession() as session:
form_data = aiohttp.FormData()
form_data.add_field("From", from_number)
form_data.add_field("To", chat_id)
+1 -8
View File
@@ -178,16 +178,10 @@ async def _summarize_session(
return None
# Sources that are excluded from session browsing/searching by default.
# Third-party integrations (Paperclip agents, etc.) tag their sessions with
# HERMES_SESSION_SOURCE=tool so they don't clutter the user's session history.
_HIDDEN_SESSION_SOURCES = ("tool",)
def _list_recent_sessions(db, limit: int, current_session_id: str = None) -> str:
"""Return metadata for the most recent sessions (no LLM calls)."""
try:
sessions = db.list_sessions_rich(limit=limit + 5, exclude_sources=list(_HIDDEN_SESSION_SOURCES)) # fetch extra to skip current
sessions = db.list_sessions_rich(limit=limit + 5) # fetch extra to skip current
# Resolve current session lineage to exclude it
current_root = None
@@ -271,7 +265,6 @@ def session_search(
raw_results = db.search_messages(
query=query,
role_filter=role_list,
exclude_sources=list(_HIDDEN_SESSION_SOURCES),
limit=50, # Get more matches to find unique sessions
offset=0,
)
+3 -15
View File
@@ -1050,27 +1050,15 @@ def _get_configured_model() -> str:
def _resolve_trust_level(source: str) -> str:
"""Map a source identifier to a trust level."""
prefix_aliases = (
"skills-sh/",
"skills.sh/",
"skils-sh/",
"skils.sh/",
)
normalized_source = source
for prefix in prefix_aliases:
if normalized_source.startswith(prefix):
normalized_source = normalized_source[len(prefix):]
break
# Agent-created skills get their own permissive trust level
if normalized_source == "agent-created":
if source == "agent-created":
return "agent-created"
# Official optional skills shipped with the repo
if normalized_source.startswith("official/") or normalized_source == "official":
if source.startswith("official/") or source == "official":
return "builtin"
# Check if source matches any trusted repo
for trusted in TRUSTED_REPOS:
if normalized_source.startswith(trusted) or normalized_source == trusted:
if source.startswith(trusted) or source == trusted:
return "trusted"
return "community"
+23 -29
View File
@@ -925,10 +925,19 @@ class SkillsShSource(SkillSource):
def inspect(self, identifier: str) -> Optional[SkillMeta]:
canonical = self._normalize_identifier(identifier)
detail: Optional[dict] = None
for candidate in self._candidate_identifiers(canonical):
meta = self.github.inspect(candidate)
if meta:
detail = self._fetch_detail_page(canonical)
return self._finalize_inspect_meta(meta, canonical, detail)
detail = self._fetch_detail_page(canonical)
meta = self._resolve_github_meta(canonical, detail=detail)
if meta:
return self._finalize_inspect_meta(meta, canonical, detail)
resolved = self._discover_identifier(canonical, detail=detail)
if resolved:
meta = self.github.inspect(resolved)
if meta:
return self._finalize_inspect_meta(meta, canonical, detail)
return None
def _featured_skills(self, limit: int) -> List[SkillMeta]:
@@ -1090,13 +1099,6 @@ class SkillsShSource(SkillSource):
if self._matches_skill_tokens(meta, tokens):
return meta.identifier
# Prefer a single recursive tree lookup before brute-forcing every
# top-level directory. This avoids large request bursts on categorized
# repos like borghei/claude-skills.
tree_result = self.github._find_skill_in_repo_tree(repo, skill_token)
if tree_result:
return tree_result
# Fallback: scan repo root for directories that might contain skills
try:
root_url = f"https://api.github.com/repos/{repo}/contents/"
@@ -1129,17 +1131,14 @@ class SkillsShSource(SkillSource):
except Exception:
pass
return None
# Final fallback: use the GitHub Trees API to find the skill anywhere
# in the repo tree. This handles deeply nested structures like
# cli-tool/components/skills/development/<skill>/ that the shallow
# scan above can't reach.
tree_result = self.github._find_skill_in_repo_tree(repo, skill_token)
if tree_result:
return tree_result
def _resolve_github_meta(self, identifier: str, detail: Optional[dict] = None) -> Optional[SkillMeta]:
for candidate in self._candidate_identifiers(identifier):
meta = self.github.inspect(candidate)
if meta:
return meta
resolved = self._discover_identifier(identifier, detail=detail)
if resolved:
return self.github.inspect(resolved)
return None
def _finalize_inspect_meta(self, meta: SkillMeta, canonical: str, detail: Optional[dict]) -> SkillMeta:
@@ -1265,15 +1264,10 @@ class SkillsShSource(SkillSource):
@staticmethod
def _normalize_identifier(identifier: str) -> str:
prefix_aliases = (
"skills-sh/",
"skills.sh/",
"skils-sh/",
"skils.sh/",
)
for prefix in prefix_aliases:
if identifier.startswith(prefix):
return identifier[len(prefix):]
if identifier.startswith("skills-sh/"):
return identifier[len("skills-sh/"):]
if identifier.startswith("skills.sh/"):
return identifier[len("skills.sh/"):]
return identifier
@staticmethod
-36
View File
@@ -248,42 +248,6 @@ TOOLSETS = {
],
"includes": []
},
"hermes-api-server": {
"description": "OpenAI-compatible API server — full agent tools accessible via HTTP (no interactive UI tools like clarify or send_message)",
"tools": [
# Web
"web_search", "web_extract",
# Terminal + process management
"terminal", "process",
# File manipulation
"read_file", "write_file", "patch", "search_files",
# Vision + image generation
"vision_analyze", "image_generate",
# MoA
"mixture_of_agents",
# Skills
"skills_list", "skill_view", "skill_manage",
# Browser automation
"browser_navigate", "browser_snapshot", "browser_click",
"browser_type", "browser_scroll", "browser_back",
"browser_press", "browser_close", "browser_get_images",
"browser_vision", "browser_console",
# Planning & memory
"todo", "memory",
# Session history search
"session_search",
# Code execution + delegation
"execute_code", "delegate_task",
# Cronjob management
"cronjob",
# Home Assistant smart home control (gated on HASS_TOKEN via check_fn)
"ha_list_entities", "ha_get_state", "ha_list_services", "ha_call_service",
# Honcho memory tools (gated on honcho being active via check_fn)
"honcho_context", "honcho_profile", "honcho_search", "honcho_conclude",
],
"includes": []
},
"hermes-cli": {
"description": "Full interactive CLI toolset - all default tools plus cronjob management",
+2 -3
View File
@@ -46,7 +46,7 @@ Type `/` in the CLI to open the autocomplete menu. Built-in commands are case-in
| `/provider` | Show available providers and current provider |
| `/prompt` | View/set custom system prompt |
| `/personality` | Set a predefined personality |
| `/verbose` | Cycle tool progress display: off → new → all → verbose. Can be [enabled for messaging](#notes) via config. |
| `/verbose` | Cycle tool progress display: off → new → all → verbose |
| `/reasoning` | Manage reasoning effort and display (usage: /reasoning [level\|show\|hide]) |
| `/skin` | Show or change the display skin/theme |
| `/voice [on\|off\|tts\|status]` | Toggle CLI voice mode and spoken playback. Recording uses `voice.record_key` (default: `Ctrl+B`). |
@@ -125,8 +125,7 @@ The messaging gateway supports the following built-in commands inside Telegram,
## Notes
- `/skin`, `/tools`, `/toolsets`, `/browser`, `/config`, `/prompt`, `/cron`, `/skills`, `/platforms`, `/paste`, `/statusbar`, and `/plugins` are **CLI-only** commands.
- `/verbose` is **CLI-only by default**, but can be enabled for messaging platforms by setting `display.tool_progress_command: true` in `config.yaml`. When enabled, it cycles the `display.tool_progress` mode and saves to config.
- `/skin`, `/tools`, `/toolsets`, `/browser`, `/config`, `/prompt`, `/cron`, `/skills`, `/platforms`, `/paste`, `/verbose`, `/statusbar`, and `/plugins` are **CLI-only** commands.
- `/status`, `/sethome`, `/update`, `/approve`, and `/deny` are **messaging-only** commands.
- `/background`, `/voice`, `/reload-mcp`, and `/rollback` work in **both** the CLI and the messaging gateway.
- `/voice join`, `/voice channel`, and `/voice leave` are only meaningful on Discord.
+1 -1
View File
@@ -230,7 +230,7 @@ The CLI shows animated feedback as the agent works:
┊ 📄 web_extract (2.1s)
```
Cycle through display modes with `/verbose`: `off → new → all → verbose`. This command can also be enabled for messaging platforms — see [configuration](/docs/user-guide/configuration#display-settings).
Cycle through display modes with `/verbose`: `off → new → all → verbose`.
## Session Management
-3
View File
@@ -1163,7 +1163,6 @@ This controls both the `text_to_speech` tool and spoken replies in voice mode (`
```yaml
display:
tool_progress: all # off | new | all | verbose
tool_progress_command: false # Enable /verbose slash command in messaging gateway
skin: default # Built-in or custom CLI skin (see user-guide/features/skins)
theme_mode: auto # auto | light | dark — color scheme for skin-aware rendering
personality: "kawaii" # Legacy cosmetic field still surfaced in some summaries
@@ -1195,8 +1194,6 @@ This works with any skin — built-in or custom. Skin authors can provide `color
| `all` | Every tool call with a short preview (default) |
| `verbose` | Full args, results, and debug logs |
In the CLI, cycle through these modes with `/verbose`. To use `/verbose` in messaging platforms (Telegram, Discord, Slack, etc.), set `tool_progress_command: true` in the `display` section above. The command will then cycle the mode and save to config.
## Privacy
```yaml
@@ -83,7 +83,6 @@ The handler receives the argument string (everything after `/greet`) and returns
| `aliases` | Tuple of alternative names |
| `cli_only` | Only available in CLI |
| `gateway_only` | Only available in messaging platforms |
| `gateway_config_gate` | Config dotpath (e.g. `"display.my_option"`). When set on a `cli_only` command, the command becomes available in the gateway if the config value is truthy. |
## Managing plugins
@@ -188,7 +188,6 @@ Control how much tool activity is displayed in `~/.hermes/config.yaml`:
```yaml
display:
tool_progress: all # off | new | all | verbose
tool_progress_command: false # set to true to enable /verbose in messaging
```
When enabled, the bot sends status messages as it works: