Compare commits

..

1 Commits

Author SHA1 Message Date
Teknium
beb54ffb93 feat: priority-based context file selection + CLAUDE.md support
Previously, all project context files (AGENTS.md, .cursorrules, .hermes.md)
were loaded and concatenated into the system prompt. This bloated the prompt
with potentially redundant or conflicting instructions.

Now only ONE project context type is loaded, using priority order:
  1. .hermes.md / HERMES.md  (walk to git root)
  2. AGENTS.md / agents.md   (recursive directory walk)
  3. CLAUDE.md / claude.md   (cwd only, NEW)
  4. .cursorrules / .cursor/rules/*.mdc  (cwd only)

SOUL.md from HERMES_HOME remains independent and always loads.

Also adds CLAUDE.md as a recognized context file format, matching the
convention popularized by Claude Code.

Refactored the monolithic function into four focused helpers:
_load_hermes_md, _load_agents_md, _load_claude_md, _load_cursorrules.

Tests: replaced 1 coexistence test with 10 new tests covering priority
ordering, CLAUDE.md loading, case sensitivity, injection blocking.
2026-03-21 06:24:58 -07:00
27 changed files with 143 additions and 1233 deletions

View File

@@ -1,16 +1,8 @@
"""Automatic context window compression for long conversations.
Self-contained class with its own OpenAI client for summarization.
Uses auxiliary model (cheap/fast) to summarize middle turns while
Uses Gemini Flash (cheap/fast) to summarize middle turns while
protecting head and tail context.
Improvements over v1:
- Structured summary template (Goal, Progress, Decisions, Files, Next Steps)
- Iterative summary updates (preserves info across multiple compactions)
- Token-budget tail protection instead of fixed message count
- Tool output pruning before LLM summarization (cheap pre-pass)
- Scaled summary budget (proportional to compressed content)
- Richer tool call/result detail in summarizer input
"""
import logging
@@ -35,31 +27,12 @@ SUMMARY_PREFIX = (
)
LEGACY_SUMMARY_PREFIX = "[CONTEXT SUMMARY]:"
# Minimum / maximum tokens for the summary output
_MIN_SUMMARY_TOKENS = 2000
_MAX_SUMMARY_TOKENS = 8000
# Proportion of compressed content to allocate for summary
_SUMMARY_RATIO = 0.20
# Token budget for tail protection (keep most-recent context)
_DEFAULT_TAIL_TOKEN_BUDGET = 20_000
# Placeholder used when pruning old tool results
_PRUNED_TOOL_PLACEHOLDER = "[Old tool output cleared to save context space]"
# Chars per token rough estimate
_CHARS_PER_TOKEN = 4
class ContextCompressor:
"""Compresses conversation context when approaching the model's context limit.
Algorithm:
1. Prune old tool results (cheap, no LLM call)
2. Protect head messages (system prompt + first exchange)
3. Protect tail messages by token budget (most recent ~20K tokens)
4. Summarize middle turns with structured LLM prompt
5. On subsequent compactions, iteratively update the previous summary
Algorithm: protect first N + last N turns, summarize everything in between.
Token tracking uses actual counts from API responses for accuracy.
"""
def __init__(
@@ -101,9 +74,6 @@ class ContextCompressor:
self.summary_model = summary_model_override or ""
# Stores the previous compaction summary for iterative updates
self._previous_summary: Optional[str] = None
def update_from_response(self, usage: Dict[str, Any]):
"""Update tracked token usage from API response."""
self.last_prompt_tokens = usage.get("prompt_tokens", 0)
@@ -130,204 +100,53 @@ class ContextCompressor:
"compression_count": self.compression_count,
}
# ------------------------------------------------------------------
# Tool output pruning (cheap pre-pass, no LLM call)
# ------------------------------------------------------------------
def _prune_old_tool_results(
self, messages: List[Dict[str, Any]], protect_tail_count: int,
) -> tuple[List[Dict[str, Any]], int]:
"""Replace old tool result contents with a short placeholder.
Walks backward from the end, protecting the most recent
``protect_tail_count`` messages. Older tool results get their
content replaced with a placeholder string.
Returns (pruned_messages, pruned_count).
"""
if not messages:
return messages, 0
result = [m.copy() for m in messages]
pruned = 0
prune_boundary = len(result) - protect_tail_count
for i in range(prune_boundary):
msg = result[i]
if msg.get("role") != "tool":
continue
content = msg.get("content", "")
if not content or content == _PRUNED_TOOL_PLACEHOLDER:
continue
# Only prune if the content is substantial (>200 chars)
if len(content) > 200:
result[i] = {**msg, "content": _PRUNED_TOOL_PLACEHOLDER}
pruned += 1
return result, pruned
# ------------------------------------------------------------------
# Summarization
# ------------------------------------------------------------------
def _compute_summary_budget(self, turns_to_summarize: List[Dict[str, Any]]) -> int:
"""Scale summary token budget with the amount of content being compressed."""
content_tokens = estimate_messages_tokens_rough(turns_to_summarize)
budget = int(content_tokens * _SUMMARY_RATIO)
return max(_MIN_SUMMARY_TOKENS, min(budget, _MAX_SUMMARY_TOKENS))
def _serialize_for_summary(self, turns: List[Dict[str, Any]]) -> str:
"""Serialize conversation turns into labeled text for the summarizer.
Includes tool call arguments and result content (up to 3000 chars
per message) so the summarizer can preserve specific details like
file paths, commands, and outputs.
"""
parts = []
for msg in turns:
role = msg.get("role", "unknown")
content = msg.get("content") or ""
# Tool results: keep more content than before (3000 chars)
if role == "tool":
tool_id = msg.get("tool_call_id", "")
if len(content) > 3000:
content = content[:2000] + "\n...[truncated]...\n" + content[-800:]
parts.append(f"[TOOL RESULT {tool_id}]: {content}")
continue
# Assistant messages: include tool call names AND arguments
if role == "assistant":
if len(content) > 3000:
content = content[:2000] + "\n...[truncated]...\n" + content[-800:]
tool_calls = msg.get("tool_calls", [])
if tool_calls:
tc_parts = []
for tc in tool_calls:
if isinstance(tc, dict):
fn = tc.get("function", {})
name = fn.get("name", "?")
args = fn.get("arguments", "")
# Truncate long arguments but keep enough for context
if len(args) > 500:
args = args[:400] + "..."
tc_parts.append(f" {name}({args})")
else:
fn = getattr(tc, "function", None)
name = getattr(fn, "name", "?") if fn else "?"
tc_parts.append(f" {name}(...)")
content += "\n[Tool calls:\n" + "\n".join(tc_parts) + "\n]"
parts.append(f"[ASSISTANT]: {content}")
continue
# User and other roles
if len(content) > 3000:
content = content[:2000] + "\n...[truncated]...\n" + content[-800:]
parts.append(f"[{role.upper()}]: {content}")
return "\n\n".join(parts)
def _generate_summary(self, turns_to_summarize: List[Dict[str, Any]]) -> Optional[str]:
"""Generate a structured summary of conversation turns.
"""Generate a concise summary of conversation turns.
Uses a structured template (Goal, Progress, Decisions, Files, Next Steps)
inspired by Pi-mono and OpenCode. When a previous summary exists,
generates an iterative update instead of summarizing from scratch.
Returns None if all attempts fail — the caller should drop
Tries the auxiliary model first, then falls back to the user's main
model. Returns None if all attempts fail — the caller should drop
the middle turns without a summary rather than inject a useless
placeholder.
"""
summary_budget = self._compute_summary_budget(turns_to_summarize)
content_to_summarize = self._serialize_for_summary(turns_to_summarize)
parts = []
for msg in turns_to_summarize:
role = msg.get("role", "unknown")
content = msg.get("content") or ""
if len(content) > 2000:
content = content[:1000] + "\n...[truncated]...\n" + content[-500:]
tool_calls = msg.get("tool_calls", [])
if tool_calls:
tool_names = [tc.get("function", {}).get("name", "?") for tc in tool_calls if isinstance(tc, dict)]
content += f"\n[Tool calls: {', '.join(tool_names)}]"
parts.append(f"[{role.upper()}]: {content}")
if self._previous_summary:
# Iterative update: preserve existing info, add new progress
prompt = f"""You are updating a context compaction summary. A previous compaction produced the summary below. New conversation turns have occurred since then and need to be incorporated.
content_to_summarize = "\n\n".join(parts)
prompt = f"""Create a concise handoff summary for a later assistant that will continue this conversation after earlier turns are compacted.
PREVIOUS SUMMARY:
{self._previous_summary}
Describe:
1. What actions were taken (tool calls, searches, file operations)
2. Key information or results obtained
3. Important decisions, constraints, or user preferences
4. Relevant data, file names, outputs, or next steps needed to continue
NEW TURNS TO INCORPORATE:
{content_to_summarize}
Update the summary using this exact structure. PRESERVE all existing information that is still relevant. ADD new progress. Move items from "In Progress" to "Done" when completed. Remove information only if it is clearly obsolete.
## Goal
[What the user is trying to accomplish — preserve from previous summary, update if goal evolved]
## Constraints & Preferences
[User preferences, coding style, constraints, important decisions — accumulate across compactions]
## Progress
### Done
[Completed work — include specific file paths, commands run, results obtained]
### In Progress
[Work currently underway]
### Blocked
[Any blockers or issues encountered]
## Key Decisions
[Important technical decisions and why they were made]
## Relevant Files
[Files read, modified, or created — with brief note on each. Accumulate across compactions.]
## Next Steps
[What needs to happen next to continue the work]
## Critical Context
[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation]
Target ~{summary_budget} tokens. Be specific — include file paths, command outputs, error messages, and concrete values rather than vague descriptions.
Write only the summary body. Do not include any preamble or prefix."""
else:
# First compaction: summarize from scratch
prompt = f"""Create a structured handoff summary for a later assistant that will continue this conversation after earlier turns are compacted.
Keep it factual, concise, and focused on helping the next assistant resume without repeating work. Target ~{self.summary_target_tokens} tokens.
---
TURNS TO SUMMARIZE:
{content_to_summarize}
---
Use this exact structure:
## Goal
[What the user is trying to accomplish]
## Constraints & Preferences
[User preferences, coding style, constraints, important decisions]
## Progress
### Done
[Completed work — include specific file paths, commands run, results obtained]
### In Progress
[Work currently underway]
### Blocked
[Any blockers or issues encountered]
## Key Decisions
[Important technical decisions and why they were made]
## Relevant Files
[Files read, modified, or created — with brief note on each]
## Next Steps
[What needs to happen next to continue the work]
## Critical Context
[Any specific values, error messages, configuration details, or data that would be lost without explicit preservation]
Target ~{summary_budget} tokens. Be specific — include file paths, command outputs, error messages, and concrete values rather than vague descriptions. The goal is to prevent the next assistant from repeating work or losing important details.
Write only the summary body. Do not include any preamble or prefix."""
Write only the summary body. Do not include any preamble or prefix; the system will add the handoff wrapper."""
# Use the centralized LLM router — handles provider resolution,
# auth, and fallback internally.
try:
call_kwargs = {
"task": "compression",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3,
"max_tokens": summary_budget * 2,
"timeout": 45.0,
"max_tokens": self.summary_target_tokens * 2,
"timeout": 30.0,
}
if self.summary_model:
call_kwargs["model"] = self.summary_model
@@ -337,8 +156,6 @@ Write only the summary body. Do not include any preamble or prefix."""
if not isinstance(content, str):
content = str(content) if content else ""
summary = content.strip()
# Store for iterative updates on next compaction
self._previous_summary = summary
return self._with_summary_prefix(summary)
except RuntimeError:
logging.warning("Context compression: no provider available for "
@@ -463,69 +280,10 @@ Write only the summary body. Do not include any preamble or prefix."""
idx = check
return idx
# ------------------------------------------------------------------
# Tail protection by token budget
# ------------------------------------------------------------------
def _find_tail_cut_by_tokens(
self, messages: List[Dict[str, Any]], head_end: int,
token_budget: int = _DEFAULT_TAIL_TOKEN_BUDGET,
) -> int:
"""Walk backward from the end of messages, accumulating tokens until
the budget is reached. Returns the index where the tail starts.
Never cuts inside a tool_call/result group. Falls back to the old
``protect_last_n`` if the budget would protect fewer messages.
"""
n = len(messages)
min_tail = self.protect_last_n
accumulated = 0
cut_idx = n # start from beyond the end
for i in range(n - 1, head_end - 1, -1):
msg = messages[i]
content = msg.get("content") or ""
msg_tokens = len(content) // _CHARS_PER_TOKEN + 10 # +10 for role/metadata
# Include tool call arguments in estimate
for tc in msg.get("tool_calls") or []:
if isinstance(tc, dict):
args = tc.get("function", {}).get("arguments", "")
msg_tokens += len(args) // _CHARS_PER_TOKEN
if accumulated + msg_tokens > token_budget and (n - i) >= min_tail:
break
accumulated += msg_tokens
cut_idx = i
# Ensure we protect at least protect_last_n messages
fallback_cut = n - min_tail
if cut_idx > fallback_cut:
cut_idx = fallback_cut
# If the token budget would protect everything (small conversations),
# fall back to the fixed protect_last_n approach so compression can
# still remove middle turns.
if cut_idx <= head_end:
cut_idx = fallback_cut
# Align to avoid splitting tool groups
cut_idx = self._align_boundary_backward(messages, cut_idx)
return max(cut_idx, head_end + 1)
# ------------------------------------------------------------------
# Main compression entry point
# ------------------------------------------------------------------
def compress(self, messages: List[Dict[str, Any]], current_tokens: int = None) -> List[Dict[str, Any]]:
"""Compress conversation messages by summarizing middle turns.
Algorithm:
1. Prune old tool results (cheap pre-pass, no LLM call)
2. Protect head messages (system prompt + first exchange)
3. Find tail boundary by token budget (~20K tokens of recent context)
4. Summarize middle turns with structured LLM prompt
5. On re-compression, iteratively update the previous summary
Keeps first N + last N turns, summarizes everything in between.
After compression, orphaned tool_call / tool_result pairs are cleaned
up so the API never receives mismatched IDs.
"""
@@ -539,26 +297,19 @@ Write only the summary body. Do not include any preamble or prefix."""
)
return messages
display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)
# Phase 1: Prune old tool results (cheap, no LLM call)
messages, pruned_count = self._prune_old_tool_results(
messages, protect_tail_count=self.protect_last_n * 3,
)
if pruned_count and not self.quiet_mode:
logger.info("Pre-compression: pruned %d old tool result(s)", pruned_count)
# Phase 2: Determine boundaries
compress_start = self.protect_first_n
compress_end = n_messages - self.protect_last_n
if compress_start >= compress_end:
return messages
# Adjust boundaries to avoid splitting tool_call/result groups.
compress_start = self._align_boundary_forward(messages, compress_start)
# Use token-budget tail protection instead of fixed message count
compress_end = self._find_tail_cut_by_tokens(messages, compress_start)
compress_end = self._align_boundary_backward(messages, compress_end)
if compress_start >= compress_end:
return messages
turns_to_summarize = messages[compress_start:compress_end]
display_tokens = current_tokens if current_tokens else self.last_prompt_tokens or estimate_messages_tokens_rough(messages)
if not self.quiet_mode:
logger.info(
@@ -572,20 +323,15 @@ Write only the summary body. Do not include any preamble or prefix."""
self.threshold_percent * 100,
self.threshold_tokens,
)
tail_msgs = n_messages - compress_end
logger.info(
"Summarizing turns %d-%d (%d turns), protecting %d head + %d tail messages",
"Summarizing turns %d-%d (%d turns)",
compress_start + 1,
compress_end,
len(turns_to_summarize),
compress_start,
tail_msgs,
)
# Phase 3: Generate structured summary
summary = self._generate_summary(turns_to_summarize)
# Phase 4: Assemble compressed message list
compressed = []
for i in range(compress_start):
msg = messages[i].copy()

View File

@@ -128,7 +128,6 @@ def _extract_tool_stats(messages: List[Dict[str, Any]]) -> Dict[str, Dict[str, i
# Track tool calls from assistant messages
if msg["role"] == "assistant" and "tool_calls" in msg and msg["tool_calls"]:
for tool_call in msg["tool_calls"]:
if not tool_call or not isinstance(tool_call, dict): continue
tool_name = tool_call["function"]["name"]
tool_call_id = tool_call["id"]

View File

@@ -696,8 +696,8 @@ display:
# Stream tokens to the terminal as they arrive instead of waiting for the
# full response. The response box opens on first token and text appears
# line-by-line. Tool calls are still captured silently.
# Stream tokens to the terminal in real-time. Disable to wait for full responses.
streaming: true
# Disabled by default — enable to try the streaming UX.
streaming: false
# ───────────────────────────────────────────────────────────────────────────
# Skin / Theme

115
cli.py
View File

@@ -216,7 +216,7 @@ def load_cli_config() -> Dict[str, Any]:
"compact": False,
"resume_display": "full",
"show_reasoning": False,
"streaming": True,
"streaming": False,
"skin": "default",
},
@@ -398,7 +398,7 @@ def load_cli_config() -> Dict[str, Any]:
"provider": "AUXILIARY_WEB_EXTRACT_PROVIDER",
"model": "AUXILIARY_WEB_EXTRACT_MODEL",
"base_url": "AUXILIARY_WEB_EXTRACT_BASE_URL",
"api_key": "AUXILIARY_WEB_EXTRACT_API_KEY",
"api_key": "AUXILI..._KEY",
},
"approval": {
"provider": "AUXILIARY_APPROVAL_PROVIDER",
@@ -1473,15 +1473,9 @@ class HermesCLI:
Opens a dim reasoning box on first token, streams line-by-line.
The box is closed automatically when content tokens start arriving
(via _stream_delta → _emit_stream_text).
Once the response box is open, suppress any further reasoning
rendering — a late thinking block (e.g. after an interrupt) would
otherwise draw a reasoning box inside the response box.
"""
if not text:
return
if getattr(self, "_stream_box_opened", False):
return
# Open reasoning box on first reasoning token
if not getattr(self, "_reasoning_box_opened", False):
@@ -5779,73 +5773,6 @@ class HermesCLI:
self._invalidate(min_interval=0.0)
return True
# --- Protected TUI extension hooks for wrapper CLIs ---
def _get_extra_tui_widgets(self) -> list:
"""Return extra prompt_toolkit widgets to insert into the TUI layout.
Wrapper CLIs can override this to inject widgets (e.g. a mini-player,
overlay menu) into the layout without overriding ``run()``. Widgets
are inserted between the spacer and the status bar.
"""
return []
def _register_extra_tui_keybindings(self, kb, *, input_area) -> None:
"""Register extra keybindings on the TUI ``KeyBindings`` object.
Wrapper CLIs can override this to add keybindings (e.g. transport
controls, modal shortcuts) without overriding ``run()``.
Parameters
----------
kb : KeyBindings
The active keybinding registry for the prompt_toolkit application.
input_area : TextArea
The main input widget, for wrappers that need to inspect or
manipulate user input from a keybinding handler.
"""
def _build_tui_layout_children(
self,
*,
sudo_widget,
secret_widget,
approval_widget,
clarify_widget,
spinner_widget,
spacer,
status_bar,
input_rule_top,
image_bar,
input_area,
input_rule_bot,
voice_status_bar,
completions_menu,
) -> list:
"""Assemble the ordered list of children for the root ``HSplit``.
Wrapper CLIs typically override ``_get_extra_tui_widgets`` instead of
this method. Override this only when you need full control over widget
ordering.
"""
return [
Window(height=0),
sudo_widget,
secret_widget,
approval_widget,
clarify_widget,
spinner_widget,
spacer,
*self._get_extra_tui_widgets(),
status_bar,
input_rule_top,
image_bar,
input_area,
input_rule_bot,
voice_status_bar,
completions_menu,
]
def run(self):
"""Run the interactive CLI loop with persistent input at bottom."""
self.show_banner()
@@ -6808,32 +6735,26 @@ class HermesCLI:
filter=Condition(lambda: cli_ref._status_bar_visible),
)
# Allow wrapper CLIs to register extra keybindings.
self._register_extra_tui_keybindings(kb, input_area=input_area)
# Layout: interactive prompt widgets + ruled input at bottom.
# The sudo, approval, and clarify widgets appear above the input when
# the corresponding interactive prompt is active.
completions_menu = CompletionsMenu(max_height=12, scroll_offset=1)
layout = Layout(
HSplit(
self._build_tui_layout_children(
sudo_widget=sudo_widget,
secret_widget=secret_widget,
approval_widget=approval_widget,
clarify_widget=clarify_widget,
spinner_widget=spinner_widget,
spacer=spacer,
status_bar=status_bar,
input_rule_top=input_rule_top,
image_bar=image_bar,
input_area=input_area,
input_rule_bot=input_rule_bot,
voice_status_bar=voice_status_bar,
completions_menu=completions_menu,
)
)
HSplit([
Window(height=0),
sudo_widget,
secret_widget,
approval_widget,
clarify_widget,
spinner_widget,
spacer,
status_bar,
input_rule_top,
image_bar,
input_area,
input_rule_bot,
voice_status_bar,
CompletionsMenu(max_height=12, scroll_offset=1),
])
)
# Style for the application

View File

@@ -159,29 +159,15 @@ def _deliver_result(job: dict, content: str) -> None:
logger.warning("Job '%s': platform '%s' not configured/enabled", job["id"], platform_name)
return
# Wrap the content so the user knows this is a cron delivery and that
# the interactive agent has no visibility into it.
task_name = job.get("name", job["id"])
wrapped = (
f"Cronjob Response: {task_name}\n"
f"-------------\n\n"
f"{content}\n\n"
f"Note: The agent cannot see this message, and therefore cannot respond to it."
)
# Run the async send in a fresh event loop (safe from any thread)
coro = _send_to_platform(platform, pconfig, chat_id, wrapped, thread_id=thread_id)
try:
result = asyncio.run(coro)
result = asyncio.run(_send_to_platform(platform, pconfig, chat_id, content, thread_id=thread_id))
except RuntimeError:
# asyncio.run() checks for a running loop before awaiting the coroutine;
# when it raises, the original coro was never started — close it to
# prevent "coroutine was never awaited" RuntimeWarning, then retry in a
# fresh thread that has no running loop.
coro.close()
# asyncio.run() fails if there's already a running loop in this thread;
# spin up a new thread to avoid that.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, wrapped, thread_id=thread_id))
future = pool.submit(asyncio.run, _send_to_platform(platform, pconfig, chat_id, content, thread_id=thread_id))
result = future.result(timeout=30)
except Exception as e:
logger.error("Job '%s': delivery to %s:%s failed: %s", job["id"], platform_name, chat_id, e)
@@ -191,6 +177,12 @@ def _deliver_result(job: dict, content: str) -> None:
logger.error("Job '%s': delivery error: %s", job["id"], result["error"])
else:
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
# Mirror the delivered content into the target's gateway session
try:
from gateway.mirror import mirror_to_session
mirror_to_session(platform_name, chat_id, content, source_label="cron", thread_id=thread_id)
except Exception as e:
logger.warning("Job '%s': mirror_to_session failed: %s", job["id"], e)
def _build_job_prompt(job: dict) -> str:

View File

@@ -455,27 +455,11 @@ def load_gateway_config() -> GatewayConfig:
"pair",
)
# Merge platforms section from config.yaml into gw_data so that
# nested keys like platforms.webhook.extra.routes are loaded.
yaml_platforms = yaml_cfg.get("platforms")
# Bridge per-platform settings from config.yaml into gw_data
platforms_data = gw_data.setdefault("platforms", {})
if not isinstance(platforms_data, dict):
platforms_data = {}
gw_data["platforms"] = platforms_data
if isinstance(yaml_platforms, dict):
for plat_name, plat_block in yaml_platforms.items():
if not isinstance(plat_block, dict):
continue
existing = platforms_data.get(plat_name, {})
if not isinstance(existing, dict):
existing = {}
# Deep-merge extra dicts so gateway.json defaults survive
merged_extra = {**existing.get("extra", {}), **plat_block.get("extra", {})}
merged = {**existing, **plat_block}
if merged_extra:
merged["extra"] = merged_extra
platforms_data[plat_name] = merged
gw_data["platforms"] = platforms_data
for plat in Platform:
if plat == Platform.LOCAL:
continue

View File

@@ -617,16 +617,16 @@ class MattermostAdapter(BasePlatformAdapter):
if mime.startswith("image/"):
local_path = cache_image_from_bytes(file_data, ext or ".png")
media_urls.append(local_path)
media_types.append(mime)
media_types.append("image")
elif mime.startswith("audio/"):
from gateway.platforms.base import cache_audio_from_bytes
local_path = cache_audio_from_bytes(file_data, ext or ".ogg")
media_urls.append(local_path)
media_types.append(mime)
media_types.append("audio")
else:
local_path = cache_document_from_bytes(file_data, fname)
media_urls.append(local_path)
media_types.append(mime)
media_types.append("document")
else:
logger.warning("Mattermost: failed to download file %s: HTTP %s", fid, resp.status)
except Exception as exc:

View File

@@ -129,8 +129,6 @@ class TelegramAdapter(BasePlatformAdapter):
self._pending_text_batch_tasks: Dict[str, asyncio.Task] = {}
self._token_lock_identity: Optional[str] = None
self._polling_error_task: Optional[asyncio.Task] = None
self._polling_conflict_count: int = 0
self._polling_error_callback_ref = None
@staticmethod
def _looks_like_polling_conflict(error: Exception) -> bool:
@@ -144,49 +142,10 @@ class TelegramAdapter(BasePlatformAdapter):
async def _handle_polling_conflict(self, error: Exception) -> None:
if self.has_fatal_error and self.fatal_error_code == "telegram_polling_conflict":
return
# Track consecutive conflicts — transient 409s can occur when a
# previous gateway instance hasn't fully released its long-poll
# session on Telegram's server (e.g. during --replace handoffs or
# systemd Restart=on-failure respawns). Retry a few times before
# giving up, so the old session has time to expire.
self._polling_conflict_count += 1
MAX_CONFLICT_RETRIES = 3
RETRY_DELAY = 10 # seconds
if self._polling_conflict_count <= MAX_CONFLICT_RETRIES:
logger.warning(
"[%s] Telegram polling conflict (%d/%d), will retry in %ds. Error: %s",
self.name, self._polling_conflict_count, MAX_CONFLICT_RETRIES,
RETRY_DELAY, error,
)
try:
if self._app and self._app.updater and self._app.updater.running:
await self._app.updater.stop()
except Exception:
pass
await asyncio.sleep(RETRY_DELAY)
try:
await self._app.updater.start_polling(
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=False,
error_callback=self._polling_error_callback_ref,
)
logger.info("[%s] Telegram polling resumed after conflict retry %d", self.name, self._polling_conflict_count)
self._polling_conflict_count = 0 # reset on success
return
except Exception as retry_err:
logger.warning("[%s] Telegram polling retry failed: %s", self.name, retry_err)
# Don't fall through to fatal yet — wait for the next conflict
# to trigger another retry attempt (up to MAX_CONFLICT_RETRIES).
return
# Exhausted retries — fatal
message = (
"Another Telegram bot poller is already using this token. "
"Hermes stopped Telegram polling after %d retries. "
"Hermes stopped Telegram polling to avoid endless retry spam. "
"Make sure only one gateway instance is running for this bot token."
% MAX_CONFLICT_RETRIES
)
logger.error("[%s] %s Original error: %s", self.name, message, error)
self._set_fatal_error("telegram_polling_conflict", message, retryable=False)
@@ -283,9 +242,6 @@ class TelegramAdapter(BasePlatformAdapter):
return
self._polling_error_task = loop.create_task(self._handle_polling_conflict(error))
# Store reference for retry use in _handle_polling_conflict
self._polling_error_callback_ref = _polling_error_callback
await self._app.updater.start_polling(
allowed_updates=Update.ALL_TYPES,
drop_pending_updates=True,

View File

@@ -2248,8 +2248,7 @@ class GatewayRunner:
)
# Auto voice reply: send TTS audio before the text response
_already_sent = bool(agent_result.get("already_sent"))
if self._should_send_voice_reply(event, response, agent_messages, already_sent=_already_sent):
if self._should_send_voice_reply(event, response, agent_messages):
await self._send_voice_reply(event, response)
# If streaming already delivered the response, return None so
@@ -3055,7 +3054,6 @@ class GatewayRunner:
event: MessageEvent,
response: str,
agent_messages: list,
already_sent: bool = False,
) -> bool:
"""Decide whether the runner should send a TTS voice reply.
@@ -3064,9 +3062,8 @@ class GatewayRunner:
- response is empty or an error
- agent already called text_to_speech tool (dedup)
- voice input and base adapter auto-TTS already handled it (skip_double)
UNLESS streaming already consumed the response (already_sent=True),
in which case the base adapter won't have text for auto-TTS so the
runner must handle it.
Exception: Discord voice channel — base play_tts is a no-op there,
so the runner must handle VC playback.
"""
if not response or response.startswith("Error:"):
return False
@@ -3096,10 +3093,7 @@ class GatewayRunner:
# Dedup: base adapter auto-TTS already handles voice input
# (play_tts plays in VC when connected, so runner can skip).
# When streaming already delivered the text (already_sent=True),
# the base adapter will receive None and can't run auto-TTS,
# so the runner must take over.
if is_voice_input and not already_sent:
if is_voice_input:
return False
return True

View File

@@ -420,8 +420,6 @@ def generate_systemd_unit(system: bool = False, run_as_user: str | None = None)
Description={SERVICE_DESCRIPTION}
After=network-online.target
Wants=network-online.target
StartLimitIntervalSec=600
StartLimitBurst=5
[Service]
Type=simple
@@ -436,7 +434,7 @@ Environment="PATH={sane_path}"
Environment="VIRTUAL_ENV={venv_dir}"
Environment="HERMES_HOME={hermes_home}"
Restart=on-failure
RestartSec=30
RestartSec=10
KillMode=mixed
KillSignal=SIGTERM
TimeoutStopSec=60
@@ -450,8 +448,6 @@ WantedBy=multi-user.target
return f"""[Unit]
Description={SERVICE_DESCRIPTION}
After=network.target
StartLimitIntervalSec=600
StartLimitBurst=5
[Service]
Type=simple
@@ -461,7 +457,7 @@ Environment="PATH={sane_path}"
Environment="VIRTUAL_ENV={venv_dir}"
Environment="HERMES_HOME={hermes_home}"
Restart=on-failure
RestartSec=30
RestartSec=10
KillMode=mixed
KillSignal=SIGTERM
TimeoutStopSec=60

View File

@@ -2688,7 +2688,7 @@ def cmd_update(args):
print("→ Pulling updates...")
try:
subprocess.run(git_cmd + ["pull", "--ff-only", "origin", branch], cwd=PROJECT_ROOT, check=True)
subprocess.run(git_cmd + ["pull", "origin", branch], cwd=PROJECT_ROOT, check=True)
finally:
if auto_stash_ref is not None:
_restore_stashed_changes(

View File

@@ -5,8 +5,7 @@ Hermes Plugin System
Discovers, loads, and manages plugins from three sources:
1. **User plugins** ``~/.hermes/plugins/<name>/``
2. **Project plugins** ``./.hermes/plugins/<name>/`` (opt-in via
``HERMES_ENABLE_PROJECT_PLUGINS``)
2. **Project plugins** ``./.hermes/plugins/<name>/``
3. **Pip plugins** packages that expose the ``hermes_agent.plugins``
entry-point group.
@@ -63,11 +62,6 @@ ENTRY_POINTS_GROUP = "hermes_agent.plugins"
_NS_PARENT = "hermes_plugins"
def _env_enabled(name: str) -> bool:
"""Return True when an env var is set to a truthy opt-in value."""
return os.getenv(name, "").strip().lower() in {"1", "true", "yes", "on"}
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@@ -192,9 +186,8 @@ class PluginManager:
manifests.extend(self._scan_directory(user_dir, source="user"))
# 2. Project plugins (./.hermes/plugins/)
if _env_enabled("HERMES_ENABLE_PROJECT_PLUGINS"):
project_dir = Path.cwd() / ".hermes" / "plugins"
manifests.extend(self._scan_directory(project_dir, source="project"))
project_dir = Path.cwd() / ".hermes" / "plugins"
manifests.extend(self._scan_directory(project_dir, source="project"))
# 3. Pip / entry-point plugins
manifests.extend(self._scan_entry_points())

View File

@@ -339,7 +339,6 @@ class MiniSWERunner:
# Add tool calls in XML format
for tool_call in msg["tool_calls"]:
if not tool_call or not isinstance(tool_call, dict): continue
try:
arguments = json.loads(tool_call["function"]["arguments"]) \
if isinstance(tool_call["function"]["arguments"], str) \

View File

@@ -1374,6 +1374,9 @@ class AIAgent:
def _run_review():
import contextlib, os as _os
try:
# Redirect stdout to devnull so spinners, cute messages,
# and any other print() calls from the review agent don't
# leak into the main CLI display.
with open(_os.devnull, "w") as _devnull, \
contextlib.redirect_stdout(_devnull):
review_agent = AIAgent(
@@ -1393,39 +1396,6 @@ class AIAgent:
user_message=prompt,
conversation_history=messages_snapshot,
)
# Scan the review agent's messages for successful tool actions
# and surface a compact summary to the user.
actions = []
for msg in getattr(review_agent, "_session_messages", []):
if not isinstance(msg, dict) or msg.get("role") != "tool":
continue
try:
data = json.loads(msg.get("content", "{}"))
except (json.JSONDecodeError, TypeError):
continue
if not data.get("success"):
continue
message = data.get("message", "")
target = data.get("target", "")
if "created" in message.lower():
actions.append(message)
elif "updated" in message.lower():
actions.append(message)
elif "added" in message.lower() or (target and "add" in message.lower()):
label = "Memory" if target == "memory" else "User profile" if target == "user" else target
actions.append(f"{label} updated")
elif "Entry added" in message:
label = "Memory" if target == "memory" else "User profile" if target == "user" else target
actions.append(f"{label} updated")
elif "removed" in message.lower() or "replaced" in message.lower():
label = "Memory" if target == "memory" else "User profile" if target == "user" else target
actions.append(f"{label} updated")
if actions:
summary = " · ".join(dict.fromkeys(actions))
self._safe_print(f" 💾 {summary}")
except Exception as e:
logger.debug("Background memory/skill review failed: %s", e)
@@ -1618,7 +1588,6 @@ class AIAgent:
# Add tool calls wrapped in XML tags
for tool_call in msg["tool_calls"]:
if not tool_call or not isinstance(tool_call, dict): continue
# Parse arguments - should always succeed since we validate during conversation
# but keep try-except as safety net
try:
@@ -2330,18 +2299,6 @@ class AIAgent:
timestamp_line += f"\nProvider: {self.provider}"
prompt_parts.append(timestamp_line)
# Alibaba Coding Plan API always returns "glm-4.7" as model name regardless
# of the requested model. Inject explicit model identity into the system prompt
# so the agent can correctly report which model it is (workaround for API bug).
if self.provider in ("alibaba-coding-plan", "alibaba-coding-plan-anthropic"):
_model_short = self.model.split("/")[-1] if "/" in self.model else self.model
prompt_parts.append(
f"You are powered by the model named {_model_short}. "
f"The exact model ID is {self.model}. "
f"When asked what model you are, always answer based on this information, "
f"not on any model name returned by the API."
)
platform_key = (self.platform or "").lower().strip()
if platform_key in PLATFORM_HINTS:
prompt_parts.append(PLATFORM_HINTS[platform_key])
@@ -6784,7 +6741,6 @@ class AIAgent:
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tool_names = []
for tc in msg["tool_calls"]:
if not tc or not isinstance(tc, dict): continue
fn = tc.get("function", {})
tool_names.append(fn.get("name", "unknown"))
msg["content"] = f"Calling the {', '.join(tool_names)} tool{'s' if len(tool_names) > 1 else ''}..."
@@ -6827,7 +6783,6 @@ class AIAgent:
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tool_names = []
for tc in msg["tool_calls"]:
if not tc or not isinstance(tc, dict): continue
fn = tc.get("function", {})
tool_names.append(fn.get("name", "unknown"))
msg["content"] = f"Calling the {', '.join(tool_names)} tool{'s' if len(tool_names) > 1 else ''}..."
@@ -6947,7 +6902,6 @@ class AIAgent:
if isinstance(m, dict) and m.get("role") == "tool"
}
for tc in msg["tool_calls"]:
if not tc or not isinstance(tc, dict): continue
if tc["id"] not in answered_ids:
err_msg = {
"role": "tool",
@@ -6958,18 +6912,20 @@ class AIAgent:
pending_handled = True
break
# Non-tool errors don't need a synthetic message injected.
# The error is already printed to the user (line above), and
# the retry loop continues. Injecting a fake user/assistant
# message pollutes history, burns tokens, and risks violating
# role-alternation invariants.
if not pending_handled:
# Error happened before tool processing (e.g. response parsing).
# Choose role to avoid consecutive same-role messages.
last_role = messages[-1].get("role") if messages else None
err_role = "assistant" if last_role == "user" else "user"
sys_err_msg = {
"role": err_role,
"content": f"[System error during processing: {error_msg}]",
}
messages.append(sys_err_msg)
# If we're near the limit, break to avoid infinite loops
if api_call_count >= self.max_iterations - 1:
final_response = f"I apologize, but I encountered repeated errors: {error_msg}"
# Append as assistant so the history stays valid for
# session resume (avoids consecutive user messages).
messages.append({"role": "assistant", "content": final_response})
break
if final_response is None and (

View File

@@ -82,15 +82,13 @@ def generate_systemd_unit() -> str:
return f"""[Unit]
Description={SERVICE_DESCRIPTION}
After=network.target
StartLimitIntervalSec=600
StartLimitBurst=5
[Service]
Type=simple
ExecStart={python_path} {script_path} run
WorkingDirectory={working_dir}
Restart=on-failure
RestartSec=30
RestartSec=10
StandardOutput=journal
StandardError=journal

View File

@@ -577,7 +577,7 @@ clone_repo() {
git fetch origin
git checkout "$BRANCH"
git pull --ff-only origin "$BRANCH"
git pull origin "$BRANCH"
if [ -n "$autostash_ref" ]; then
local restore_now="yes"
@@ -772,12 +772,6 @@ setup_path() {
case "$LOGIN_SHELL" in
zsh)
[ -f "$HOME/.zshrc" ] && SHELL_CONFIGS+=("$HOME/.zshrc")
[ -f "$HOME/.zprofile" ] && SHELL_CONFIGS+=("$HOME/.zprofile")
# If neither exists, create ~/.zshrc (common on fresh macOS installs)
if [ ${#SHELL_CONFIGS[@]} -eq 0 ]; then
touch "$HOME/.zshrc"
SHELL_CONFIGS+=("$HOME/.zshrc")
fi
;;
bash)
[ -f "$HOME/.bashrc" ] && SHELL_CONFIGS+=("$HOME/.bashrc")

View File

@@ -95,58 +95,11 @@ class TestResolveDeliveryTarget:
}
class TestDeliverResultWrapping:
"""Verify that cron deliveries are wrapped with header/footer and no longer mirrored."""
class TestDeliverResultMirrorLogging:
"""Verify that mirror_to_session failures are logged, not silently swallowed."""
def test_delivery_wraps_content_with_header_and_footer(self):
"""Delivered content should include task name header and agent-invisible note."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "test-job",
"name": "daily-report",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Here is today's summary.")
send_mock.assert_called_once()
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: daily-report" in sent_content
assert "-------------" in sent_content
assert "Here is today's summary." in sent_content
assert "The agent cannot see this message" in sent_content
def test_delivery_uses_job_id_when_no_name(self):
"""When a job has no name, the wrapper should fall back to job id."""
from gateway.config import Platform
pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
job = {
"id": "abc-123",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Output.")
sent_content = send_mock.call_args.kwargs.get("content") or send_mock.call_args[0][-1]
assert "Cronjob Response: abc-123" in sent_content
def test_no_mirror_to_session_call(self):
"""Cron deliveries should NOT mirror into the gateway session."""
def test_mirror_failure_is_logged(self, caplog):
"""When mirror_to_session raises, a warning should be logged."""
from gateway.config import Platform
pconfig = MagicMock()
@@ -156,18 +109,20 @@ class TestDeliverResultWrapping:
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})), \
patch("gateway.mirror.mirror_to_session") as mirror_mock:
patch("gateway.mirror.mirror_to_session", side_effect=ConnectionError("network down")):
job = {
"id": "test-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}
_deliver_result(job, "Hello!")
with caplog.at_level(logging.WARNING, logger="cron.scheduler"):
_deliver_result(job, "Hello!")
mirror_mock.assert_not_called()
assert any("mirror_to_session failed" in r.message for r in caplog.records), \
f"Expected 'mirror_to_session failed' warning in logs, got: {[r.message for r in caplog.records]}"
def test_origin_delivery_preserves_thread_id(self):
"""Origin delivery should forward thread_id to the send helper."""
"""Origin delivery should forward thread_id to send/mirror helpers."""
from gateway.config import Platform
pconfig = MagicMock()
@@ -177,7 +132,6 @@ class TestDeliverResultWrapping:
job = {
"id": "test-job",
"name": "topic-job",
"deliver": "origin",
"origin": {
"platform": "telegram",
@@ -187,11 +141,19 @@ class TestDeliverResultWrapping:
}
with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock:
patch("tools.send_message_tool._send_to_platform", new=AsyncMock(return_value={"success": True})) as send_mock, \
patch("gateway.mirror.mirror_to_session") as mirror_mock:
_deliver_result(job, "hello")
send_mock.assert_called_once()
assert send_mock.call_args.kwargs["thread_id"] == "17585"
mirror_mock.assert_called_once_with(
"telegram",
"-1001",
"hello",
source_label="cron",
thread_id="17585",
)
class TestRunJobSessionPersistence:

View File

@@ -572,102 +572,3 @@ class TestMattermostRequirements:
monkeypatch.delenv("MATTERMOST_URL", raising=False)
from gateway.platforms.mattermost import check_mattermost_requirements
assert check_mattermost_requirements() is False
# ---------------------------------------------------------------------------
# Media type propagation (MIME types, not bare strings)
# ---------------------------------------------------------------------------
class TestMattermostMediaTypes:
"""Verify that media_types contains actual MIME types (e.g. 'image/png')
rather than bare category strings ('image'), so downstream
``mtype.startswith("image/")`` checks in run.py work correctly."""
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._bot_user_id = "bot_user_id"
self.adapter.handle_message = AsyncMock()
def _make_event(self, file_ids):
post_data = {
"id": "post_media",
"user_id": "user_123",
"channel_id": "chan_456",
"message": "file attached",
"file_ids": file_ids,
}
return {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": "O",
"sender_name": "@alice",
},
}
@pytest.mark.asyncio
async def test_image_media_type_is_full_mime(self):
"""An image attachment should produce 'image/png', not 'image'."""
file_info = {"name": "photo.png", "mime_type": "image/png"}
self.adapter._api_get = AsyncMock(return_value=file_info)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.read = AsyncMock(return_value=b"\x89PNG fake")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
self.adapter._session = MagicMock()
self.adapter._session.get = MagicMock(return_value=mock_resp)
with patch("gateway.platforms.base.cache_image_from_bytes", return_value="/tmp/photo.png"):
await self.adapter._handle_ws_event(self._make_event(["file1"]))
msg = self.adapter.handle_message.call_args[0][0]
assert msg.media_types == ["image/png"]
assert msg.media_types[0].startswith("image/")
@pytest.mark.asyncio
async def test_audio_media_type_is_full_mime(self):
"""An audio attachment should produce 'audio/ogg', not 'audio'."""
file_info = {"name": "voice.ogg", "mime_type": "audio/ogg"}
self.adapter._api_get = AsyncMock(return_value=file_info)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.read = AsyncMock(return_value=b"OGG fake")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
self.adapter._session = MagicMock()
self.adapter._session.get = MagicMock(return_value=mock_resp)
with patch("gateway.platforms.base.cache_audio_from_bytes", return_value="/tmp/voice.ogg"), \
patch("gateway.platforms.base.cache_image_from_bytes"), \
patch("gateway.platforms.base.cache_document_from_bytes"):
await self.adapter._handle_ws_event(self._make_event(["file2"]))
msg = self.adapter.handle_message.call_args[0][0]
assert msg.media_types == ["audio/ogg"]
assert msg.media_types[0].startswith("audio/")
@pytest.mark.asyncio
async def test_document_media_type_is_full_mime(self):
"""A document attachment should produce 'application/pdf', not 'document'."""
file_info = {"name": "report.pdf", "mime_type": "application/pdf"}
self.adapter._api_get = AsyncMock(return_value=file_info)
mock_resp = AsyncMock()
mock_resp.status = 200
mock_resp.read = AsyncMock(return_value=b"PDF fake")
mock_resp.__aenter__ = AsyncMock(return_value=mock_resp)
mock_resp.__aexit__ = AsyncMock(return_value=False)
self.adapter._session = MagicMock()
self.adapter._session.get = MagicMock(return_value=mock_resp)
with patch("gateway.platforms.base.cache_document_from_bytes", return_value="/tmp/report.pdf"), \
patch("gateway.platforms.base.cache_image_from_bytes"):
await self.adapter._handle_ws_event(self._make_event(["file3"]))
msg = self.adapter.handle_message.call_args[0][0]
assert msg.media_types == ["application/pdf"]
assert not msg.media_types[0].startswith("image/")
assert not msg.media_types[0].startswith("audio/")

View File

@@ -47,9 +47,8 @@ async def test_connect_rejects_same_host_token_lock(monkeypatch):
@pytest.mark.asyncio
async def test_polling_conflict_retries_before_fatal(monkeypatch):
"""A single 409 should trigger a retry, not an immediate fatal error."""
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
async def test_polling_conflict_stops_polling_and_notifies_handler(monkeypatch):
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="secret-token"))
fatal_handler = AsyncMock()
adapter.set_fatal_error_handler(fatal_handler)
@@ -70,7 +69,6 @@ async def test_polling_conflict_retries_before_fatal(monkeypatch):
updater = SimpleNamespace(
start_polling=AsyncMock(side_effect=fake_start_polling),
stop=AsyncMock(),
running=True,
)
bot = SimpleNamespace(set_my_commands=AsyncMock())
app = SimpleNamespace(
@@ -85,102 +83,20 @@ async def test_polling_conflict_retries_before_fatal(monkeypatch):
builder.build.return_value = app
monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder)))
# Speed up retries for testing
monkeypatch.setattr("asyncio.sleep", AsyncMock())
ok = await adapter.connect()
assert ok is True
assert callable(captured["error_callback"])
conflict = type("Conflict", (Exception,), {})
captured["error_callback"](conflict("Conflict: terminated by other getUpdates request; make sure that only one bot instance is running"))
# First conflict: should retry, NOT be fatal
captured["error_callback"](conflict("Conflict: terminated by other getUpdates request"))
await asyncio.sleep(0)
await asyncio.sleep(0)
# Give the scheduled task a chance to run
for _ in range(10):
await asyncio.sleep(0)
assert adapter.has_fatal_error is False, "First conflict should not be fatal"
assert adapter._polling_conflict_count == 0, "Count should reset after successful retry"
@pytest.mark.asyncio
async def test_polling_conflict_becomes_fatal_after_retries(monkeypatch):
"""After exhausting retries, the conflict should become fatal."""
adapter = TelegramAdapter(PlatformConfig(enabled=True, token="***"))
fatal_handler = AsyncMock()
adapter.set_fatal_error_handler(fatal_handler)
monkeypatch.setattr(
"gateway.status.acquire_scoped_lock",
lambda scope, identity, metadata=None: (True, None),
)
monkeypatch.setattr(
"gateway.status.release_scoped_lock",
lambda scope, identity: None,
)
captured = {}
async def fake_start_polling(**kwargs):
captured["error_callback"] = kwargs["error_callback"]
# Make start_polling fail on retries to exhaust retries
call_count = {"n": 0}
async def failing_start_polling(**kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
# First call (initial connect) succeeds
captured["error_callback"] = kwargs["error_callback"]
else:
# Retry calls fail
raise Exception("Connection refused")
updater = SimpleNamespace(
start_polling=AsyncMock(side_effect=failing_start_polling),
stop=AsyncMock(),
running=True,
)
bot = SimpleNamespace(set_my_commands=AsyncMock())
app = SimpleNamespace(
bot=bot,
updater=updater,
add_handler=MagicMock(),
initialize=AsyncMock(),
start=AsyncMock(),
)
builder = MagicMock()
builder.token.return_value = builder
builder.build.return_value = app
monkeypatch.setattr("gateway.platforms.telegram.Application", SimpleNamespace(builder=MagicMock(return_value=builder)))
# Speed up retries for testing
monkeypatch.setattr("asyncio.sleep", AsyncMock())
ok = await adapter.connect()
assert ok is True
conflict = type("Conflict", (Exception,), {})
# Directly call _handle_polling_conflict to avoid event-loop scheduling
# complexity. Each call simulates one 409 from Telegram.
for i in range(4):
await adapter._handle_polling_conflict(
conflict("Conflict: terminated by other getUpdates request")
)
# After 3 failed retries (count 1-3 each enter the retry branch but
# start_polling raises), the 4th conflict pushes count to 4 which
# exceeds MAX_CONFLICT_RETRIES (3), entering the fatal branch.
assert adapter.fatal_error_code == "telegram_polling_conflict", (
f"Expected fatal after 4 conflicts, got code={adapter.fatal_error_code}, "
f"count={adapter._polling_conflict_count}"
)
assert adapter.fatal_error_code == "telegram_polling_conflict"
assert adapter.has_fatal_error is True
updater.stop.assert_awaited()
fatal_handler.assert_awaited_once()

View File

@@ -2467,8 +2467,7 @@ class TestVoiceTTSPlayback:
runner.adapters = {}
return runner
def _call_should_reply(self, runner, voice_mode, msg_type, response="Hello",
agent_msgs=None, already_sent=False):
def _call_should_reply(self, runner, voice_mode, msg_type, response="Hello", agent_msgs=None):
from gateway.platforms.base import MessageType, MessageEvent, SessionSource
from gateway.config import Platform
runner._voice_mode["ch1"] = voice_mode
@@ -2477,32 +2476,28 @@ class TestVoiceTTSPlayback:
user_id="1", user_name="test", chat_type="channel",
)
event = MessageEvent(source=source, text="test", message_type=msg_type)
return runner._should_send_voice_reply(
event, response, agent_msgs or [], already_sent=already_sent,
)
# -- Streaming OFF (existing behavior, must not change) --
return runner._should_send_voice_reply(event, response, agent_msgs or [])
def test_voice_input_runner_skips(self):
"""Streaming OFF + voice input: runner skips — base adapter handles."""
"""Voice input: runner skips — base adapter handles via play_tts."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
assert self._call_should_reply(runner, "all", MessageType.VOICE, already_sent=False) is False
assert self._call_should_reply(runner, "all", MessageType.VOICE) is False
def test_text_input_voice_all_runner_fires(self):
"""Streaming OFF + text input + voice_mode=all: runner generates TTS."""
"""Text input + voice_mode=all: runner generates TTS."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
assert self._call_should_reply(runner, "all", MessageType.TEXT, already_sent=False) is True
assert self._call_should_reply(runner, "all", MessageType.TEXT) is True
def test_text_input_voice_off_no_tts(self):
"""Streaming OFF + text input + voice_mode=off: no TTS."""
"""Text input + voice_mode=off: no TTS."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
assert self._call_should_reply(runner, "off", MessageType.TEXT) is False
def test_text_input_voice_only_no_tts(self):
"""Streaming OFF + text input + voice_mode=voice_only: no TTS for text."""
"""Text input + voice_mode=voice_only: no TTS for text."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
assert self._call_should_reply(runner, "voice_only", MessageType.TEXT) is False
@@ -2528,43 +2523,6 @@ class TestVoiceTTSPlayback:
]}]
assert self._call_should_reply(runner, "all", MessageType.TEXT, agent_msgs=agent_msgs) is False
# -- Streaming ON (already_sent=True) --
def test_streaming_on_voice_input_runner_fires(self):
"""Streaming ON + voice input: runner handles TTS (base adapter has no text)."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
assert self._call_should_reply(runner, "all", MessageType.VOICE, already_sent=True) is True
def test_streaming_on_text_input_runner_fires(self):
"""Streaming ON + text input: runner handles TTS (same as before)."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
assert self._call_should_reply(runner, "all", MessageType.TEXT, already_sent=True) is True
def test_streaming_on_voice_off_no_tts(self):
"""Streaming ON + voice_mode=off: no TTS regardless of streaming."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
assert self._call_should_reply(runner, "off", MessageType.VOICE, already_sent=True) is False
def test_streaming_on_empty_response_no_tts(self):
"""Streaming ON + empty response: no TTS."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
assert self._call_should_reply(runner, "all", MessageType.VOICE, response="", already_sent=True) is False
def test_streaming_on_agent_tts_dedup(self):
"""Streaming ON + agent called TTS: runner skips (dedup still works)."""
from gateway.platforms.base import MessageType
runner = self._make_runner()
agent_msgs = [{"role": "assistant", "tool_calls": [
{"id": "1", "type": "function", "function": {"name": "text_to_speech", "arguments": "{}"}}
]}]
assert self._call_should_reply(
runner, "all", MessageType.VOICE, agent_msgs=agent_msgs, already_sent=True,
) is False
class TestUDPKeepalive:
"""UDP keepalive prevents Discord from dropping the voice session."""

View File

@@ -1,138 +0,0 @@
"""Tests for protected HermesCLI TUI extension hooks.
Verifies that wrapper CLIs can extend the TUI via:
- _get_extra_tui_widgets()
- _register_extra_tui_keybindings()
- _build_tui_layout_children()
without overriding run().
"""
from __future__ import annotations
import importlib
import sys
from unittest.mock import MagicMock, patch
from prompt_toolkit.key_binding import KeyBindings
def _make_cli(**kwargs):
"""Create a HermesCLI with prompt_toolkit stubs (same pattern as test_cli_init)."""
_clean_config = {
"model": {
"default": "anthropic/claude-opus-4.6",
"base_url": "https://openrouter.ai/api/v1",
"provider": "auto",
},
"display": {"compact": False, "tool_progress": "all"},
"agent": {},
"terminal": {"env_type": "local"},
}
clean_env = {"LLM_MODEL": "", "HERMES_MAX_ITERATIONS": ""}
prompt_toolkit_stubs = {
"prompt_toolkit": MagicMock(),
"prompt_toolkit.history": MagicMock(),
"prompt_toolkit.styles": MagicMock(),
"prompt_toolkit.patch_stdout": MagicMock(),
"prompt_toolkit.application": MagicMock(),
"prompt_toolkit.layout": MagicMock(),
"prompt_toolkit.layout.processors": MagicMock(),
"prompt_toolkit.filters": MagicMock(),
"prompt_toolkit.layout.dimension": MagicMock(),
"prompt_toolkit.layout.menus": MagicMock(),
"prompt_toolkit.widgets": MagicMock(),
"prompt_toolkit.key_binding": MagicMock(),
"prompt_toolkit.completion": MagicMock(),
"prompt_toolkit.formatted_text": MagicMock(),
"prompt_toolkit.auto_suggest": MagicMock(),
}
with patch.dict(sys.modules, prompt_toolkit_stubs), patch.dict(
"os.environ", clean_env, clear=False
):
import cli as _cli_mod
_cli_mod = importlib.reload(_cli_mod)
with patch.object(_cli_mod, "get_tool_definitions", return_value=[]), patch.dict(
_cli_mod.__dict__, {"CLI_CONFIG": _clean_config}
):
return _cli_mod.HermesCLI(**kwargs)
class TestExtensionHookDefaults:
def test_extra_tui_widgets_default_empty(self):
cli = _make_cli()
assert cli._get_extra_tui_widgets() == []
def test_register_extra_tui_keybindings_default_noop(self):
cli = _make_cli()
kb = KeyBindings()
result = cli._register_extra_tui_keybindings(kb, input_area=None)
assert result is None
assert kb.bindings == []
def test_build_tui_layout_children_returns_all_widgets_in_order(self):
cli = _make_cli()
children = cli._build_tui_layout_children(
sudo_widget="sudo",
secret_widget="secret",
approval_widget="approval",
clarify_widget="clarify",
spinner_widget="spinner",
spacer="spacer",
status_bar="status",
input_rule_top="top-rule",
image_bar="image-bar",
input_area="input-area",
input_rule_bot="bottom-rule",
voice_status_bar="voice-status",
completions_menu="completions-menu",
)
# First element is Window(height=0), rest are the named widgets
assert children[1:] == [
"sudo", "secret", "approval", "clarify", "spinner",
"spacer", "status", "top-rule", "image-bar", "input-area",
"bottom-rule", "voice-status", "completions-menu",
]
class TestExtensionHookSubclass:
def test_extra_widgets_inserted_before_status_bar(self):
cli = _make_cli()
# Monkey-patch to simulate subclass override
cli._get_extra_tui_widgets = lambda: ["radio-menu", "mini-player"]
children = cli._build_tui_layout_children(
sudo_widget="sudo",
secret_widget="secret",
approval_widget="approval",
clarify_widget="clarify",
spinner_widget="spinner",
spacer="spacer",
status_bar="status",
input_rule_top="top-rule",
image_bar="image-bar",
input_area="input-area",
input_rule_bot="bottom-rule",
voice_status_bar="voice-status",
completions_menu="completions-menu",
)
# Extra widgets should appear between spacer and status bar
spacer_idx = children.index("spacer")
status_idx = children.index("status")
assert children[spacer_idx + 1] == "radio-menu"
assert children[spacer_idx + 2] == "mini-player"
assert children[spacer_idx + 3] == "status"
assert status_idx == spacer_idx + 3
def test_extra_keybindings_can_add_bindings(self):
cli = _make_cli()
kb = KeyBindings()
def _custom_hook(kb, *, input_area):
@kb.add("f2")
def _toggle(event):
return None
cli._register_extra_tui_keybindings = _custom_hook
cli._register_extra_tui_keybindings(kb, input_area=None)
assert len(kb.bindings) == 1

View File

@@ -67,7 +67,6 @@ class TestPluginDiscovery:
project_dir = tmp_path / "project"
project_dir.mkdir()
monkeypatch.chdir(project_dir)
monkeypatch.setenv("HERMES_ENABLE_PROJECT_PLUGINS", "true")
plugins_dir = project_dir / ".hermes" / "plugins"
_make_plugin_dir(plugins_dir, "proj_plugin")
@@ -77,19 +76,6 @@ class TestPluginDiscovery:
assert "proj_plugin" in mgr._plugins
assert mgr._plugins["proj_plugin"].enabled
def test_discover_project_plugins_skipped_by_default(self, tmp_path, monkeypatch):
"""Project plugins are not discovered unless explicitly enabled."""
project_dir = tmp_path / "project"
project_dir.mkdir()
monkeypatch.chdir(project_dir)
plugins_dir = project_dir / ".hermes" / "plugins"
_make_plugin_dir(plugins_dir, "proj_plugin")
mgr = PluginManager()
mgr.discover_and_load()
assert "proj_plugin" not in mgr._plugins
def test_discover_is_idempotent(self, tmp_path, monkeypatch):
"""Calling discover_and_load() twice does not duplicate plugins."""
plugins_dir = tmp_path / "hermes_test" / "plugins"

View File

@@ -355,27 +355,24 @@ def resolve_toolset(name: str, visited: Set[str] = None) -> List[str]:
all_tools.update(resolved)
return list(all_tools)
# Check for cycles / already-resolved (diamond deps).
# Silently return [] — either this is a diamond (not a bug, tools already
# collected via another path) or a genuine cycle (safe to skip).
# Check for cycles
if name in visited:
print(f"⚠️ Circular dependency detected in toolset '{name}'")
return []
visited.add(name)
# Get toolset definition
toolset = TOOLSETS.get(name)
if not toolset:
return []
# Collect direct tools
tools = set(toolset.get("tools", []))
# Recursively resolve included toolsets, sharing the visited set across
# sibling includes so diamond dependencies are only resolved once and
# cycle warnings don't fire multiple times for the same cycle.
# Recursively resolve included toolsets
for included_name in toolset.get("includes", []):
included_tools = resolve_toolset(included_name, visited)
included_tools = resolve_toolset(included_name, visited.copy())
tools.update(included_tools)
return list(tools)

View File

@@ -1,196 +0,0 @@
---
sidebar_position: 8
title: "Extending the CLI"
description: "Build wrapper CLIs that extend the Hermes TUI with custom widgets, keybindings, and layout changes"
---
# Extending the CLI
Hermes exposes protected extension hooks on `HermesCLI` so wrapper CLIs can add widgets, keybindings, and layout customizations without overriding the 1000+ line `run()` method. This keeps your extension decoupled from internal changes.
## Extension points
There are five extension seams available:
| Hook | Purpose | Override when... |
|------|---------|------------------|
| `_get_extra_tui_widgets()` | Inject widgets into the layout | You need a persistent UI element (panel, status line, mini-player) |
| `_register_extra_tui_keybindings(kb, *, input_area)` | Add keyboard shortcuts | You need hotkeys (toggle panels, transport controls, modal shortcuts) |
| `_build_tui_layout_children(**widgets)` | Full control over widget ordering | You need to reorder or wrap existing widgets (rare) |
| `process_command()` | Add custom slash commands | You need `/mycommand` handling (pre-existing hook) |
| `_build_tui_style_dict()` | Custom prompt_toolkit styles | You need custom colors or styling (pre-existing hook) |
The first three are new protected hooks. The last two already existed.
## Quick start: a wrapper CLI
```python
#!/usr/bin/env python3
"""my_cli.py — Example wrapper CLI that extends Hermes."""
from cli import HermesCLI
from prompt_toolkit.layout import FormattedTextControl, Window
from prompt_toolkit.filters import Condition
class MyCLI(HermesCLI):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._panel_visible = False
def _get_extra_tui_widgets(self):
"""Add a toggleable info panel above the status bar."""
cli_ref = self
return [
Window(
FormattedTextControl(lambda: "📊 My custom panel content"),
height=1,
filter=Condition(lambda: cli_ref._panel_visible),
),
]
def _register_extra_tui_keybindings(self, kb, *, input_area):
"""F2 toggles the custom panel."""
cli_ref = self
@kb.add("f2")
def _toggle_panel(event):
cli_ref._panel_visible = not cli_ref._panel_visible
def process_command(self, cmd: str) -> bool:
"""Add a /panel slash command."""
if cmd.strip().lower() == "/panel":
self._panel_visible = not self._panel_visible
state = "visible" if self._panel_visible else "hidden"
print(f"Panel is now {state}")
return True
return super().process_command(cmd)
if __name__ == "__main__":
cli = MyCLI()
cli.run()
```
Run it:
```bash
cd ~/.hermes/hermes-agent
source .venv/bin/activate
python my_cli.py
```
## Hook reference
### `_get_extra_tui_widgets()`
Returns a list of prompt_toolkit widgets to insert into the TUI layout. Widgets appear **between the spacer and the status bar** — above the input area but below the main output.
```python
def _get_extra_tui_widgets(self) -> list:
return [] # default: no extra widgets
```
Each widget should be a prompt_toolkit container (e.g., `Window`, `ConditionalContainer`, `HSplit`). Use `ConditionalContainer` or `filter=Condition(...)` to make widgets toggleable.
```python
from prompt_toolkit.layout import ConditionalContainer, Window, FormattedTextControl
from prompt_toolkit.filters import Condition
def _get_extra_tui_widgets(self):
return [
ConditionalContainer(
Window(FormattedTextControl("Status: connected"), height=1),
filter=Condition(lambda: self._show_status),
),
]
```
### `_register_extra_tui_keybindings(kb, *, input_area)`
Called after Hermes registers its own keybindings and before the layout is built. Add your keybindings to `kb`.
```python
def _register_extra_tui_keybindings(self, kb, *, input_area):
pass # default: no extra keybindings
```
Parameters:
- **`kb`** — The `KeyBindings` instance for the prompt_toolkit application
- **`input_area`** — The main `TextArea` widget, if you need to read or manipulate user input
```python
def _register_extra_tui_keybindings(self, kb, *, input_area):
cli_ref = self
@kb.add("f3")
def _clear_input(event):
input_area.text = ""
@kb.add("f4")
def _insert_template(event):
input_area.text = "/search "
```
**Avoid conflicts** with built-in keybindings: `Enter` (submit), `Escape Enter` (newline), `Ctrl-C` (interrupt), `Ctrl-D` (exit), `Tab` (auto-suggest accept). Function keys F2+ and Ctrl-combinations are generally safe.
### `_build_tui_layout_children(**widgets)`
Override this only when you need full control over widget ordering. Most extensions should use `_get_extra_tui_widgets()` instead.
```python
def _build_tui_layout_children(self, *, sudo_widget, secret_widget,
approval_widget, clarify_widget, spinner_widget, spacer,
status_bar, input_rule_top, image_bar, input_area,
input_rule_bot, voice_status_bar, completions_menu) -> list:
```
The default implementation returns:
```python
[
Window(height=0), # anchor
sudo_widget, # sudo password prompt (conditional)
secret_widget, # secret input prompt (conditional)
approval_widget, # dangerous command approval (conditional)
clarify_widget, # clarify question UI (conditional)
spinner_widget, # thinking spinner (conditional)
spacer, # fills remaining vertical space
*self._get_extra_tui_widgets(), # YOUR WIDGETS GO HERE
status_bar, # model/token/context status line
input_rule_top, # ─── border above input
image_bar, # attached images indicator
input_area, # user text input
input_rule_bot, # ─── border below input
voice_status_bar, # voice mode status (conditional)
completions_menu, # autocomplete dropdown
]
```
## Layout diagram
```
┌─────────────────────────────────────────┐
│ (output scrolls here) │
│ │
│ spacer ────────│
│ ★ Your extra widgets appear here ★ │
├─────────────────────────────────────────┤
│ ⚕ claude-sonnet-4 · 42% · 2m status │
├─────────────────────────────────────────┤
│ 📎 2 images image bar│
your input here input area │
├─────────────────────────────────────────┤
│ 🎤 Voice mode: listening voice status │
│ ▸ completions... autocomplete │
└─────────────────────────────────────────┘
```
## Tips
- **Invalidate the display** after state changes: call `self._invalidate()` to trigger a prompt_toolkit redraw.
- **Access agent state**: `self.agent`, `self.model`, `self.conversation_history` are all available.
- **Custom styles**: Override `_build_tui_style_dict()` and add entries for your custom style classes.
- **Slash commands**: Override `process_command()`, handle your commands, and call `super().process_command(cmd)` for everything else.
- **Don't override `run()`** unless absolutely necessary — the extension hooks exist specifically to avoid that coupling.

View File

@@ -232,7 +232,6 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
| `HERMES_QUIET` | Suppress non-essential output (`true`/`false`) |
| `HERMES_API_TIMEOUT` | LLM API call timeout in seconds (default: `900`) |
| `HERMES_EXEC_ASK` | Enable execution approval prompts in gateway mode (`true`/`false`) |
| `HERMES_ENABLE_PROJECT_PLUGINS` | Enable auto-discovery of repo-local plugins from `./.hermes/plugins/` (`true`/`false`, default: `false`) |
| `HERMES_BACKGROUND_NOTIFICATIONS` | Background process notification mode in gateway: `all` (default), `result`, `error`, `off` |
| `HERMES_EPHEMERAL_SYSTEM_PROMPT` | Ephemeral system prompt injected at API-call time (never persisted to sessions) |

View File

@@ -22,8 +22,6 @@ Drop a directory into `~/.hermes/plugins/` with a `plugin.yaml` and Python code:
Start Hermes — your tools appear alongside built-in tools. The model can call them immediately.
Project-local plugins under `./.hermes/plugins/` are disabled by default. Enable them only for trusted repositories by setting `HERMES_ENABLE_PROJECT_PLUGINS=true` before starting Hermes.
## What plugins can do
| Capability | How |
@@ -40,7 +38,7 @@ Project-local plugins under `./.hermes/plugins/` are disabled by default. Enable
| Source | Path | Use case |
|--------|------|----------|
| User | `~/.hermes/plugins/` | Personal plugins |
| Project | `.hermes/plugins/` | Project-specific plugins (requires `HERMES_ENABLE_PROJECT_PLUGINS=true`) |
| Project | `.hermes/plugins/` | Project-specific plugins |
| pip | `hermes_agent.plugins` entry_points | Distributed packages |
## Available hooks

View File

@@ -129,7 +129,6 @@ const sidebars: SidebarsConfig = {
'developer-guide/environments',
'developer-guide/adding-tools',
'developer-guide/creating-skills',
'developer-guide/extending-the-cli',
'developer-guide/contributing',
],
},