Compare commits

..

1 Commits

Author SHA1 Message Date
kshitijk4poor b277962dcc refactor: extract codex_responses logic into dedicated adapter
Move 10 Responses API format-conversion and normalization functions from
run_agent.py into agent/codex_responses_adapter.py. All functions are now
stateless module-level functions that make no reference to `self`.

The AIAgent methods remain as thin one-line wrappers that delegate to the
adapter, so all callers (tests, gateway, CLI) are unchanged.

Functions extracted:
- _deterministic_call_id: deterministic tool call ID generation
- _split_responses_tool_id: composite ID splitting
- _derive_responses_function_call_id: call_ to fc_ prefix conversion
- _responses_tools: chat completions tool schema → Responses format
- _chat_messages_to_responses_input: message format conversion
- _preflight_codex_input_items: input item normalization
- _preflight_codex_api_kwargs: API kwargs validation/cleaning
- _extract_responses_message_text: text extraction from response items
- _extract_responses_reasoning_text: reasoning extraction
- _normalize_codex_response: full response normalization

This brings codex_responses in line with anthropic_adapter.py and
bedrock_adapter.py which already have their own adapter files.

run_agent.py: 12410 → 11845 lines (-565 net)
2026-04-20 16:32:43 +05:30
74 changed files with 968 additions and 5549 deletions
+2
View File
@@ -27,10 +27,12 @@ WORKDIR /opt/hermes
# Copy only package manifests first so npm install + Playwright are cached
# unless the lockfiles themselves change.
COPY package.json package-lock.json ./
COPY scripts/whatsapp-bridge/package.json scripts/whatsapp-bridge/package-lock.json scripts/whatsapp-bridge/
COPY web/package.json web/package-lock.json web/
RUN npm install --prefer-offline --no-audit && \
npx playwright install --with-deps chromium --only-shell && \
(cd scripts/whatsapp-bridge && npm install --prefer-offline --no-audit) && \
(cd web && npm install --prefer-offline --no-audit) && \
npm cache clean --force
-39
View File
@@ -1525,42 +1525,3 @@ def normalize_anthropic_response(
),
finish_reason,
)
def normalize_anthropic_response_v2(
    response,
    strip_tool_prefix: bool = False,
) -> "NormalizedResponse":
    """Re-pack ``normalize_anthropic_response()`` output as a ``NormalizedResponse``.

    All parsing is delegated to the existing normalizer; this wrapper only maps
    its ``(assistant_message, finish_reason)`` result onto the shared transport
    types. That lets call sites migrate one at a time while the original
    function stays untouched.

    Args:
        response: Raw provider response, forwarded unchanged.
        strip_tool_prefix: Forwarded unchanged to the wrapped normalizer.

    Returns:
        A ``NormalizedResponse`` whose ``usage`` is always ``None`` and whose
        ``provider_data`` carries ``reasoning_details`` when the assistant
        message has a truthy value for it.
    """
    # Function-scope import, as in the original.
    from agent.transports.types import NormalizedResponse, build_tool_call

    message, stop_reason = normalize_anthropic_response(response, strip_tool_prefix)

    # Stay at None (not an empty list) when the message has no tool calls.
    converted_calls = None
    if message.tool_calls:
        converted_calls = []
        for call in message.tool_calls:
            converted_calls.append(
                build_tool_call(
                    id=call.id,
                    name=call.function.name,
                    arguments=call.function.arguments,
                )
            )

    details = getattr(message, "reasoning_details", None)
    extras = {"reasoning_details": details} if details else None

    return NormalizedResponse(
        content=message.content,
        tool_calls=converted_calls,
        finish_reason=stop_reason,
        reasoning=getattr(message, "reasoning", None),
        usage=None,  # Anthropic usage is on the raw response, not the normaliser
        provider_data=extras,
    )
+650
View File
@@ -0,0 +1,650 @@
"""Codex Responses API adapter.
Pure format-conversion and normalization logic for the OpenAI Responses API
(used by OpenAI Codex, xAI, GitHub Models, and other Responses-compatible endpoints).
Extracted from run_agent.py to isolate Responses API-specific logic from the
core agent loop. All functions are stateless — they operate on the data passed
in and return transformed results.
"""
from __future__ import annotations
import hashlib
import json
import logging
import re
import uuid
from types import SimpleNamespace
from typing import Any, Dict, List, Optional
from agent.prompt_builder import DEFAULT_AGENT_IDENTITY
logger = logging.getLogger(__name__)
def _deterministic_call_id(fn_name: str, arguments: str, index: int = 0) -> str:
    """Derive a stable ``call_…`` id from a tool call's own content.

    Serves as the fallback when the API supplies no call_id. The id is a pure
    function of ``fn_name``, ``arguments`` and ``index``; random UUIDs would
    make every request prefix unique and defeat OpenAI's prompt cache.

    Returns:
        ``"call_"`` followed by the first 12 hex digits of a SHA-256 digest.
    """
    seed_bytes = f"{fn_name}:{arguments}:{index}".encode("utf-8", errors="replace")
    digest = hashlib.sha256(seed_bytes).hexdigest()
    return f"call_{digest[:12]}"
def _split_responses_tool_id(raw_id: Any) -> tuple[Optional[str], Optional[str]]:
    """Break a stored tool id into ``(call_id, response_item_id)``.

    * Non-strings and blank strings yield ``(None, None)``.
    * ``"<call>|<item>"`` is split on the first ``|``; each half is trimmed
      and becomes ``None`` when empty.
    * A bare value starting with ``fc_`` is a response item id only.
    * Any other bare value is a call id only.
    """
    cleaned = raw_id.strip() if isinstance(raw_id, str) else ""
    if not cleaned:
        return None, None

    head, separator, tail = cleaned.partition("|")
    if separator:
        return head.strip() or None, tail.strip() or None

    if cleaned.startswith("fc_"):
        return None, cleaned
    return cleaned, None
def _derive_responses_function_call_id(
    call_id: str,
    response_item_id: Optional[str] = None,
) -> str:
    """Return an id usable as a Responses ``function_call.id`` (``fc_`` prefix).

    Resolution order:
      1. ``response_item_id`` itself, when it is a string that starts with
         ``fc_`` after trimming.
      2. The trimmed ``call_id``, then the same value with every character
         outside ``[A-Za-z0-9_-]`` removed: returned unchanged if it starts
         with ``fc_``, or with a leading ``call_`` swapped for ``fc_``.
      3. ``fc_`` plus the first 48 sanitized characters of ``call_id``.
      4. ``fc_`` plus 24 hex digits of a SHA-1 over the remaining seed text;
         a random UUID is the seed only when both inputs are empty.
    """
    if isinstance(response_item_id, str):
        trimmed_item_id = response_item_id.strip()
        if trimmed_item_id.startswith("fc_"):
            return trimmed_item_id

    call_prefix = "call_"
    raw = (call_id or "").strip()
    cleaned = re.sub(r"[^A-Za-z0-9_-]", "", raw)

    # The raw value is tried before its sanitized form, so an id that already
    # carries a usable prefix is passed through without stripping characters.
    for candidate in (raw, cleaned):
        if candidate.startswith("fc_"):
            return candidate
        if candidate.startswith(call_prefix) and len(candidate) > len(call_prefix):
            return f"fc_{candidate[len(call_prefix):]}"

    if cleaned:
        return f"fc_{cleaned[:48]}"

    seed = raw or str(response_item_id or "") or uuid.uuid4().hex
    digest = hashlib.sha1(seed.encode("utf-8")).hexdigest()
    return f"fc_{digest[:24]}"
def _responses_tools(tools: Optional[List[Dict[str, Any]]] = None) -> Optional[List[Dict[str, Any]]]:
    """Translate chat-completions tool schemas into Responses function tools.

    Each usable entry's nested ``function`` mapping is flattened into a
    top-level ``{"type": "function", ...}`` dict with ``strict`` fixed to
    ``False``. Entries that are not dicts, or whose function name is missing
    or blank, are dropped.

    Returns:
        The converted list, or ``None`` when ``tools`` is empty/``None`` or no
        entry survives conversion.
    """
    if not tools:
        return None

    result: List[Dict[str, Any]] = []
    for entry in tools:
        spec = entry.get("function", {}) if isinstance(entry, dict) else {}
        tool_name = spec.get("name")
        if isinstance(tool_name, str) and tool_name.strip():
            result.append(
                {
                    "type": "function",
                    "name": tool_name,
                    "description": spec.get("description", ""),
                    "strict": False,
                    # Tools without a schema get an empty object schema.
                    "parameters": spec.get("parameters", {"type": "object", "properties": {}}),
                }
            )
    return result if result else None
def _chat_messages_to_responses_input(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Flatten internal chat-style messages into Responses API input items.

    * Non-dict entries, ``system`` messages and unrecognized roles are skipped.
    * ``user`` messages become ``{"role", "content"}`` items.
    * ``assistant`` messages emit, in order: replayed encrypted reasoning
      items, the visible text (if any), then one ``function_call`` item per
      usable tool call.
    * ``tool`` messages become ``function_call_output`` items; they are
      dropped when no call id can be recovered.
    """
    out: List[Dict[str, Any]] = []
    replayed_reasoning_ids: set = set()

    for message in messages:
        if not isinstance(message, dict):
            continue

        role = message.get("role")
        if role == "system":
            continue

        if role in {"user", "assistant"}:
            raw_content = message.get("content", "")
            text = "" if raw_content is None else str(raw_content)

            if role != "assistant":
                out.append({"role": role, "content": text})
                continue

            # Replay encrypted reasoning items from previous turns so the API
            # can maintain coherent reasoning chains.
            reasoning_replayed = False
            stored_reasoning = message.get("codex_reasoning_items")
            if isinstance(stored_reasoning, list):
                for entry in stored_reasoning:
                    if not (isinstance(entry, dict) and entry.get("encrypted_content")):
                        continue
                    entry_id = entry.get("id")
                    if entry_id and entry_id in replayed_reasoning_ids:
                        continue
                    # Drop the "id" field: with store=False the Responses API
                    # cannot look items up by ID and returns 404. The
                    # encrypted_content blob is self-contained.
                    out.append({key: value for key, value in entry.items() if key != "id"})
                    if entry_id:
                        replayed_reasoning_ids.add(entry_id)
                    reasoning_replayed = True

            if text.strip():
                out.append({"role": "assistant", "content": text})
            elif reasoning_replayed:
                # The Responses API requires an item after each reasoning item
                # (otherwise: missing_following_item error). With no visible
                # content, an empty assistant message fills that slot.
                out.append({"role": "assistant", "content": ""})

            calls = message.get("tool_calls")
            if not isinstance(calls, list):
                continue

            for call in calls:
                if not isinstance(call, dict):
                    continue
                fn_spec = call.get("function", {})
                name = fn_spec.get("name")
                if not isinstance(name, str) or not name.strip():
                    continue

                split_call_id, split_item_id = _split_responses_tool_id(call.get("id"))

                # Call-id preference: explicit "call_id" field, then the call
                # half of the stored composite id, then one rebuilt from the
                # fc_ item id, and finally a deterministic hash.
                resolved = call.get("call_id")
                if not (isinstance(resolved, str) and resolved.strip()):
                    resolved = split_call_id
                if not (isinstance(resolved, str) and resolved.strip()):
                    if (
                        isinstance(split_item_id, str)
                        and split_item_id.startswith("fc_")
                        and len(split_item_id) > len("fc_")
                    ):
                        resolved = f"call_{split_item_id[len('fc_'):]}"
                    else:
                        resolved = _deterministic_call_id(
                            name, str(fn_spec.get("arguments", "{}")), len(out)
                        )
                resolved = resolved.strip()

                payload = fn_spec.get("arguments", "{}")
                if isinstance(payload, dict):
                    payload = json.dumps(payload, ensure_ascii=False)
                elif not isinstance(payload, str):
                    payload = str(payload)
                payload = payload.strip() or "{}"

                out.append(
                    {
                        "type": "function_call",
                        "call_id": resolved,
                        "name": name,
                        "arguments": payload,
                    }
                )
            continue

        if role == "tool":
            stored_id = message.get("tool_call_id")
            result_call_id, _ = _split_responses_tool_id(stored_id)
            if not (isinstance(result_call_id, str) and result_call_id.strip()):
                # Fall back to the raw stored value, e.g. a bare fc_ id.
                if isinstance(stored_id, str) and stored_id.strip():
                    result_call_id = stored_id.strip()
            if not (isinstance(result_call_id, str) and result_call_id.strip()):
                continue
            out.append(
                {
                    "type": "function_call_output",
                    "call_id": result_call_id,
                    "output": str(message.get("content", "") or ""),
                }
            )

    return out
def _preflight_codex_input_items(raw_items: Any) -> List[Dict[str, Any]]:
    """Validate Responses input items and return a cleaned copy.

    Accepted shapes, checked in this order:
      * ``type == "function_call"`` — needs non-blank ``call_id`` and ``name``;
        ``arguments`` is coerced to a non-empty string (``"{}"`` by default).
      * ``type == "function_call_output"`` — needs a non-blank ``call_id``;
        ``output`` is coerced to a string.
      * ``type == "reasoning"`` — kept only when ``encrypted_content`` is a
        non-empty string; repeats of an already-seen ``id`` are dropped and
        the ``id`` is never forwarded.
      * ``role`` of ``"user"`` / ``"assistant"`` — ``content`` coerced to a
        string.

    Raises:
        ValueError: If ``raw_items`` is not a list, an entry is not a dict, a
            required field is missing, or an entry matches no accepted shape.
    """
    if not isinstance(raw_items, list):
        raise ValueError("Codex Responses input must be a list of input items.")

    cleaned: List[Dict[str, Any]] = []
    forwarded_reasoning_ids: set = set()

    for idx, entry in enumerate(raw_items):
        if not isinstance(entry, dict):
            raise ValueError(f"Codex Responses input[{idx}] must be an object.")

        item_type = entry.get("type")

        if item_type == "function_call":
            call_id = entry.get("call_id")
            name = entry.get("name")
            if not (isinstance(call_id, str) and call_id.strip()):
                raise ValueError(f"Codex Responses input[{idx}] function_call is missing call_id.")
            if not (isinstance(name, str) and name.strip()):
                raise ValueError(f"Codex Responses input[{idx}] function_call is missing name.")

            payload = entry.get("arguments", "{}")
            if isinstance(payload, dict):
                payload = json.dumps(payload, ensure_ascii=False)
            elif not isinstance(payload, str):
                payload = str(payload)

            cleaned.append(
                {
                    "type": "function_call",
                    "call_id": call_id.strip(),
                    "name": name.strip(),
                    "arguments": payload.strip() or "{}",
                }
            )
            continue

        if item_type == "function_call_output":
            call_id = entry.get("call_id")
            if not (isinstance(call_id, str) and call_id.strip()):
                raise ValueError(f"Codex Responses input[{idx}] function_call_output is missing call_id.")

            result = entry.get("output", "")
            if result is None:
                result = ""
            elif not isinstance(result, str):
                result = str(result)

            cleaned.append(
                {
                    "type": "function_call_output",
                    "call_id": call_id.strip(),
                    "output": result,
                }
            )
            continue

        if item_type == "reasoning":
            blob = entry.get("encrypted_content")
            # Reasoning items without an encrypted payload are dropped silently.
            if not (isinstance(blob, str) and blob):
                continue

            reasoning_id = entry.get("id")
            if isinstance(reasoning_id, str) and reasoning_id:
                if reasoning_id in forwarded_reasoning_ids:
                    continue
                forwarded_reasoning_ids.add(reasoning_id)

            # Do NOT include the "id" in the outgoing item — with store=False
            # (our default) the API tries to resolve the id server-side and
            # returns 404. The id is only used above for local deduplication.
            summary = entry.get("summary")
            cleaned.append(
                {
                    "type": "reasoning",
                    "encrypted_content": blob,
                    "summary": summary if isinstance(summary, list) else [],
                }
            )
            continue

        role = entry.get("role")
        if role in {"user", "assistant"}:
            text = entry.get("content", "")
            if text is None:
                text = ""
            elif not isinstance(text, str):
                text = str(text)
            cleaned.append({"role": role, "content": text})
            continue

        raise ValueError(
            f"Codex Responses input[{idx}] has unsupported item shape (type={item_type!r}, role={role!r})."
        )

    return cleaned
def _preflight_codex_api_kwargs(
    api_kwargs: Any,
    *,
    allow_stream: bool = False,
) -> Dict[str, Any]:
    """Validate a Responses request dict and return a cleaned copy.

    The result always contains ``model``, ``instructions``, ``input`` and
    ``store`` (forced to ``False``). Optional fields are copied only when they
    have the expected type; ``tools`` and ``extra_headers`` are re-built entry
    by entry. Blank instructions fall back to ``DEFAULT_AGENT_IDENTITY``.

    Args:
        api_kwargs: Candidate request keyword arguments.
        allow_stream: When true, a ``stream`` key is tolerated and forwarded if
            it is exactly ``True``; otherwise its mere presence is an error.

    Raises:
        ValueError: On a non-dict request, missing required fields, malformed
            ``model`` / ``tools`` / ``extra_headers`` / ``stream`` values, a
            ``store`` value other than ``False``, or any unrecognized key.
    """
    if not isinstance(api_kwargs, dict):
        raise ValueError("Codex Responses request must be a dict.")

    missing = [key for key in ("input", "instructions", "model") if key not in api_kwargs]
    if missing:
        raise ValueError(f"Codex Responses request missing required field(s): {', '.join(sorted(missing))}.")

    model = api_kwargs.get("model")
    if not (isinstance(model, str) and model.strip()):
        raise ValueError("Codex Responses request 'model' must be a non-empty string.")

    instructions = api_kwargs.get("instructions")
    if instructions is None:
        instructions = ""
    elif not isinstance(instructions, str):
        instructions = str(instructions)

    request: Dict[str, Any] = {
        "model": model.strip(),
        "instructions": instructions.strip() or DEFAULT_AGENT_IDENTITY,
        "input": _preflight_codex_input_items(api_kwargs.get("input")),
        "store": False,
    }

    tools = api_kwargs.get("tools")
    normalized_tools = None
    if tools is not None:
        if not isinstance(tools, list):
            raise ValueError("Codex Responses request 'tools' must be a list when provided.")
        normalized_tools = []
        for idx, tool in enumerate(tools):
            if not isinstance(tool, dict):
                raise ValueError(f"Codex Responses tools[{idx}] must be an object.")
            if tool.get("type") != "function":
                raise ValueError(f"Codex Responses tools[{idx}] has unsupported type {tool.get('type')!r}.")

            tool_name = tool.get("name")
            schema = tool.get("parameters")
            if not (isinstance(tool_name, str) and tool_name.strip()):
                raise ValueError(f"Codex Responses tools[{idx}] is missing a valid name.")
            if not isinstance(schema, dict):
                raise ValueError(f"Codex Responses tools[{idx}] is missing valid parameters.")

            summary = tool.get("description", "")
            if summary is None:
                summary = ""
            elif not isinstance(summary, str):
                summary = str(summary)

            normalized_tools.append(
                {
                    "type": "function",
                    "name": tool_name.strip(),
                    "description": summary,
                    "strict": bool(tool.get("strict", False)),
                    "parameters": schema,
                }
            )

    if api_kwargs.get("store", False) is not False:
        raise ValueError("Codex Responses contract requires 'store' to be false.")

    allowed_keys = {
        "model", "instructions", "input", "tools", "store",
        "reasoning", "include", "max_output_tokens", "temperature",
        "tool_choice", "parallel_tool_calls", "prompt_cache_key", "service_tier",
        "extra_headers",
    }

    if normalized_tools is not None:
        request["tools"] = normalized_tools

    # Reasoning config and include list pass through only when well-typed.
    for typed_key, expected_type in (("reasoning", dict), ("include", list)):
        typed_value = api_kwargs.get(typed_key)
        if isinstance(typed_value, expected_type):
            request[typed_key] = typed_value

    tier = api_kwargs.get("service_tier")
    if isinstance(tier, str) and tier.strip():
        request["service_tier"] = tier.strip()

    token_cap = api_kwargs.get("max_output_tokens")
    if isinstance(token_cap, (int, float)) and token_cap > 0:
        request["max_output_tokens"] = int(token_cap)

    temp = api_kwargs.get("temperature")
    if isinstance(temp, (int, float)):
        request["temperature"] = float(temp)

    # These are forwarded untouched whenever they are set.
    for passthrough_key in ("tool_choice", "parallel_tool_calls", "prompt_cache_key"):
        passthrough_value = api_kwargs.get(passthrough_key)
        if passthrough_value is not None:
            request[passthrough_key] = passthrough_value

    extra_headers = api_kwargs.get("extra_headers")
    if extra_headers is not None:
        if not isinstance(extra_headers, dict):
            raise ValueError("Codex Responses request 'extra_headers' must be an object.")
        clean_headers: Dict[str, str] = {}
        for header_name, header_value in extra_headers.items():
            if not (isinstance(header_name, str) and header_name.strip()):
                raise ValueError("Codex Responses request 'extra_headers' keys must be non-empty strings.")
            if header_value is not None:
                clean_headers[header_name.strip()] = str(header_value)
        if clean_headers:
            request["extra_headers"] = clean_headers

    if allow_stream:
        stream_flag = api_kwargs.get("stream")
        if stream_flag is not None and stream_flag is not True:
            raise ValueError("Codex Responses 'stream' must be true when set.")
        if stream_flag is True:
            request["stream"] = True
        # Tolerate the key itself (even when None) once streaming is allowed.
        allowed_keys.add("stream")
    elif "stream" in api_kwargs:
        raise ValueError("Codex Responses stream flag is only allowed in fallback streaming requests.")

    unexpected = sorted(key for key in api_kwargs if key not in allowed_keys)
    if unexpected:
        raise ValueError(
            f"Codex Responses request has unsupported field(s): {', '.join(unexpected)}."
        )

    return request
def _extract_responses_message_text(item: Any) -> str:
    """Concatenate the visible text of a Responses ``message`` output item.

    Only content parts whose ``type`` is ``"output_text"`` or ``"text"`` and
    whose ``text`` is a non-empty string contribute. Fragments are joined
    without a separator and the result is trimmed. Returns ``""`` when
    ``item.content`` is not a list.
    """
    parts = getattr(item, "content", None)
    if not isinstance(parts, list):
        return ""

    candidate_texts = (
        getattr(part, "text", None)
        for part in parts
        if getattr(part, "type", None) in {"output_text", "text"}
    )
    return "".join(
        fragment for fragment in candidate_texts if isinstance(fragment, str) and fragment
    ).strip()
def _extract_responses_reasoning_text(item: Any) -> str:
    """Pull a compact reasoning text out of a Responses ``reasoning`` item.

    Prefers the non-empty ``text`` of each ``summary`` part, newline-joined and
    trimmed. If the summary yields nothing, falls back to the item's own
    ``text`` attribute (trimmed); otherwise returns ``""``.
    """
    summary_parts = getattr(item, "summary", None)
    if isinstance(summary_parts, list):
        summary_texts = [
            fragment
            for fragment in (getattr(part, "text", None) for part in summary_parts)
            if isinstance(fragment, str) and fragment
        ]
        if summary_texts:
            return "\n".join(summary_texts).strip()

    direct_text = getattr(item, "text", None)
    return direct_text.strip() if isinstance(direct_text, str) and direct_text else ""
def _codex_tool_call_from_item(item: Any, arguments_attr: str, position: int) -> SimpleNamespace:
    """Convert one Responses tool-call output item into a chat-style tool call.

    Args:
        item: A ``function_call`` or ``custom_tool_call`` output item.
        arguments_attr: Attribute carrying the call payload (``"arguments"``
            for function calls, ``"input"`` for custom tool calls).
        position: Index used to seed the deterministic fallback call id.
    """
    fn_name = getattr(item, "name", "") or ""
    arguments = getattr(item, arguments_attr, "{}")
    if not isinstance(arguments, str):
        arguments = json.dumps(arguments, ensure_ascii=False)

    raw_call_id = getattr(item, "call_id", None)
    raw_item_id = getattr(item, "id", None)
    embedded_call_id, _ = _split_responses_tool_id(raw_item_id)

    # Prefer the explicit call_id, then one embedded in the item id, and only
    # then a content-derived id.
    call_id = raw_call_id if isinstance(raw_call_id, str) and raw_call_id.strip() else embedded_call_id
    if not isinstance(call_id, str) or not call_id.strip():
        call_id = _deterministic_call_id(fn_name, arguments, position)
    call_id = call_id.strip()

    response_item_id = _derive_responses_function_call_id(
        call_id,
        raw_item_id if isinstance(raw_item_id, str) else None,
    )
    return SimpleNamespace(
        id=call_id,
        call_id=call_id,
        response_item_id=response_item_id,
        type="function",
        function=SimpleNamespace(name=fn_name, arguments=arguments),
    )


def _normalize_codex_response(response: Any) -> tuple[Any, str]:
    """Normalize a Responses API object to an assistant_message-like object.

    Returns:
        ``(assistant_message, finish_reason)``. ``assistant_message`` is a
        ``SimpleNamespace`` with ``content``, ``tool_calls``, ``reasoning``,
        ``reasoning_content``, ``reasoning_details`` and
        ``codex_reasoning_items``. ``finish_reason`` is one of
        ``"tool_calls"``, ``"incomplete"`` or ``"stop"``.

    Raises:
        RuntimeError: If the response has no output items and no
            ``output_text``, or if its status is ``failed`` / ``cancelled``.
            The message is the API's own error message when one is available,
            whether ``response.error`` is a dict or an object exposing
            ``.message``.
    """
    pending_statuses = {"queued", "in_progress", "incomplete"}

    output = getattr(response, "output", None)
    if not isinstance(output, list) or not output:
        # The Codex backend can return empty output when the answer was
        # delivered entirely via stream events. Check output_text as a
        # last-resort fallback before raising.
        out_text = getattr(response, "output_text", None)
        if isinstance(out_text, str) and out_text.strip():
            logger.debug(
                "Codex response has empty output but output_text is present (%d chars); "
                "synthesizing output item.", len(out_text.strip()),
            )
            output = [SimpleNamespace(
                type="message", role="assistant", status="completed",
                content=[SimpleNamespace(type="output_text", text=out_text.strip())],
            )]
            response.output = output
        else:
            raise RuntimeError("Responses API returned no output items")

    response_status = getattr(response, "status", None)
    response_status = response_status.strip().lower() if isinstance(response_status, str) else None

    if response_status in {"failed", "cancelled"}:
        error_obj = getattr(response, "error", None)
        if isinstance(error_obj, dict):
            error_msg = error_obj.get("message") or str(error_obj)
        else:
            # Error objects expose the human-readable text as ``.message``;
            # prefer it over the object's repr.
            object_message = getattr(error_obj, "message", None)
            if isinstance(object_message, str) and object_message.strip():
                error_msg = object_message
            elif error_obj:
                error_msg = str(error_obj)
            else:
                error_msg = f"Responses API returned status '{response_status}'"
        raise RuntimeError(error_msg)

    content_parts: List[str] = []
    reasoning_parts: List[str] = []
    reasoning_items_raw: List[Dict[str, Any]] = []
    tool_calls: List[Any] = []
    has_incomplete_items = response_status in pending_statuses
    saw_commentary_phase = False
    saw_final_answer_phase = False

    for item in output:
        item_type = getattr(item, "type", None)
        item_status = getattr(item, "status", None)
        item_status = item_status.strip().lower() if isinstance(item_status, str) else None

        if item_status in pending_statuses:
            has_incomplete_items = True

        if item_type == "message":
            item_phase = getattr(item, "phase", None)
            if isinstance(item_phase, str):
                normalized_phase = item_phase.strip().lower()
                if normalized_phase in {"commentary", "analysis"}:
                    saw_commentary_phase = True
                elif normalized_phase in {"final_answer", "final"}:
                    saw_final_answer_phase = True
            message_text = _extract_responses_message_text(item)
            if message_text:
                content_parts.append(message_text)
        elif item_type == "reasoning":
            reasoning_text = _extract_responses_reasoning_text(item)
            if reasoning_text:
                reasoning_parts.append(reasoning_text)
            # Capture the full reasoning item for multi-turn continuity.
            # encrypted_content is an opaque blob the API needs back on
            # subsequent turns to maintain coherent reasoning chains.
            encrypted = getattr(item, "encrypted_content", None)
            if isinstance(encrypted, str) and encrypted:
                raw_item = {"type": "reasoning", "encrypted_content": encrypted}
                item_id = getattr(item, "id", None)
                if isinstance(item_id, str) and item_id:
                    raw_item["id"] = item_id
                # Capture summary — required by the API when replaying reasoning items
                summary = getattr(item, "summary", None)
                if isinstance(summary, list):
                    raw_item["summary"] = [
                        {"type": "summary_text", "text": part_text}
                        for part_text in (getattr(part, "text", None) for part in summary)
                        if isinstance(part_text, str)
                    ]
                reasoning_items_raw.append(raw_item)
        elif item_type == "function_call":
            # Function calls that are still pending are not surfaced.
            if item_status in pending_statuses:
                continue
            tool_calls.append(_codex_tool_call_from_item(item, "arguments", len(tool_calls)))
        elif item_type == "custom_tool_call":
            # Custom tool calls carry their payload in ``input`` and, as in
            # the original code, are not filtered by status.
            tool_calls.append(_codex_tool_call_from_item(item, "input", len(tool_calls)))

    final_text = "\n".join([p for p in content_parts if p]).strip()
    if not final_text and hasattr(response, "output_text"):
        out_text = getattr(response, "output_text", "")
        if isinstance(out_text, str):
            final_text = out_text.strip()

    assistant_message = SimpleNamespace(
        content=final_text,
        tool_calls=tool_calls,
        reasoning="\n\n".join(reasoning_parts).strip() if reasoning_parts else None,
        reasoning_content=None,
        reasoning_details=None,
        codex_reasoning_items=reasoning_items_raw or None,
    )

    if tool_calls:
        finish_reason = "tool_calls"
    elif has_incomplete_items or (saw_commentary_phase and not saw_final_answer_phase):
        finish_reason = "incomplete"
    elif reasoning_items_raw and not final_text:
        # Response contains only reasoning (encrypted thinking state) with
        # no visible content or tool calls. The model is still thinking and
        # needs another turn to produce the actual answer. Marking this as
        # "stop" would send it into the empty-content retry loop which burns
        # 3 retries then fails — treat it as incomplete instead so the Codex
        # continuation path handles it correctly.
        finish_reason = "incomplete"
    else:
        finish_reason = "stop"
    return assistant_message, finish_reason
-162
View File
@@ -124,7 +124,6 @@ class InsightsEngine:
# Gather raw data
sessions = self._get_sessions(cutoff, source)
tool_usage = self._get_tool_usage(cutoff, source)
skill_usage = self._get_skill_usage(cutoff, source)
message_stats = self._get_message_stats(cutoff, source)
if not sessions:
@@ -136,15 +135,6 @@ class InsightsEngine:
"models": [],
"platforms": [],
"tools": [],
"skills": {
"summary": {
"total_skill_loads": 0,
"total_skill_edits": 0,
"total_skill_actions": 0,
"distinct_skills_used": 0,
},
"top_skills": [],
},
"activity": {},
"top_sessions": [],
}
@@ -154,7 +144,6 @@ class InsightsEngine:
models = self._compute_model_breakdown(sessions)
platforms = self._compute_platform_breakdown(sessions)
tools = self._compute_tool_breakdown(tool_usage)
skills = self._compute_skill_breakdown(skill_usage)
activity = self._compute_activity_patterns(sessions)
top_sessions = self._compute_top_sessions(sessions)
@@ -167,7 +156,6 @@ class InsightsEngine:
"models": models,
"platforms": platforms,
"tools": tools,
"skills": skills,
"activity": activity,
"top_sessions": top_sessions,
}
@@ -296,82 +284,6 @@ class InsightsEngine:
for name, count in tool_counts.most_common()
]
def _get_skill_usage(self, cutoff: float, source: str = None) -> List[Dict]:
    """Tally ``skill_view`` / ``skill_manage`` tool calls per skill name.

    Reads assistant messages with non-NULL ``tool_calls`` from sessions
    started at or after ``cutoff`` (optionally limited to one ``source``) and
    inspects every tool call in them. Rows or calls that cannot be decoded
    are skipped.

    Returns:
        One dict per skill with ``skill``, ``view_count``, ``manage_count``
        and ``last_used_at`` (the largest message timestamp seen, or ``None``).
    """
    # Continuation lines of the SQL literals are kept flush-left so the query
    # text stays exactly as it was.
    if source:
        query = """SELECT m.tool_calls, m.timestamp
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ? AND s.source = ?
AND m.role = 'assistant' AND m.tool_calls IS NOT NULL"""
        params = (cutoff, source)
    else:
        query = """SELECT m.tool_calls, m.timestamp
FROM messages m
JOIN sessions s ON s.id = m.session_id
WHERE s.started_at >= ?
AND m.role = 'assistant' AND m.tool_calls IS NOT NULL"""
        params = (cutoff,)

    usage_by_skill: Dict[str, Dict[str, Any]] = {}

    for row in self._conn.execute(query, params).fetchall():
        try:
            calls = row["tool_calls"]
            if isinstance(calls, str):
                calls = json.loads(calls)
            if not isinstance(calls, list):
                continue
        except (json.JSONDecodeError, TypeError):
            continue

        seen_at = row["timestamp"]
        for call in calls:
            if not isinstance(call, dict):
                continue

            fn_spec = call.get("function", {})
            tool_name = fn_spec.get("name")
            if tool_name not in {"skill_view", "skill_manage"}:
                continue

            # Arguments may be stored as a JSON string or as a decoded object.
            payload = fn_spec.get("arguments")
            if isinstance(payload, str):
                try:
                    payload = json.loads(payload)
                except (json.JSONDecodeError, TypeError):
                    continue
            if not isinstance(payload, dict):
                continue

            skill_name = payload.get("name")
            if not (isinstance(skill_name, str) and skill_name.strip()):
                continue

            record = usage_by_skill.get(skill_name)
            if record is None:
                record = usage_by_skill[skill_name] = {
                    "skill": skill_name,
                    "view_count": 0,
                    "manage_count": 0,
                    "last_used_at": None,
                }

            counter_key = "view_count" if tool_name == "skill_view" else "manage_count"
            record[counter_key] += 1

            if seen_at is not None:
                previous = record["last_used_at"]
                if previous is None or seen_at > previous:
                    record["last_used_at"] = seen_at

    return list(usage_by_skill.values())
def _get_message_stats(self, cutoff: float, source: str = None) -> Dict:
"""Get aggregate message statistics."""
if source:
@@ -563,46 +475,6 @@ class InsightsEngine:
})
return result
def _compute_skill_breakdown(self, skill_usage: List[Dict]) -> Dict[str, Any]:
    """Turn per-skill usage rows into summary totals plus a ranked list.

    Args:
        skill_usage: Rows carrying ``skill``, ``view_count``, ``manage_count``
            and optionally ``last_used_at``.

    Returns:
        ``{"summary": {...}, "top_skills": [...]}``. Each ranked entry adds
        ``total_count`` and ``percentage`` (share of all skill actions, ``0``
        when there are none). Ranking is descending by total, views, edits,
        last use (missing counts as ``0``), then skill name.
    """
    loads = sum(row["view_count"] for row in skill_usage) if skill_usage else 0
    edits = sum(row["manage_count"] for row in skill_usage) if skill_usage else 0
    actions = loads + edits

    def _ranked_entry(row: Dict) -> Dict[str, Any]:
        combined = row["view_count"] + row["manage_count"]
        return {
            "skill": row["skill"],
            "view_count": row["view_count"],
            "manage_count": row["manage_count"],
            "total_count": combined,
            "percentage": (combined / actions * 100) if actions else 0,
            "last_used_at": row.get("last_used_at"),
        }

    ranked = sorted(
        (_ranked_entry(row) for row in skill_usage),
        key=lambda entry: (
            entry["total_count"],
            entry["view_count"],
            entry["manage_count"],
            entry["last_used_at"] or 0,
            entry["skill"],
        ),
        reverse=True,
    )

    return {
        "summary": {
            "total_skill_loads": loads,
            "total_skill_edits": edits,
            "total_skill_actions": actions,
            "distinct_skills_used": len(skill_usage),
        },
        "top_skills": ranked,
    }
def _compute_activity_patterns(self, sessions: List[Dict]) -> Dict:
"""Analyze activity patterns by day of week and hour."""
day_counts = Counter() # 0=Monday ... 6=Sunday
@@ -798,28 +670,6 @@ class InsightsEngine:
lines.append(f" ... and {len(report['tools']) - 15} more tools")
lines.append("")
# Skill usage
skills = report.get("skills", {})
top_skills = skills.get("top_skills", [])
if top_skills:
lines.append(" 🧠 Top Skills")
lines.append(" " + "" * 56)
lines.append(f" {'Skill':<28} {'Loads':>7} {'Edits':>7} {'Last used':>11}")
for skill in top_skills[:10]:
last_used = ""
if skill.get("last_used_at"):
last_used = datetime.fromtimestamp(skill["last_used_at"]).strftime("%b %d")
lines.append(
f" {skill['skill'][:28]:<28} {skill['view_count']:>7,} {skill['manage_count']:>7,} {last_used:>11}"
)
summary = skills.get("summary", {})
lines.append(
f" Distinct skills: {summary.get('distinct_skills_used', 0)} "
f"Loads: {summary.get('total_skill_loads', 0):,} "
f"Edits: {summary.get('total_skill_edits', 0):,}"
)
lines.append("")
# Activity patterns
act = report.get("activity", {})
if act.get("by_day"):
@@ -903,18 +753,6 @@ class InsightsEngine:
lines.append(f" {t['tool']}{t['count']:,} calls ({t['percentage']:.1f}%)")
lines.append("")
skills = report.get("skills", {})
if skills.get("top_skills"):
lines.append("**🧠 Top Skills:**")
for skill in skills["top_skills"][:5]:
suffix = ""
if skill.get("last_used_at"):
suffix = f", last used {datetime.fromtimestamp(skill['last_used_at']).strftime('%b %d')}"
lines.append(
f" {skill['skill']}{skill['view_count']:,} loads, {skill['manage_count']:,} edits{suffix}"
)
lines.append("")
# Activity summary
act = report.get("activity", {})
if act.get("busiest_day") and act.get("busiest_hour"):
+1
View File
@@ -116,6 +116,7 @@ DEFAULT_CONTEXT_LENGTHS = {
"gpt-5.4-nano": 400000, # 400k (not 1.05M like full 5.4)
"gpt-5.4-mini": 400000, # 400k (not 1.05M like full 5.4)
"gpt-5.4": 1050000, # GPT-5.4, GPT-5.4 Pro (1.05M context)
"gpt-5.3-codex-spark": 128000, # Spark variant has reduced 128k context
"gpt-5.1-chat": 128000, # Chat variant has 128k context
"gpt-5": 400000, # GPT-5.x base, mini, codex variants (400k)
"gpt-4.1": 1047576,
-1
View File
@@ -1 +0,0 @@
"""Transport layer types for provider response normalization."""
-100
View File
@@ -1,100 +0,0 @@
"""Shared types for normalized provider responses.
These dataclasses define the canonical shape that all provider adapters
normalize responses to. The shared surface is intentionally minimal —
only fields that every downstream consumer reads are top-level.
Protocol-specific state goes in ``provider_data`` dicts (response-level
and per-tool-call) so that protocol-aware code paths can access it
without polluting the shared type.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
@dataclass
class ToolCall:
    """Provider-agnostic record of a single tool call.

    ``id`` is the protocol's canonical identifier: the value placed in
    ``tool_call_id`` / ``tool_use_id`` when a tool result message is built.
    It may be ``None`` if the provider omitted it; the agent fills it via
    ``_deterministic_call_id()`` before the call is stored in history.

    ``provider_data`` holds per-call protocol metadata read only by
    protocol-aware code, for example:

    * Codex: ``{"call_id": "call_XXX", "response_item_id": "fc_XXX"}``
    * Gemini: ``{"extra_content": {"google": {"thought_signature": "..."}}}``
    * Others: ``None``

    It is excluded from ``repr()``.
    """

    id: Optional[str]
    name: str
    arguments: str  # JSON string
    provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
@dataclass
class Usage:
    """Token counters reported by an API response; every counter defaults to zero."""

    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0
    cached_tokens: int = 0
@dataclass
class NormalizedResponse:
    """Provider-independent view of an API response.

    The top-level fields are the cross-provider surface: callers can read
    them without branching on api_mode. Anything protocol-specific is
    kept in ``provider_data`` so only protocol-aware code paths touch it.

    Examples of response-level ``provider_data``:

    * Anthropic: ``{"reasoning_details": [...]}``
    * Codex: ``{"codex_reasoning_items": [...]}``
    * Others: ``None``
    """

    content: Optional[str]
    tool_calls: Optional[List[ToolCall]]
    finish_reason: str  # "stop", "tool_calls", "length", "content_filter"
    reasoning: Optional[str] = None
    usage: Optional[Usage] = None
    # Excluded from repr() so protocol metadata does not clutter debug output.
    provider_data: Optional[Dict[str, Any]] = field(default=None, repr=False)
# ---------------------------------------------------------------------------
# Factory helpers
# ---------------------------------------------------------------------------
def build_tool_call(
    id: Optional[str],
    name: str,
    arguments: Any,
    **provider_fields: Any,
) -> ToolCall:
    """Build a ``ToolCall``, serialising *arguments* to a string.

    Args:
        id: Provider-supplied call identifier, or ``None`` if absent.
        name: Name of the tool being called.
        arguments: The call arguments. Dicts, lists and tuples are encoded
            with ``json.dumps``; any other value (typically an
            already-encoded JSON string) is passed through ``str()``.
        **provider_fields: Extra keyword arguments, collected into the
            resulting ``provider_data`` dict (``None`` when none are given).

    Returns:
        The constructed ``ToolCall``.

    Raises:
        TypeError: If a dict/list/tuple contains values ``json.dumps``
            cannot serialise.
    """
    # ``str()`` on a list/tuple yields a Python repr (single quotes), which
    # is not valid JSON, so JSON-encode every container type, not only dicts.
    if isinstance(arguments, (dict, list, tuple)):
        args_str = json.dumps(arguments)
    else:
        args_str = str(arguments)
    provider_data = dict(provider_fields) or None
    return ToolCall(id=id, name=name, arguments=args_str, provider_data=provider_data)
def map_finish_reason(reason: Optional[str], mapping: Dict[str, str]) -> str:
    """Map a provider-specific stop reason onto the normalised set.

    Args:
        reason: The provider's stop reason, or ``None``.
        mapping: Lookup table from provider reasons to normalised reasons.

    Returns:
        The mapped reason, or ``"stop"`` when *reason* is ``None`` or is
        not a key of *mapping*.
    """
    fallback = "stop"
    return fallback if reason is None else mapping.get(reason, fallback)
-1
View File
@@ -444,7 +444,6 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
if not reasoning.get("has_any_reasoning", True):
print(f" 🚫 Prompt {prompt_index} discarded (no reasoning in any turn)")
discarded_no_reasoning += 1
completed_in_batch.append(prompt_index)
continue
# Get and normalize tool stats for consistent schema across all entries
+12 -179
View File
@@ -117,160 +117,6 @@ def _normalize_chat_content(
return ""
# Content part type aliases used by the OpenAI Chat Completions and Responses
# APIs. We accept both spellings on input and emit a single canonical internal
# shape (``{"type": "text", ...}`` / ``{"type": "image_url", ...}``) that the
# rest of the agent pipeline already understands.
_TEXT_PART_TYPES = frozenset({"text", "input_text", "output_text"})
_IMAGE_PART_TYPES = frozenset({"image_url", "input_image"})
_FILE_PART_TYPES = frozenset({"file", "input_file"})
def _normalize_multimodal_content(content: Any) -> Any:
    """Validate and normalize multimodal content for the API server.

    Returns a plain string when the content is text-only, or a list of
    ``{"type": "text"|"image_url", ...}`` parts when images are present.
    The output shape is the native OpenAI Chat Completions vision format,
    which the agent pipeline accepts verbatim (OpenAI-wire providers) or
    converts (``_preprocess_anthropic_content`` for Anthropic).

    Raises ``ValueError`` with an OpenAI-style ``code:message`` payload on
    invalid input:

    * ``unsupported_content_type`` — file/input_file parts, unknown part
      types, or non-image ``data:`` URLs.
    * ``invalid_image_url`` — missing URL or unsupported scheme.
    * ``invalid_content_part`` — malformed image ``detail`` value.

    Callers translate the ValueError into a 400 response.
    """
    # Scalar passthrough mirrors ``_normalize_chat_content``.
    if content is None:
        return ""
    if isinstance(content, str):
        return content[:MAX_NORMALIZED_TEXT_LENGTH]
    if not isinstance(content, list):
        # Mirror the legacy text-normalizer's fallback so callers that
        # pre-existed image support still get a string back.
        return _normalize_chat_content(content)

    normalized_parts: List[Dict[str, Any]] = []
    for part in content[:MAX_CONTENT_LIST_SIZE]:
        if isinstance(part, str):
            if part:
                normalized_parts.append(
                    {"type": "text", "text": part[:MAX_NORMALIZED_TEXT_LENGTH]}
                )
            continue

        if not isinstance(part, dict):
            # Ignore unknown scalars for forward compatibility with future
            # Responses API additions (e.g. ``refusal``). The same policy
            # the text normalizer applies.
            continue

        raw_type = part.get("type")
        part_type = str(raw_type or "").strip().lower()

        if part_type in _TEXT_PART_TYPES:
            text = part.get("text")
            if text is None:
                continue
            if not isinstance(text, str):
                text = str(text)
            if text:
                normalized_parts.append(
                    {"type": "text", "text": text[:MAX_NORMALIZED_TEXT_LENGTH]}
                )
            continue

        if part_type in _IMAGE_PART_TYPES:
            detail = part.get("detail")
            image_ref = part.get("image_url")
            # OpenAI Responses sends ``input_image`` with a top-level
            # ``image_url`` string; Chat Completions sends ``image_url`` as
            # ``{"url": "...", "detail": "..."}``. Support both.
            if isinstance(image_ref, dict):
                url_value = image_ref.get("url")
                detail = image_ref.get("detail", detail)
            else:
                url_value = image_ref

            if not isinstance(url_value, str) or not url_value.strip():
                raise ValueError("invalid_image_url:Image parts must include a non-empty image URL.")
            url_value = url_value.strip()
            lowered = url_value.lower()

            if lowered.startswith("data:"):
                # Only inline *image* payloads are accepted, and the data URL
                # must actually carry a payload after the comma.
                if not lowered.startswith("data:image/") or "," not in url_value:
                    raise ValueError(
                        "unsupported_content_type:Only image data URLs are supported. "
                        "Non-image data payloads are not supported."
                    )
            elif not lowered.startswith(("http://", "https://")):
                raise ValueError(
                    "invalid_image_url:Image inputs must use http(s) URLs or data:image/... URLs."
                )

            image_part: Dict[str, Any] = {"type": "image_url", "image_url": {"url": url_value}}
            if detail is not None:
                if not isinstance(detail, str) or not detail.strip():
                    raise ValueError("invalid_content_part:Image detail must be a non-empty string when provided.")
                image_part["image_url"]["detail"] = detail.strip()
            normalized_parts.append(image_part)
            continue

        if part_type in _FILE_PART_TYPES:
            raise ValueError(
                "unsupported_content_type:Inline image inputs are supported, "
                "but uploaded files and document inputs are not supported on this endpoint."
            )

        # Unknown part type — reject explicitly so clients get a clear error
        # instead of a silently dropped turn.
        raise ValueError(
            f"unsupported_content_type:Unsupported content part type {raw_type!r}. "
            "Only text and image_url/input_image parts are supported."
        )

    if not normalized_parts:
        return ""

    # Text-only: collapse to a plain string so downstream logging/trajectory
    # code sees the native shape and prompt caching on text-only turns is
    # unaffected.
    if all(p.get("type") == "text" for p in normalized_parts):
        return "\n".join(p["text"] for p in normalized_parts if p.get("text"))

    return normalized_parts
def _content_has_visible_payload(content: Any) -> bool:
    """Tell whether *content* carries any text or image attachment.

    A string counts when it is non-blank. A list counts when at least one
    dict part is an image part, or a text part whose ``text`` is non-blank.
    Anything else is treated as empty. Used to reject empty turns.
    """
    if isinstance(content, str):
        return bool(content.strip())
    if not isinstance(content, list):
        return False

    def _is_visible(part: Any) -> bool:
        # Non-dict entries never count as visible payload.
        if not isinstance(part, dict):
            return False
        kind = str(part.get("type") or "").strip().lower()
        if kind in _IMAGE_PART_TYPES:
            return True
        return kind in _TEXT_PART_TYPES and bool(str(part.get("text") or "").strip())

    return any(_is_visible(part) for part in content)
def _multimodal_validation_error(exc: ValueError, *, param: str) -> "web.Response":
    """Turn a ``_normalize_multimodal_content`` ValueError into a 400 response.

    The exception text is expected to look like ``code:message``. When no
    message follows the first colon, the whole text becomes the message
    and the code falls back to ``invalid_content_part``.
    """
    detail = str(exc)
    code, _sep, message = detail.partition(":")
    if not message:
        code = "invalid_content_part"
        message = detail
    payload = _openai_error(message, code=code, param=param)
    return web.json_response(payload, status=400)
def check_api_server_requirements() -> bool:
    """Report whether the API server's dependencies are available.

    Returns the module-level ``AIOHTTP_AVAILABLE`` flag unchanged.
    """
    return AIOHTTP_AVAILABLE
@@ -791,32 +637,26 @@ class APIServerAdapter(BasePlatformAdapter):
system_prompt = None
conversation_messages: List[Dict[str, str]] = []
for idx, msg in enumerate(messages):
for msg in messages:
role = msg.get("role", "")
raw_content = msg.get("content", "")
content = _normalize_chat_content(msg.get("content", ""))
if role == "system":
# System messages don't support images (Anthropic rejects, OpenAI
# text-model systems don't render them). Flatten to text.
content = _normalize_chat_content(raw_content)
# Accumulate system messages
if system_prompt is None:
system_prompt = content
else:
system_prompt = system_prompt + "\n" + content
elif role in ("user", "assistant"):
try:
content = _normalize_multimodal_content(raw_content)
except ValueError as exc:
return _multimodal_validation_error(exc, param=f"messages[{idx}].content")
conversation_messages.append({"role": role, "content": content})
# Extract the last user message as the primary input
user_message: Any = ""
user_message = ""
history = []
if conversation_messages:
user_message = conversation_messages[-1].get("content", "")
history = conversation_messages[:-1]
if not _content_has_visible_payload(user_message):
if not user_message:
return web.json_response(
{"error": {"message": "No user message found in messages", "type": "invalid_request_error"}},
status=400,
@@ -1584,19 +1424,16 @@ class APIServerAdapter(BasePlatformAdapter):
# No error if conversation doesn't exist yet — it's a new conversation
# Normalize input to message list
input_messages: List[Dict[str, Any]] = []
input_messages: List[Dict[str, str]] = []
if isinstance(raw_input, str):
input_messages = [{"role": "user", "content": raw_input}]
elif isinstance(raw_input, list):
for idx, item in enumerate(raw_input):
for item in raw_input:
if isinstance(item, str):
input_messages.append({"role": "user", "content": item})
elif isinstance(item, dict):
role = item.get("role", "user")
try:
content = _normalize_multimodal_content(item.get("content", ""))
except ValueError as exc:
return _multimodal_validation_error(exc, param=f"input[{idx}].content")
content = _normalize_chat_content(item.get("content", ""))
input_messages.append({"role": role, "content": content})
else:
return web.json_response(_openai_error("'input' must be a string or array"), status=400)
@@ -1605,7 +1442,7 @@ class APIServerAdapter(BasePlatformAdapter):
# This lets stateless clients supply their own history instead of
# relying on server-side response chaining via previous_response_id.
# Precedence: explicit conversation_history > previous_response_id.
conversation_history: List[Dict[str, Any]] = []
conversation_history: List[Dict[str, str]] = []
raw_history = body.get("conversation_history")
if raw_history:
if not isinstance(raw_history, list):
@@ -1619,11 +1456,7 @@ class APIServerAdapter(BasePlatformAdapter):
_openai_error(f"conversation_history[{i}] must have 'role' and 'content' fields"),
status=400,
)
try:
entry_content = _normalize_multimodal_content(entry["content"])
except ValueError as exc:
return _multimodal_validation_error(exc, param=f"conversation_history[{i}].content")
conversation_history.append({"role": str(entry["role"]), "content": entry_content})
conversation_history.append({"role": str(entry["role"]), "content": str(entry["content"])})
if previous_response_id:
logger.debug("Both conversation_history and previous_response_id provided; using conversation_history")
@@ -1643,8 +1476,8 @@ class APIServerAdapter(BasePlatformAdapter):
conversation_history.append(msg)
# Last input message is the user_message
user_message: Any = input_messages[-1].get("content", "") if input_messages else ""
if not _content_has_visible_payload(user_message):
user_message = input_messages[-1].get("content", "") if input_messages else ""
if not user_message:
return web.json_response(_openai_error("No user message found in input"), status=400)
# Truncation support
-33
View File
@@ -552,39 +552,6 @@ async def cache_audio_from_url(url: str, ext: str = ".ogg", retries: int = 2) ->
raise last_exc
# ---------------------------------------------------------------------------
# Video cache utilities
#
# Same pattern as image/audio cache -- videos from platforms are downloaded
# here so the agent can reference them by local file path.
# ---------------------------------------------------------------------------
VIDEO_CACHE_DIR = get_hermes_dir("cache/videos", "video_cache")
SUPPORTED_VIDEO_TYPES = {
".mp4": "video/mp4",
".mov": "video/quicktime",
".webm": "video/webm",
".mkv": "video/x-matroska",
".avi": "video/x-msvideo",
}
def get_video_cache_dir() -> Path:
    """Ensure the video cache directory exists and return it.

    Missing parent directories are created as well; an already existing
    directory is not an error.
    """
    cache_root = VIDEO_CACHE_DIR
    cache_root.mkdir(parents=True, exist_ok=True)
    return cache_root
def cache_video_from_bytes(data: bytes, ext: str = ".mp4") -> str:
    """Write raw video bytes into the video cache and return the file path.

    Args:
        data: The video payload to store.
        ext: File extension (including the dot) appended to the generated
            ``video_<12 hex chars>`` file name.

    Returns:
        The path of the written file, as a string.
    """
    target = get_video_cache_dir() / f"video_{uuid.uuid4().hex[:12]}{ext}"
    target.write_bytes(data)
    return str(target)
# ---------------------------------------------------------------------------
# Document cache utilities
#
-34
View File
@@ -71,10 +71,8 @@ from gateway.platforms.base import (
SendResult,
cache_image_from_bytes,
cache_audio_from_bytes,
cache_video_from_bytes,
cache_document_from_bytes,
resolve_proxy_url,
SUPPORTED_VIDEO_TYPES,
SUPPORTED_DOCUMENT_TYPES,
utf16_len,
_prefix_within_utf16_limit,
@@ -2630,23 +2628,6 @@ class TelegramAdapter(BasePlatformAdapter):
except Exception as e:
logger.warning("[Telegram] Failed to cache audio: %s", e, exc_info=True)
elif msg.video:
try:
file_obj = await msg.video.get_file()
video_bytes = await file_obj.download_as_bytearray()
ext = ".mp4"
if getattr(file_obj, "file_path", None):
for candidate in SUPPORTED_VIDEO_TYPES:
if file_obj.file_path.lower().endswith(candidate):
ext = candidate
break
cached_path = cache_video_from_bytes(bytes(video_bytes), ext=ext)
event.media_urls = [cached_path]
event.media_types = [SUPPORTED_VIDEO_TYPES.get(ext, "video/mp4")]
logger.info("[Telegram] Cached user video at %s", cached_path)
except Exception as e:
logger.warning("[Telegram] Failed to cache video: %s", e, exc_info=True)
# Download document files to cache for agent processing
elif msg.document:
doc = msg.document
@@ -2663,21 +2644,6 @@ class TelegramAdapter(BasePlatformAdapter):
mime_to_ext = {v: k for k, v in SUPPORTED_DOCUMENT_TYPES.items()}
ext = mime_to_ext.get(doc.mime_type, "")
if not ext and doc.mime_type:
video_mime_to_ext = {v: k for k, v in SUPPORTED_VIDEO_TYPES.items()}
ext = video_mime_to_ext.get(doc.mime_type, "")
if ext in SUPPORTED_VIDEO_TYPES:
file_obj = await doc.get_file()
video_bytes = await file_obj.download_as_bytearray()
cached_path = cache_video_from_bytes(bytes(video_bytes), ext=ext)
event.media_urls = [cached_path]
event.media_types = [SUPPORTED_VIDEO_TYPES[ext]]
event.message_type = MessageType.VIDEO
logger.info("[Telegram] Cached user video document at %s", cached_path)
await self.handle_message(event)
return
# Check if supported
if ext not in SUPPORTED_DOCUMENT_TYPES:
supported_list = ", ".join(sorted(SUPPORTED_DOCUMENT_TYPES.keys()))
-11
View File
@@ -773,17 +773,6 @@ class WhatsAppAdapter(BasePlatformAdapter):
"""Send a video natively via bridge — plays inline in WhatsApp."""
return await self._send_media_to_bridge(chat_id, video_path, "video", caption)
async def send_voice(
    self,
    chat_id: str,
    audio_path: str,
    caption: Optional[str] = None,
    reply_to: Optional[str] = None,
    **kwargs,
) -> SendResult:
    """Send an audio file as a WhatsApp voice message via the bridge.

    Delegates to ``_send_media_to_bridge`` with media type ``"audio"``.
    ``reply_to`` and any extra keyword arguments are accepted but not
    forwarded to the bridge call.
    """
    result = await self._send_media_to_bridge(chat_id, audio_path, "audio", caption)
    return result
async def send_document(
self,
chat_id: str,
+7 -26
View File
@@ -1667,32 +1667,12 @@ class GatewayRunner:
notified: set = set()
for session_key in active:
source = None
try:
if getattr(self, "session_store", None) is not None:
self.session_store._ensure_loaded()
entry = self.session_store._entries.get(session_key)
source = getattr(entry, "origin", None) if entry else None
except Exception as e:
logger.debug(
"Failed to load session origin for shutdown notification %s: %s",
session_key,
e,
)
if source is not None:
platform_str = source.platform.value
chat_id = source.chat_id
thread_id = source.thread_id
else:
# Fall back to parsing the session key when no persisted
# origin is available (legacy sessions/tests).
_parsed = _parse_session_key(session_key)
if not _parsed:
continue
platform_str = _parsed["platform"]
chat_id = _parsed["chat_id"]
thread_id = _parsed.get("thread_id")
# Parse platform + chat_id from the session key.
_parsed = _parse_session_key(session_key)
if not _parsed:
continue
platform_str = _parsed["platform"]
chat_id = _parsed["chat_id"]
# Deduplicate: one notification per chat, even if multiple
# sessions (different users/threads) share the same chat.
@@ -1708,6 +1688,7 @@ class GatewayRunner:
# Include thread_id if present so the message lands in the
# correct forum topic / thread.
thread_id = _parsed.get("thread_id")
metadata = {"thread_id": thread_id} if thread_id else None
await adapter.send(chat_id, msg, metadata=metadata)
+1
View File
@@ -24,6 +24,7 @@ _FORWARD_COMPAT_TEMPLATE_MODELS: List[tuple[str, tuple[str, ...]]] = [
("gpt-5.4-mini", ("gpt-5.3-codex", "gpt-5.2-codex")),
("gpt-5.4", ("gpt-5.3-codex", "gpt-5.2-codex")),
("gpt-5.3-codex", ("gpt-5.2-codex",)),
("gpt-5.3-codex-spark", ("gpt-5.3-codex", "gpt-5.2-codex")),
]
+3 -2
View File
@@ -497,8 +497,9 @@ def _collect_gateway_skill_entries(
# --- Tier 1: Plugin slash commands (never trimmed) ---------------------
plugin_pairs: list[tuple[str, str]] = []
try:
from hermes_cli.plugins import get_plugin_commands
plugin_cmds = get_plugin_commands()
from hermes_cli.plugins import get_plugin_manager
pm = get_plugin_manager()
plugin_cmds = getattr(pm, "_plugin_commands", {})
for cmd_name in sorted(plugin_cmds):
name = sanitize_name(cmd_name) if sanitize_name else cmd_name
if not name:
+4 -113
View File
@@ -13,7 +13,6 @@ This module provides:
"""
import copy
import logging
import os
import platform
import re
@@ -25,7 +24,6 @@ from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Any, Optional, List, Tuple
logger = logging.getLogger(__name__)
_IS_WINDOWS = platform.system() == "Windows"
_ENV_VAR_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
@@ -829,7 +827,7 @@ DEFAULT_CONFIG = {
},
# Config schema version - bump this when adding new required fields
"_config_version": 21,
"_config_version": 20,
}
# =============================================================================
@@ -1852,53 +1850,12 @@ def _normalize_custom_provider_entry(
if not isinstance(entry, dict):
return None
# Accept camelCase aliases commonly used in hand-written configs.
_CAMEL_ALIASES: Dict[str, str] = {
"apiKey": "api_key",
"baseUrl": "base_url",
"apiMode": "api_mode",
"keyEnv": "key_env",
"defaultModel": "default_model",
"contextLength": "context_length",
"rateLimitDelay": "rate_limit_delay",
}
_KNOWN_KEYS = {
"name", "api", "url", "base_url", "api_key", "key_env",
"api_mode", "transport", "model", "default_model", "models",
"context_length", "rate_limit_delay",
}
for camel, snake in _CAMEL_ALIASES.items():
if camel in entry and snake not in entry:
logger.warning(
"providers.%s: camelCase key '%s' auto-mapped to '%s' "
"(use snake_case to avoid this warning)",
provider_key or "?", camel, snake,
)
entry[snake] = entry[camel]
unknown = set(entry.keys()) - _KNOWN_KEYS - set(_CAMEL_ALIASES.keys())
if unknown:
logger.warning(
"providers.%s: unknown config keys ignored: %s",
provider_key or "?", ", ".join(sorted(unknown)),
)
from urllib.parse import urlparse
base_url = ""
for url_key in ("base_url", "url", "api"):
for url_key in ("api", "url", "base_url"):
raw_url = entry.get(url_key)
if isinstance(raw_url, str) and raw_url.strip():
candidate = raw_url.strip()
parsed = urlparse(candidate)
if parsed.scheme and parsed.netloc:
base_url = candidate
break
else:
logger.warning(
"providers.%s: '%s' value '%s' is not a valid URL "
"(no scheme or host) — skipped",
provider_key or "?", url_key, candidate,
)
base_url = raw_url.strip()
break
if not base_url:
return None
@@ -2527,72 +2484,6 @@ def migrate_config(interactive: bool = True, quiet: bool = False) -> Dict[str, A
else:
print(" ✓ Removed unused compression.summary_* keys")
# ── Version 20 → 21: plugins are now opt-in; grandfather existing user plugins ──
# The loader now requires plugins to appear in ``plugins.enabled`` before
# loading. Existing installs had all discovered plugins loading by default
# (minus anything in ``plugins.disabled``). To avoid silently breaking
# those setups on upgrade, populate ``plugins.enabled`` with the set of
# currently-installed user plugins that aren't already disabled.
#
# Bundled plugins (shipped in the repo itself) are NOT grandfathered —
# they ship off for everyone, including existing users, so any user who
# wants one has to opt in explicitly.
if current_ver < 21:
config = read_raw_config()
plugins_cfg = config.get("plugins")
if not isinstance(plugins_cfg, dict):
plugins_cfg = {}
# Only migrate if the enabled allow-list hasn't been set yet.
if "enabled" not in plugins_cfg:
disabled = plugins_cfg.get("disabled", []) or []
if not isinstance(disabled, list):
disabled = []
disabled_set = set(disabled)
# Scan ``$HERMES_HOME/plugins/`` for currently installed user plugins.
grandfathered: List[str] = []
try:
from hermes_constants import get_hermes_home as _ghome
user_plugins_dir = _ghome() / "plugins"
if user_plugins_dir.is_dir():
for child in sorted(user_plugins_dir.iterdir()):
if not child.is_dir():
continue
manifest_file = child / "plugin.yaml"
if not manifest_file.exists():
manifest_file = child / "plugin.yml"
if not manifest_file.exists():
continue
try:
with open(manifest_file) as _mf:
manifest = yaml.safe_load(_mf) or {}
except Exception:
manifest = {}
name = manifest.get("name") or child.name
if name in disabled_set:
continue
grandfathered.append(name)
except Exception:
grandfathered = []
plugins_cfg["enabled"] = grandfathered
config["plugins"] = plugins_cfg
save_config(config)
results["config_added"].append(
f"plugins.enabled (opt-in allow-list, {len(grandfathered)} grandfathered)"
)
if not quiet:
if grandfathered:
print(
f" ✓ Plugins now opt-in: grandfathered "
f"{len(grandfathered)} existing plugin(s) into plugins.enabled"
)
else:
print(
" ✓ Plugins now opt-in: no existing plugins to grandfather. "
"Use `hermes plugins enable <name>` to activate."
)
if current_ver < latest_ver and not quiet:
print(f"Config version: {current_ver}{latest_ver}")
-11
View File
@@ -7449,17 +7449,6 @@ Examples:
action="store_true",
help="Remove existing plugin and reinstall",
)
_install_enable_group = plugins_install.add_mutually_exclusive_group()
_install_enable_group.add_argument(
"--enable",
action="store_true",
help="Auto-enable the plugin after install (skip confirmation prompt)",
)
_install_enable_group.add_argument(
"--no-enable",
action="store_true",
help="Install disabled (skip confirmation prompt); enable later with `hermes plugins enable <name>`",
)
plugins_update = plugins_subparsers.add_parser(
"update", help="Pull latest changes for an installed plugin"
+2 -7
View File
@@ -16,12 +16,6 @@ from difflib import get_close_matches
from pathlib import Path
from typing import Any, NamedTuple, Optional
from hermes_cli import __version__ as _HERMES_VERSION
# Identify ourselves so endpoints fronted by Cloudflare's Browser Integrity
# Check (error 1010) don't reject the default ``Python-urllib/*`` signature.
_HERMES_USER_AGENT = f"hermes-cli/{_HERMES_VERSION}"
COPILOT_BASE_URL = "https://api.githubcopilot.com"
COPILOT_MODELS_URL = f"{COPILOT_BASE_URL}/models"
COPILOT_EDITOR_VERSION = "vscode/1.104.1"
@@ -231,6 +225,7 @@ _PROVIDER_MODELS: dict[str, list[str]] = {
"gpt-5.4-pro",
"gpt-5.4",
"gpt-5.3-codex",
"gpt-5.3-codex-spark",
"gpt-5.2",
"gpt-5.2-codex",
"gpt-5.1",
@@ -1774,7 +1769,7 @@ def probe_api_models(
candidates.append((alternate_base, True))
tried: list[str] = []
headers: dict[str, str] = {"User-Agent": _HERMES_USER_AGENT}
headers: dict[str, str] = {}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
if normalized.startswith(COPILOT_BASE_URL):
+16 -116
View File
@@ -2,20 +2,14 @@
Hermes Plugin System
====================
Discovers, loads, and manages plugins from four sources:
Discovers, loads, and manages plugins from three sources:
1. **Bundled plugins** ``<repo>/plugins/<name>/`` (shipped with hermes-agent;
``memory/`` and ``context_engine/`` subdirs are excluded because they have
their own discovery paths)
2. **User plugins** ``~/.hermes/plugins/<name>/``
3. **Project plugins** ``./.hermes/plugins/<name>/`` (opt-in via
1. **User plugins** ``~/.hermes/plugins/<name>/``
2. **Project plugins** ``./.hermes/plugins/<name>/`` (opt-in via
``HERMES_ENABLE_PROJECT_PLUGINS``)
4. **Pip plugins** packages that expose the ``hermes_agent.plugins``
3. **Pip plugins** packages that expose the ``hermes_agent.plugins``
entry-point group.
Later sources override earlier ones on name collision, so a user or project
plugin with the same name as a bundled plugin replaces it.
Each directory plugin must contain a ``plugin.yaml`` manifest **and** an
``__init__.py`` with a ``register(ctx)`` function.
@@ -83,12 +77,7 @@ def _env_enabled(name: str) -> bool:
def _get_disabled_plugins() -> set:
"""Read the disabled plugins list from config.yaml.
Kept for backward compat and explicit deny-list semantics. A plugin
name in this set will never load, even if it appears in
``plugins.enabled``.
"""
"""Read the disabled plugins list from config.yaml."""
try:
from hermes_cli.config import load_config
config = load_config()
@@ -98,36 +87,6 @@ def _get_disabled_plugins() -> set:
return set()
def _get_enabled_plugins() -> Optional[set]:
"""Read the enabled-plugins allow-list from config.yaml.
Plugins are opt-in by default only plugins whose name appears in
this set are loaded. Returns:
* ``None`` the key is missing or malformed. Callers should treat
this as "nothing enabled yet" (the opt-in default); the first
``migrate_config`` run populates the key with a grandfathered set
of currently-installed user plugins so existing setups don't
break on upgrade.
* ``set()`` an empty list was explicitly set; nothing loads.
* ``set(...)`` the concrete allow-list.
"""
try:
from hermes_cli.config import load_config
config = load_config()
plugins_cfg = config.get("plugins")
if not isinstance(plugins_cfg, dict):
return None
if "enabled" not in plugins_cfg:
return None
enabled = plugins_cfg.get("enabled")
if not isinstance(enabled, list):
return None
return set(enabled)
except Exception:
return None
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@@ -463,66 +422,27 @@ class PluginManager:
manifests: List[PluginManifest] = []
# 1. Bundled plugins (<repo>/plugins/<name>/)
# Repo-shipped generic plugins live next to hermes_cli/. Memory and
# context_engine subdirs are handled by their own discovery paths, so
# skip those names here. Bundled plugins are discovered (so they
# show up in `hermes plugins`) but only loaded when added to
# `plugins.enabled` in config.yaml — opt-in like any other plugin.
repo_plugins = Path(__file__).resolve().parent.parent / "plugins"
manifests.extend(
self._scan_directory(
repo_plugins,
source="bundled",
skip_names={"memory", "context_engine"},
)
)
# 2. User plugins (~/.hermes/plugins/)
# 1. User plugins (~/.hermes/plugins/)
user_dir = get_hermes_home() / "plugins"
manifests.extend(self._scan_directory(user_dir, source="user"))
# 3. Project plugins (./.hermes/plugins/)
# 2. Project plugins (./.hermes/plugins/)
if _env_enabled("HERMES_ENABLE_PROJECT_PLUGINS"):
project_dir = Path.cwd() / ".hermes" / "plugins"
manifests.extend(self._scan_directory(project_dir, source="project"))
# 4. Pip / entry-point plugins
# 3. Pip / entry-point plugins
manifests.extend(self._scan_entry_points())
# Load each manifest (skip user-disabled plugins).
# Later sources override earlier ones on name collision — user plugins
# take precedence over bundled, project plugins take precedence over
# user. Dedup here so we only load the final winner.
# Load each manifest (skip user-disabled plugins)
disabled = _get_disabled_plugins()
enabled = _get_enabled_plugins() # None = opt-in default (nothing enabled)
winners: Dict[str, PluginManifest] = {}
for manifest in manifests:
winners[manifest.name] = manifest
for manifest in winners.values():
# Explicit disable always wins.
if manifest.name in disabled:
loaded = LoadedPlugin(manifest=manifest, enabled=False)
loaded.error = "disabled via config"
self._plugins[manifest.name] = loaded
logger.debug("Skipping disabled plugin '%s'", manifest.name)
continue
# Opt-in gate: plugins must be in the enabled allow-list.
# If the allow-list is missing (None), treat as "nothing enabled"
# — users have to explicitly enable plugins to load them.
# Memory and context_engine providers are excluded from this gate
# since they have their own single-select config (memory.provider
# / context.engine), not the enabled list.
if enabled is None or manifest.name not in enabled:
loaded = LoadedPlugin(manifest=manifest, enabled=False)
loaded.error = "not enabled in config (run `hermes plugins enable {}` to activate)".format(
manifest.name
)
self._plugins[manifest.name] = loaded
logger.debug(
"Skipping '%s' (not in plugins.enabled)", manifest.name
)
continue
self._load_plugin(manifest)
if manifests:
@@ -536,18 +456,8 @@ class PluginManager:
# Directory scanning
# -----------------------------------------------------------------------
def _scan_directory(
self,
path: Path,
source: str,
skip_names: Optional[Set[str]] = None,
) -> List[PluginManifest]:
"""Read ``plugin.yaml`` manifests from subdirectories of *path*.
*skip_names* is an optional set of subdirectory names to ignore (used
for the bundled scan to exclude the ``memory`` / ``context_engine``
subdirs, which have their own discovery path).
"""
def _scan_directory(self, path: Path, source: str) -> List[PluginManifest]:
"""Read ``plugin.yaml`` manifests from subdirectories of *path*."""
manifests: List[PluginManifest] = []
if not path.is_dir():
return manifests
@@ -555,8 +465,6 @@ class PluginManager:
for child in sorted(path.iterdir()):
if not child.is_dir():
continue
if skip_names and child.name in skip_names:
continue
manifest_file = child / "plugin.yaml"
if not manifest_file.exists():
manifest_file = child / "plugin.yml"
@@ -624,7 +532,7 @@ class PluginManager:
loaded = LoadedPlugin(manifest=manifest)
try:
if manifest.source in ("user", "project", "bundled"):
if manifest.source in ("user", "project"):
module = self._load_directory_module(manifest)
else:
module = self._load_entrypoint_module(manifest)
@@ -873,31 +781,23 @@ def get_pre_tool_call_block_message(
return None
def _ensure_plugins_discovered() -> PluginManager:
"""Return the global manager after running idempotent plugin discovery."""
manager = get_plugin_manager()
manager.discover_and_load()
return manager
def get_plugin_context_engine():
"""Return the plugin-registered context engine, or None."""
return _ensure_plugins_discovered()._context_engine
return get_plugin_manager()._context_engine
def get_plugin_command_handler(name: str) -> Optional[Callable]:
"""Return the handler for a plugin-registered slash command, or ``None``."""
entry = _ensure_plugins_discovered()._plugin_commands.get(name)
entry = get_plugin_manager()._plugin_commands.get(name)
return entry["handler"] if entry else None
def get_plugin_commands() -> Dict[str, dict]:
"""Return the full plugin commands dict (name → {handler, description, plugin}).
Triggers idempotent plugin discovery so callers can use plugin commands
before any explicit discover_plugins() call.
Safe to call before discovery — returns an empty dict if no plugins loaded.
"""
return _ensure_plugins_discovered()._plugin_commands
return get_plugin_manager()._plugin_commands
def get_plugin_toolsets() -> List[tuple]:
+93 -245
View File
@@ -15,7 +15,6 @@ import shutil
import subprocess
import sys
from pathlib import Path
from typing import Optional
from hermes_constants import get_hermes_home
@@ -282,16 +281,8 @@ def _require_installed_plugin(name: str, plugins_dir: Path, console) -> Path:
# ---------------------------------------------------------------------------
def cmd_install(
identifier: str,
force: bool = False,
enable: Optional[bool] = None,
) -> None:
"""Install a plugin from a Git URL or owner/repo shorthand.
After install, prompt "Enable now? [y/N]" unless *enable* is provided
(True = auto-enable without prompting, False = install disabled).
"""
def cmd_install(identifier: str, force: bool = False) -> None:
"""Install a plugin from a Git URL or owner/repo shorthand."""
import tempfile
from rich.console import Console
@@ -400,40 +391,6 @@ def cmd_install(
_display_after_install(target, identifier)
# Determine the canonical plugin name for enable-list bookkeeping.
installed_name = installed_manifest.get("name") or target.name
# Decide whether to enable: explicit flag > interactive prompt > default off
should_enable = enable
if should_enable is None:
# Interactive prompt unless stdin isn't a TTY (scripted install).
if sys.stdin.isatty() and sys.stdout.isatty():
try:
answer = input(
f" Enable '{installed_name}' now? [y/N]: "
).strip().lower()
should_enable = answer in ("y", "yes")
except (EOFError, KeyboardInterrupt):
should_enable = False
else:
should_enable = False
if should_enable:
enabled = _get_enabled_set()
disabled = _get_disabled_set()
enabled.add(installed_name)
disabled.discard(installed_name)
_save_enabled_set(enabled)
_save_disabled_set(disabled)
console.print(
f"[green]✓[/green] Plugin [bold]{installed_name}[/bold] enabled."
)
else:
console.print(
f"[dim]Plugin installed but not enabled. "
f"Run `hermes plugins enable {installed_name}` to activate.[/dim]"
)
console.print("[dim]Restart the gateway for the plugin to take effect:[/dim]")
console.print("[dim] hermes gateway restart[/dim]")
console.print()
@@ -511,11 +468,7 @@ def cmd_remove(name: str) -> None:
def _get_disabled_set() -> set:
"""Read the disabled plugins set from config.yaml.
An explicit deny-list. A plugin name here never loads, even if also
listed in ``plugins.enabled``.
"""
"""Read the disabled plugins set from config.yaml."""
try:
from hermes_cli.config import load_config
config = load_config()
@@ -535,196 +488,103 @@ def _save_disabled_set(disabled: set) -> None:
save_config(config)
def _get_enabled_set() -> set:
"""Read the enabled plugins allow-list from config.yaml.
Plugins are opt-in: only names here are loaded. Returns ``set()`` if
the key is missing (same behaviour as "nothing enabled yet").
"""
try:
from hermes_cli.config import load_config
config = load_config()
plugins_cfg = config.get("plugins", {})
if not isinstance(plugins_cfg, dict):
return set()
enabled = plugins_cfg.get("enabled", [])
return set(enabled) if isinstance(enabled, list) else set()
except Exception:
return set()
def _save_enabled_set(enabled: set) -> None:
    """Persist *enabled* to config.yaml as the sorted ``plugins.enabled`` list.

    Args:
        enabled: Plugin names that should be on the allow-list.
    """
    from hermes_cli.config import load_config, save_config

    cfg = load_config()
    # Create the ``plugins`` section on first use.
    cfg.setdefault("plugins", {})
    cfg["plugins"]["enabled"] = sorted(enabled)
    save_config(cfg)
def cmd_enable(name: str) -> None:
"""Add a plugin to the enabled allow-list (and remove it from disabled)."""
"""Enable a previously disabled plugin."""
from rich.console import Console
console = Console()
# Discover the plugin — check installed (user) AND bundled.
if not _plugin_exists(name):
console.print(f"[red]Plugin '{name}' is not installed or bundled.[/red]")
plugins_dir = _plugins_dir()
# Verify the plugin exists
target = plugins_dir / name
if not target.is_dir():
console.print(f"[red]Plugin '{name}' is not installed.[/red]")
sys.exit(1)
enabled = _get_enabled_set()
disabled = _get_disabled_set()
if name in enabled and name not in disabled:
if name not in disabled:
console.print(f"[dim]Plugin '{name}' is already enabled.[/dim]")
return
enabled.add(name)
disabled.discard(name)
_save_enabled_set(enabled)
_save_disabled_set(disabled)
console.print(
f"[green]✓[/green] Plugin [bold]{name}[/bold] enabled. "
"Takes effect on next session."
)
console.print(f"[green]✓[/green] Plugin [bold]{name}[/bold] enabled. Takes effect on next session.")
def cmd_disable(name: str) -> None:
"""Remove a plugin from the enabled allow-list (and add to disabled)."""
"""Disable a plugin without removing it."""
from rich.console import Console
console = Console()
if not _plugin_exists(name):
console.print(f"[red]Plugin '{name}' is not installed or bundled.[/red]")
plugins_dir = _plugins_dir()
# Verify the plugin exists
target = plugins_dir / name
if not target.is_dir():
console.print(f"[red]Plugin '{name}' is not installed.[/red]")
sys.exit(1)
enabled = _get_enabled_set()
disabled = _get_disabled_set()
if name not in enabled and name in disabled:
if name in disabled:
console.print(f"[dim]Plugin '{name}' is already disabled.[/dim]")
return
enabled.discard(name)
disabled.add(name)
_save_enabled_set(enabled)
_save_disabled_set(disabled)
console.print(
f"[yellow]\u2298[/yellow] Plugin [bold]{name}[/bold] disabled. "
"Takes effect on next session."
)
console.print(f"[yellow]\u2298[/yellow] Plugin [bold]{name}[/bold] disabled. Takes effect on next session.")
def _plugin_exists(name: str) -> bool:
    """Tell whether *name* refers to an installed (user) or bundled plugin.

    Lookup order:
      1. A directory called *name* inside the user plugins dir.
      2. Any user plugin directory whose manifest declares ``name``.
      3. ``<repo>/plugins/<name>/`` containing ``plugin.yaml`` or ``plugin.yml``.
    """
    installed_root = _plugins_dir()
    if installed_root.is_dir():
        if (installed_root / name).is_dir():
            return True
        # Fall back to matching on the name declared inside each manifest.
        if any(
            _read_manifest(entry).get("name") == name
            for entry in installed_root.iterdir()
            if entry.is_dir()
        ):
            return True

    # Bundled plugins live next to the hermes_cli package: <repo>/plugins/<name>/
    from pathlib import Path as _P
    import hermes_cli

    bundled_root = _P(hermes_cli.__file__).resolve().parent.parent / "plugins"
    if not bundled_root.is_dir():
        return False
    bundled_dir = bundled_root / name
    if not bundled_dir.is_dir():
        return False
    return any(
        (bundled_dir / manifest_name).exists()
        for manifest_name in ("plugin.yaml", "plugin.yml")
    )
def cmd_list() -> None:
"""List installed plugins."""
from rich.console import Console
from rich.table import Table
def _discover_all_plugins() -> list:
"""Return a list of (name, version, description, source, dir_path) for
every plugin the loader can see user + bundled + project.
Matches the ordering/dedup of ``PluginManager.discover_and_load``:
bundled first, then user, then project; user overrides bundled on
name collision.
"""
try:
import yaml
except ImportError:
yaml = None
seen: dict = {} # name -> (name, version, description, source, path)
# Bundled (<repo>/plugins/<name>/), excluding memory/ and context_engine/
import hermes_cli
repo_plugins = Path(hermes_cli.__file__).resolve().parent.parent / "plugins"
for base, source in ((repo_plugins, "bundled"), (_plugins_dir(), "user")):
if not base.is_dir():
continue
for d in sorted(base.iterdir()):
if not d.is_dir():
continue
if source == "bundled" and d.name in ("memory", "context_engine"):
continue
manifest_file = d / "plugin.yaml"
if not manifest_file.exists():
manifest_file = d / "plugin.yml"
if not manifest_file.exists():
continue
name = d.name
version = ""
description = ""
if yaml:
try:
with open(manifest_file) as f:
manifest = yaml.safe_load(f) or {}
name = manifest.get("name", d.name)
version = manifest.get("version", "")
description = manifest.get("description", "")
except Exception:
pass
# User plugins override bundled on name collision.
if name in seen and source == "bundled":
continue
src_label = source
if source == "user" and (d / ".git").exists():
src_label = "git"
seen[name] = (name, version, description, src_label, d)
return list(seen.values())
def cmd_list() -> None:
"""List all plugins (bundled + user) with enabled/disabled state."""
from rich.console import Console
from rich.table import Table
console = Console()
entries = _discover_all_plugins()
if not entries:
plugins_dir = _plugins_dir()
dirs = sorted(d for d in plugins_dir.iterdir() if d.is_dir())
if not dirs:
console.print("[dim]No plugins installed.[/dim]")
console.print("[dim]Install with:[/dim] hermes plugins install owner/repo")
return
enabled = _get_enabled_set()
disabled = _get_disabled_set()
table = Table(title="Plugins", show_lines=False)
table = Table(title="Installed Plugins", show_lines=False)
table.add_column("Name", style="bold")
table.add_column("Status")
table.add_column("Version", style="dim")
table.add_column("Description")
table.add_column("Source", style="dim")
for name, version, description, source, _dir in entries:
if name in disabled:
status = "[red]disabled[/red]"
elif name in enabled:
status = "[green]enabled[/green]"
else:
status = "[yellow]not enabled[/yellow]"
for d in dirs:
manifest_file = d / "plugin.yaml"
name = d.name
version = ""
description = ""
source = "local"
if manifest_file.exists() and yaml:
try:
with open(manifest_file) as f:
manifest = yaml.safe_load(f) or {}
name = manifest.get("name", d.name)
version = manifest.get("version", "")
description = manifest.get("description", "")
except Exception:
pass
# Check if it's a git repo (installed via hermes plugins install)
if (d / ".git").exists():
source = "git"
is_disabled = name in disabled or d.name in disabled
status = "[red]disabled[/red]" if is_disabled else "[green]enabled[/green]"
table.add_row(name, status, str(version), description, source)
console.print()
@@ -732,7 +592,6 @@ def cmd_list() -> None:
console.print()
console.print("[dim]Interactive toggle:[/dim] hermes plugins")
console.print("[dim]Enable/disable:[/dim] hermes plugins enable/disable <name>")
console.print("[dim]Plugins are opt-in by default — only 'enabled' plugins load.[/dim]")
# ---------------------------------------------------------------------------
@@ -883,25 +742,41 @@ def cmd_toggle() -> None:
"""Interactive composite UI — general plugins + provider plugin categories."""
from rich.console import Console
console = Console()
try:
import yaml
except ImportError:
yaml = None
# -- General plugins discovery (bundled + user) --
entries = _discover_all_plugins()
enabled_set = _get_enabled_set()
disabled_set = _get_disabled_set()
console = Console()
plugins_dir = _plugins_dir()
# -- General plugins discovery --
dirs = sorted(d for d in plugins_dir.iterdir() if d.is_dir())
disabled = _get_disabled_set()
plugin_names = []
plugin_labels = []
plugin_selected = set()
for i, (name, _version, description, source, _d) in enumerate(entries):
label = f"{name} \u2014 {description}" if description else name
if source == "bundled":
label = f"{label} [bundled]"
for i, d in enumerate(dirs):
manifest_file = d / "plugin.yaml"
name = d.name
description = ""
if manifest_file.exists() and yaml:
try:
with open(manifest_file) as f:
manifest = yaml.safe_load(f) or {}
name = manifest.get("name", d.name)
description = manifest.get("description", "")
except Exception:
pass
plugin_names.append(name)
label = f"{name} \u2014 {description}" if description else name
plugin_labels.append(label)
# Selected (enabled) when in enabled-set AND not in disabled-set
if name in enabled_set and name not in disabled_set:
if name not in disabled and d.name not in disabled:
plugin_selected.add(i)
# -- Provider categories --
@@ -929,10 +804,10 @@ def cmd_toggle() -> None:
try:
import curses
_run_composite_ui(curses, plugin_names, plugin_labels, plugin_selected,
disabled_set, categories, console)
disabled, categories, console)
except ImportError:
_run_composite_fallback(plugin_names, plugin_labels, plugin_selected,
disabled_set, categories, console)
disabled, categories, console)
def _run_composite_ui(curses, plugin_names, plugin_labels, plugin_selected,
@@ -1145,29 +1020,18 @@ def _run_composite_ui(curses, plugin_names, plugin_labels, plugin_selected,
curses.wrapper(_draw)
flush_stdin()
# Persist general plugin changes. The new allow-list is the set of
# plugin names that were checked; anything not checked is explicitly
# disabled (written to disabled-list) so it remains off even if the
# plugin code does something clever like auto-enable in the future.
new_enabled: set = set()
new_disabled: set = set(disabled) # preserve existing disabled state for unseen plugins
# Persist general plugin changes
new_disabled = set()
for i, name in enumerate(plugin_names):
if i in chosen:
new_enabled.add(name)
new_disabled.discard(name)
else:
if i not in chosen:
new_disabled.add(name)
prev_enabled = _get_enabled_set()
enabled_changed = new_enabled != prev_enabled
disabled_changed = new_disabled != disabled
if enabled_changed or disabled_changed:
_save_enabled_set(new_enabled)
if new_disabled != disabled:
_save_disabled_set(new_disabled)
enabled_count = len(plugin_names) - len(new_disabled)
console.print(
f"\n[green]\u2713[/green] General plugins: {len(new_enabled)} enabled, "
f"{len(plugin_names) - len(new_enabled)} disabled."
f"\n[green]\u2713[/green] General plugins: {enabled_count} enabled, "
f"{len(new_disabled)} disabled."
)
elif n_plugins > 0:
console.print("\n[dim]General plugins unchanged.[/dim]")
@@ -1214,17 +1078,11 @@ def _run_composite_fallback(plugin_names, plugin_labels, plugin_selected,
return
print()
new_enabled: set = set()
new_disabled: set = set(disabled)
new_disabled = set()
for i, name in enumerate(plugin_names):
if i in chosen:
new_enabled.add(name)
new_disabled.discard(name)
else:
if i not in chosen:
new_disabled.add(name)
prev_enabled = _get_enabled_set()
if new_enabled != prev_enabled or new_disabled != disabled:
_save_enabled_set(new_enabled)
if new_disabled != disabled:
_save_disabled_set(new_disabled)
# Provider categories
@@ -1250,17 +1108,7 @@ def plugins_command(args) -> None:
action = getattr(args, "plugins_action", None)
if action == "install":
# Map argparse tri-state: --enable=True, --no-enable=False, neither=None (prompt)
enable_arg = None
if getattr(args, "enable", False):
enable_arg = True
elif getattr(args, "no_enable", False):
enable_arg = False
cmd_install(
args.identifier,
force=getattr(args, "force", False),
enable=enable_arg,
)
cmd_install(args.identifier, force=getattr(args, "force", False))
elif action == "update":
cmd_update(args.name)
elif action in ("remove", "rm", "uninstall"):
+1 -19
View File
@@ -1958,8 +1958,6 @@ async def update_config_raw(body: RawConfigUpdate):
@app.get("/api/analytics/usage")
async def get_usage_analytics(days: int = 30):
from hermes_state import SessionDB
from agent.insights import InsightsEngine
db = SessionDB()
try:
cutoff = time.time() - (days * 86400)
@@ -1999,24 +1997,8 @@ async def get_usage_analytics(days: int = 30):
FROM sessions WHERE started_at > ?
""", (cutoff,))
totals = dict(cur3.fetchone())
insights_report = InsightsEngine(db).generate(days=days)
skills = insights_report.get("skills", {
"summary": {
"total_skill_loads": 0,
"total_skill_edits": 0,
"total_skill_actions": 0,
"distinct_skills_used": 0,
},
"top_skills": [],
})
return {
"daily": daily,
"by_model": by_model,
"totals": totals,
"period_days": days,
"skills": skills,
}
return {"daily": daily, "by_model": by_model, "totals": totals, "period_days": days}
finally:
db.close()
+4 -31
View File
@@ -1249,37 +1249,10 @@ class SessionDB:
try:
with self._lock:
ctx_cursor = self._conn.execute(
"""WITH target AS (
SELECT session_id, timestamp, id
FROM messages
WHERE id = ?
)
SELECT role, content
FROM (
SELECT m.id, m.timestamp, m.role, m.content
FROM messages m
JOIN target t ON t.session_id = m.session_id
WHERE (m.timestamp < t.timestamp)
OR (m.timestamp = t.timestamp AND m.id < t.id)
ORDER BY m.timestamp DESC, m.id DESC
LIMIT 1
)
UNION ALL
SELECT role, content
FROM messages
WHERE id = ?
UNION ALL
SELECT role, content
FROM (
SELECT m.id, m.timestamp, m.role, m.content
FROM messages m
JOIN target t ON t.session_id = m.session_id
WHERE (m.timestamp > t.timestamp)
OR (m.timestamp = t.timestamp AND m.id > t.id)
ORDER BY m.timestamp ASC, m.id ASC
LIMIT 1
)""",
(match["id"], match["id"]),
"""SELECT role, content FROM messages
WHERE session_id = ? AND id >= ? - 1 AND id <= ? + 1
ORDER BY id""",
(match["session_id"], match["id"], match["id"]),
)
context_msgs = [
{"role": r["role"], "content": (r["content"] or "")[:200]}
@@ -57,32 +57,32 @@ Use the `ddgs` command via `terminal` when it exists. This is the preferred path
```bash
# Text search
ddgs text -q "python async programming" -m 5
ddgs text -k "python async programming" -m 5
# News search
ddgs news -q "artificial intelligence" -m 5
ddgs news -k "artificial intelligence" -m 5
# Image search
ddgs images -q "landscape photography" -m 10
ddgs images -k "landscape photography" -m 10
# Video search
ddgs videos -q "python tutorial" -m 5
ddgs videos -k "python tutorial" -m 5
# With region filter
ddgs text -q "best restaurants" -m 5 -r us-en
ddgs text -k "best restaurants" -m 5 -r us-en
# Recent results only (d=day, w=week, m=month, y=year)
ddgs text -q "latest AI news" -m 5 -t w
ddgs text -k "latest AI news" -m 5 -t w
# JSON output for parsing
ddgs text -q "fastapi tutorial" -m 5 -o json
ddgs text -k "fastapi tutorial" -m 5 -o json
```
### CLI Flags
| Flag | Description | Example |
|------|-------------|---------|
| `-q` | Query — **required** | `-q "search terms"` |
| `-k` | Keywords (query) — **required** | `-k "search terms"` |
| `-m` | Max results | `-m 5` |
| `-r` | Region | `-r us-en` |
| `-t` | Time limit | `-t w` (week) |
@@ -189,7 +189,7 @@ DuckDuckGo returns titles, URLs, and snippets — not full page content. To get
CLI example:
```bash
ddgs text -q "fastapi deployment guide" -m 3 -o json
ddgs text -k "fastapi deployment guide" -m 3 -o json
```
Python example, only after verifying `ddgs` is installed in that runtime:
@@ -229,7 +229,7 @@ Then extract the best URL with `web_extract` or another content-retrieval tool.
- **Do not assume the CLI exists**: Check `command -v ddgs` before using it.
- **Do not assume `execute_code` can import `ddgs`**: `from ddgs import DDGS` may fail with `ModuleNotFoundError` unless that runtime was prepared separately.
- **Package name**: The package is `ddgs` (previously `duckduckgo-search`). Install with `pip install ddgs`.
- **Don't confuse `-q` and `-m`** (CLI): `-q` is for the query, `-m` is for max results count.
- **Don't confuse `-k` and `-m`** (CLI): `-k` is for keywords, `-m` is for max results count.
- **Empty results**: If `ddgs` returns nothing, it may be rate-limited. Wait a few seconds and retry.
## Validated With
@@ -25,4 +25,4 @@ if ! command -v ddgs &> /dev/null; then
exit 1
fi
ddgs text -q "$QUERY" -m "$MAX_RESULTS"
ddgs text -k "$QUERY" -m "$MAX_RESULTS"
-51
View File
@@ -1,51 +0,0 @@
# disk-cleanup
Auto-tracks and cleans up ephemeral files created during Hermes Agent
sessions — test scripts, temp outputs, cron logs, stale chrome profiles.
Scoped strictly to `$HERMES_HOME` and `/tmp/hermes-*`.
Originally contributed by [@LVT382009](https://github.com/LVT382009) as a
skill in PR #12212. Ported to the plugin system so the behaviour runs
automatically via `post_tool_call` and `on_session_end` hooks — the agent
never needs to remember to call a tool.
## How it works
| Hook | Behaviour |
|---|---|
| `post_tool_call` | When `write_file` / `terminal` / `patch` creates a file matching `test_*`, `tmp_*`, or `*.test.*` inside `HERMES_HOME`, track it silently as `test` / `temp` / `cron-output`. |
| `on_session_end` | If any test files were auto-tracked during this turn, run `quick` cleanup (no prompts). |
Deletion rules (same as the original PR):
| Category | Threshold | Confirmation |
|---|---|---|
| `test` | every session end | Never |
| `temp` | >7 days since tracked | Never |
| `cron-output` | >14 days since tracked | Never |
| empty dirs under HERMES_HOME | always | Never |
| `research` | >30 days, beyond 10 newest | Always (deep only) |
| `chrome-profile` | >14 days since tracked | Always (deep only) |
| files >500 MB | never auto | Always (deep only) |
## Slash command
```
/disk-cleanup status # breakdown + top-10 largest
/disk-cleanup dry-run # preview without deleting
/disk-cleanup quick # run safe cleanup now
/disk-cleanup deep # quick + list items needing prompt
/disk-cleanup track <path> <category> # manual tracking
/disk-cleanup forget <path> # stop tracking
```
## Safety
- `is_safe_path()` rejects anything outside `HERMES_HOME` or `/tmp/hermes-*`
- Windows mounts (`/mnt/c` etc.) are rejected
- The state directory `$HERMES_HOME/disk-cleanup/` is itself excluded
- `$HERMES_HOME/logs/`, `memories/`, `sessions/`, `skills/`, `plugins/`,
and config files are never tracked
- Backup/restore is scoped to `tracked.json` — the plugin never touches
agent logs
- Atomic writes: `.tmp` → backup → rename
-316
View File
@@ -1,316 +0,0 @@
"""disk-cleanup plugin — auto-cleanup of ephemeral Hermes session files.
Wires three behaviours:
1. ``post_tool_call`` hook — inspects ``write_file``, ``patch`` and
``terminal`` tool calls for newly-created paths matching test/temp
patterns under ``HERMES_HOME`` and tracks them silently. Zero agent
compliance required.
2. ``on_session_end`` hook — when any test files were auto-tracked
during the just-finished turn, runs :func:`disk_cleanup.quick` and
logs a single line to ``$HERMES_HOME/disk-cleanup/cleanup.log``.
3. ``/disk-cleanup`` slash command — manual ``status``, ``dry-run``,
``quick``, ``deep``, ``track``, ``forget``.
Replaces PR #12212's skill-plus-script design: the agent no longer
needs to remember to run commands.
"""
from __future__ import annotations
import logging
import re
import shlex
import threading
from pathlib import Path
from typing import Any, Dict, Optional, Set
from . import disk_cleanup as dg
logger = logging.getLogger(__name__)
# Per-task set of "test files newly tracked this turn". Keyed by task_id
# (or session_id as fallback) so on_session_end can decide whether to run
# cleanup. Guarded by a lock — post_tool_call can fire concurrently on
# parallel tool calls.
_recent_test_tracks: Dict[str, Set[str]] = {}
_lock = threading.Lock()
# Tool-call result shapes we can parse
# Key under which the write_file tool's arguments carry the target path.
_WRITE_FILE_PATH_KEY = "path"
# Captures a path-like token that follows start-of-string or whitespace:
# either "/..." or "~/...", running until the next whitespace, single quote,
# double quote, or backtick.
_TERMINAL_PATH_REGEX = re.compile(r"(?:^|\s)(/[^\s'\"`]+|\~/[^\s'\"`]+)")
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _tracker_key(task_id: str, session_id: str) -> str:
return task_id or session_id or "default"
def _record_track(task_id: str, session_id: str, path: Path, category: str) -> None:
"""Record that we tracked *path* as *category* during this turn."""
if category != "test":
return
key = _tracker_key(task_id, session_id)
with _lock:
_recent_test_tracks.setdefault(key, set()).add(str(path))
def _drain(task_id: str, session_id: str) -> Set[str]:
    """Remove and return the test paths recorded for this turn's bucket.

    Returns an empty set when nothing was recorded under the bucket key.
    """
    bucket_key = _tracker_key(task_id, session_id)
    with _lock:
        if bucket_key in _recent_test_tracks:
            return _recent_test_tracks.pop(bucket_key)
        return set()
def _attempt_track(path_str: str, task_id: str, session_id: str) -> None:
    """Best-effort auto-track of *path_str*. Never raises.

    The path is expanded (``~``), checked for existence, categorised via
    ``dg.guess_category`` and, when a category is found, handed to
    ``dg.track``.  Newly tracked paths are recorded for the current turn.

    Args:
        path_str: Candidate filesystem path extracted from a tool call.
        task_id: Task identifier of the tool call (may be empty).
        session_id: Session identifier of the tool call (may be empty).
    """
    try:
        candidate = Path(path_str).expanduser()
        # exists() can raise OSError for e.g. over-long names or permission
        # errors, and the tracking helpers do filesystem work of their own;
        # keep all of it inside the guard so this hook helper never
        # propagates an exception.
        if not candidate.exists():
            return
        category = dg.guess_category(candidate)
        if category is None:
            return
        newly_tracked = dg.track(str(candidate), category, silent=True)
    except Exception as exc:
        logger.debug("disk-cleanup: could not auto-track %r: %s", path_str, exc)
        return
    if newly_tracked:
        _record_track(task_id, session_id, candidate, category)
def _extract_paths_from_write_file(args: Dict[str, Any]) -> Set[str]:
    """Return the write_file target path as a one-element set, or an empty set.

    The value is used only when it is a non-empty string.
    """
    target = args.get(_WRITE_FILE_PATH_KEY)
    if not isinstance(target, str) or not target:
        return set()
    return {target}
def _extract_paths_from_patch(args: Dict[str, Any]) -> Set[str]:
# The patch tool creates new files via the `mode="patch"` path too, but
# most of its use is editing existing files — we only care about new
# ephemeral creations, so treat patch conservatively and only pick up
# the single-file `path` arg. Track-then-cleanup is idempotent, so
# re-tracking an already-tracked file is a no-op (dedup in track()).
path = args.get("path")
return {path} if isinstance(path, str) and path else set()
def _extract_paths_from_terminal(args: Dict[str, Any], result: str) -> Set[str]:
"""Best-effort: pull candidate filesystem paths from a terminal command
and its output, then let ``guess_category`` / ``is_safe_path`` filter.
"""
paths: Set[str] = set()
cmd = args.get("command") or ""
if isinstance(cmd, str) and cmd:
# Tokenise the command — catches `touch /tmp/hermes-x/test_foo.py`
try:
for tok in shlex.split(cmd, posix=True):
if tok.startswith(("/", "~")):
paths.add(tok)
except ValueError:
pass
# Only scan the result text if it's a reasonable size (avoid 50KB dumps).
if isinstance(result, str) and len(result) < 4096:
for match in _TERMINAL_PATH_REGEX.findall(result):
paths.add(match)
return paths
# ---------------------------------------------------------------------------
# Hooks
# ---------------------------------------------------------------------------
def _on_post_tool_call(
tool_name: str = "",
args: Optional[Dict[str, Any]] = None,
result: Any = None,
task_id: str = "",
session_id: str = "",
tool_call_id: str = "",
**_: Any,
) -> None:
"""Auto-track ephemeral files created by recent tool calls."""
if not isinstance(args, dict):
return
candidates: Set[str] = set()
if tool_name == "write_file":
candidates = _extract_paths_from_write_file(args)
elif tool_name == "patch":
candidates = _extract_paths_from_patch(args)
elif tool_name == "terminal":
candidates = _extract_paths_from_terminal(args, result if isinstance(result, str) else "")
else:
return
for path_str in candidates:
_attempt_track(path_str, task_id, session_id)
def _on_session_end(
    session_id: str = "",
    completed: bool = True,
    interrupted: bool = False,
    **_: Any,
) -> None:
    """``on_session_end`` hook: run quick cleanup if test files were tracked.

    ``completed`` and ``interrupted`` are accepted but not consulted.
    Cleanup failures are logged at debug level and otherwise ignored.
    """
    # Session-level bucket first. In practice only one of the session/task
    # buckets is populated per turn; the other is empty.
    session_tracks = _drain("", session_id)

    # Then sweep whatever other buckets exist (e.g. subagents that recorded
    # under their own task_id) so they are all cleaned up at session end.
    # NOTE(review): this drops every bucket whose key differs from
    # session_id — presumably that could include buckets of other live
    # sessions in the same process; confirm that is intended.
    with _lock:
        leftover_keys = list(_recent_test_tracks.keys())
        for bucket_key in leftover_keys:
            if bucket_key and bucket_key != session_id:
                _recent_test_tracks.pop(bucket_key, None)

    if not (session_tracks or leftover_keys):
        return

    try:
        outcome = dg.quick()
    except Exception as exc:
        logger.debug("disk-cleanup quick cleanup failed: %s", exc)
        return

    # Only write an audit line when something was actually removed.
    if not (outcome["deleted"] or outcome["empty_dirs"]):
        return
    dg._log(
        f"AUTO_QUICK (session_end): deleted={outcome['deleted']} "
        f"dirs={outcome['empty_dirs']} freed={dg.fmt_size(outcome['freed'])}"
    )
# ---------------------------------------------------------------------------
# Slash command
# ---------------------------------------------------------------------------
_HELP_TEXT = """\
/disk-cleanup ephemeral-file cleanup
Subcommands:
status Per-category breakdown + top-10 largest
dry-run Preview what quick/deep would delete
quick Run safe cleanup now (no prompts)
deep Run quick, then list items that need prompts
track <path> <category> Manually add a path to tracking
forget <path> Stop tracking a path (does not delete)
Categories: temp | test | research | download | chrome-profile | cron-output | other
All operations are scoped to HERMES_HOME and /tmp/hermes-*.
Test files are auto-tracked on write_file / terminal and auto-cleaned at session end.
"""
def _fmt_summary(summary: Dict[str, Any]) -> str:
    """Render a cleanup *summary* dict as a short human-readable message.

    Reads the ``deleted``, ``empty_dirs`` and ``freed`` entries, and appends
    an error count line when ``errors`` is present and non-empty.
    """
    headline = (
        f"[disk-cleanup] Cleaned {summary['deleted']} files + "
        f"{summary['empty_dirs']} empty dirs, freed {dg.fmt_size(summary['freed'])}."
    )
    if not summary.get("errors"):
        return headline
    return headline + f"\n {len(summary['errors'])} error(s); see cleanup.log."
def _handle_slash(raw_args: str) -> Optional[str]:
argv = raw_args.strip().split()
if not argv or argv[0] in ("help", "-h", "--help"):
return _HELP_TEXT
sub = argv[0]
if sub == "status":
return dg.format_status(dg.status())
if sub == "dry-run":
auto, prompt = dg.dry_run()
auto_size = sum(i["size"] for i in auto)
prompt_size = sum(i["size"] for i in prompt)
lines = [
"Dry-run preview (nothing deleted):",
f" Auto-delete : {len(auto)} files ({dg.fmt_size(auto_size)})",
]
for item in auto:
lines.append(f" [{item['category']}] {item['path']}")
lines.append(
f" Needs prompt: {len(prompt)} files ({dg.fmt_size(prompt_size)})"
)
for item in prompt:
lines.append(f" [{item['category']}] {item['path']}")
lines.append(
f"\n Total potential: {dg.fmt_size(auto_size + prompt_size)}"
)
return "\n".join(lines)
if sub == "quick":
return _fmt_summary(dg.quick())
if sub == "deep":
# In-session deep can't prompt the user interactively — show what
# quick cleaned plus the items that WOULD need confirmation.
quick_summary = dg.quick()
_auto, prompt_items = dg.dry_run()
lines = [_fmt_summary(quick_summary)]
if prompt_items:
size = sum(i["size"] for i in prompt_items)
lines.append(
f"\n{len(prompt_items)} item(s) need confirmation "
f"({dg.fmt_size(size)}):"
)
for item in prompt_items:
lines.append(f" [{item['category']}] {item['path']}")
lines.append(
"\nRun `/disk-cleanup forget <path>` to skip, or delete "
"manually via terminal."
)
return "\n".join(lines)
if sub == "track":
if len(argv) < 3:
return "Usage: /disk-cleanup track <path> <category>"
path_arg = argv[1]
category = argv[2]
if category not in dg.ALLOWED_CATEGORIES:
return (
f"Unknown category '{category}'. "
f"Allowed: {sorted(dg.ALLOWED_CATEGORIES)}"
)
if dg.track(path_arg, category, silent=True):
return f"Tracked {path_arg} as '{category}'."
return (
f"Not tracked (already present, missing, or outside HERMES_HOME): "
f"{path_arg}"
)
if sub == "forget":
if len(argv) < 2:
return "Usage: /disk-cleanup forget <path>"
n = dg.forget(argv[1])
return (
f"Removed {n} tracking entr{'y' if n == 1 else 'ies'} for {argv[1]}."
if n else f"Not found in tracking: {argv[1]}"
)
return f"Unknown subcommand: {sub}\n\n{_HELP_TEXT}"
# ---------------------------------------------------------------------------
# Plugin registration
# ---------------------------------------------------------------------------
def register(ctx) -> None:
    """Plugin entry point: attach the two hooks and the slash command to *ctx*."""
    hook_bindings = (
        ("post_tool_call", _on_post_tool_call),
        ("on_session_end", _on_session_end),
    )
    for hook_name, callback in hook_bindings:
        ctx.register_hook(hook_name, callback)

    ctx.register_command(
        "disk-cleanup",
        handler=_handle_slash,
        description="Track and clean up ephemeral Hermes session files.",
    )
-496
View File
@@ -1,496 +0,0 @@
"""disk_cleanup — ephemeral file cleanup for Hermes Agent.
Library module wrapping the deterministic cleanup rules written by
@LVT382009 in PR #12212. The plugin ``__init__.py`` wires these
functions into ``post_tool_call`` and ``on_session_end`` hooks so
tracking and cleanup happen automatically — the agent never needs to
call a tool or remember a skill.
Rules:
- test files → delete immediately at task end (age >= 0)
- temp files → delete after 7 days
- cron-output → delete after 14 days
- empty dirs → always delete (under HERMES_HOME)
- research → keep 10 newest, prompt for older (deep only)
- chrome-profile → prompt after 14 days (deep only)
- >500 MB files → prompt always (deep only)
Scope: strictly HERMES_HOME and /tmp/hermes-*
Never touches: ~/.hermes/logs/ or any system directory.
"""
from __future__ import annotations
import json
import logging
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# Prefer the host's resolver for HERMES_HOME; if importing it fails for any
# reason, define a local stand-in with the same name so the rest of this
# module can call ``get_hermes_home()`` unconditionally.
try:
    from hermes_constants import get_hermes_home
except Exception: # pragma: no cover — plugin may load before constants resolves
    import os

    def get_hermes_home() -> Path: # type: ignore[no-redef]
        """Fallback resolver for the Hermes home directory.

        Returns the resolved ``$HERMES_HOME`` when that variable is set to a
        non-blank value, otherwise the resolved ``~/.hermes``.
        """
        # Treat an unset and a whitespace-only variable the same way.
        val = (os.environ.get("HERMES_HOME") or "").strip()
        return Path(val).resolve() if val else (Path.home() / ".hermes").resolve()
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
def get_state_dir() -> Path:
    """Return the plugin state directory, ``<HERMES_HOME>/disk-cleanup``.

    This is deliberately a different directory from ``$HERMES_HOME/logs/``.
    """
    home = get_hermes_home()
    return home / "disk-cleanup"
def get_tracked_file() -> Path:
    """Return the path of ``tracked.json`` inside the plugin state directory."""
    state_dir = get_state_dir()
    return state_dir.joinpath("tracked.json")
def get_log_file() -> Path:
    """Return the audit-log path, ``cleanup.log`` in the plugin state directory.

    The log lives in the state directory on purpose, NOT under
    ``$HERMES_HOME/logs/``.
    """
    state_dir = get_state_dir()
    return state_dir.joinpath("cleanup.log")
# ---------------------------------------------------------------------------
# Path safety
# ---------------------------------------------------------------------------
def is_safe_path(path: Path) -> bool:
    """Return True only for paths under HERMES_HOME or ``/tmp/hermes-*``.

    The decision is made on the *resolved* path, so ``..`` segments,
    relative paths and symlinks cannot smuggle a location outside the
    allowed roots past the check (e.g. ``/tmp/hermes-x/../../etc/passwd``
    or the relative ``foo/tmp/hermes-x`` are rejected).

    Args:
        path: Candidate path; it does not have to exist.

    Returns:
        True when the resolved path lies under HERMES_HOME, or under a
        ``hermes-*`` entry directly inside the temp root; False otherwise,
        including when the path cannot be resolved.
    """
    try:
        resolved = path.resolve()
    except (OSError, RuntimeError):
        # Unresolvable (e.g. symlink loop): we cannot prove it is safe.
        return False

    hermes_home = get_hermes_home()
    home_roots = [hermes_home]
    try:
        # Also compare against the resolved home, in case the configured
        # HERMES_HOME itself goes through a symlink.
        home_roots.append(hermes_home.resolve())
    except (OSError, RuntimeError):
        pass
    for root in home_roots:
        try:
            resolved.relative_to(root)
            return True
        except ValueError:
            continue

    # Allow /tmp/hermes-* explicitly. Check both the literal /tmp and its
    # resolved form, since /tmp may itself be a symlink on some systems.
    tmp_roots = [Path("/tmp")]
    try:
        tmp_roots.append(Path("/tmp").resolve())
    except (OSError, RuntimeError):
        pass
    for tmp_root in tmp_roots:
        try:
            rel_parts = resolved.relative_to(tmp_root).parts
        except ValueError:
            continue
        if rel_parts and rel_parts[0].startswith("hermes-"):
            return True
    return False
# ---------------------------------------------------------------------------
# Audit log
# ---------------------------------------------------------------------------
def _log(message: str) -> None:
    """Append a UTC-timestamped line to the audit log, best-effort.

    Args:
        message: Text to record; written as ``[YYYY-MM-DD HH:MM:SS] message``.

    The file is opened as UTF-8 with ``errors="replace"`` so that non-ASCII
    text in messages (the callers log "—" and arbitrary file paths) cannot
    raise ``UnicodeEncodeError`` under a non-UTF-8 locale. That exception is
    not an ``OSError`` and would otherwise escape the guard below.
    """
    try:
        log_file = get_log_file()
        log_file.parent.mkdir(parents=True, exist_ok=True)
        ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
        with open(log_file, "a", encoding="utf-8", errors="replace") as f:
            f.write(f"[{ts}] {message}\n")
    except OSError:
        # Never let the audit log break the agent loop.
        pass
# ---------------------------------------------------------------------------
# tracked.json — atomic read/write, backup scoped to tracked.json only
# ---------------------------------------------------------------------------
def load_tracked() -> List[Dict[str, Any]]:
    """Load the tracked-entry list from ``tracked.json``.

    Creates the state directory if needed. A missing file yields an empty
    list. If the file does not parse as JSON, or parses to something other
    than a list, it is treated as corrupted: the ``.json.bak`` copy is tried
    next, and an empty list is returned when that is unusable too. Each
    recovery path writes a WARN line to the audit log.

    Returns:
        The list of tracked entries (possibly empty).
    """
    tf = get_tracked_file()
    tf.parent.mkdir(parents=True, exist_ok=True)
    if not tf.exists():
        return []

    try:
        data = json.loads(tf.read_text())
    except ValueError:  # json.JSONDecodeError and decode errors are ValueErrors
        data = None
    # Every caller iterates the result and indexes item["path"], so a valid
    # JSON document of the wrong shape ({} / null / "…") is corruption too.
    if isinstance(data, list):
        return data

    bak = tf.with_suffix(".json.bak")
    if bak.exists():
        try:
            restored = json.loads(bak.read_text())
        except (OSError, ValueError):
            restored = None
        if isinstance(restored, list):
            _log("WARN: tracked.json corrupted — restored from .bak")
            return restored

    _log("WARN: tracked.json corrupted, no backup — starting fresh")
    return []
def save_tracked(tracked: List[Dict[str, Any]]) -> None:
    """Persist *tracked* to ``tracked.json`` via a staging file.

    Sequence: write the JSON to ``tracked.json.tmp``, copy any existing
    ``tracked.json`` to ``tracked.json.bak``, then move the staging file
    over ``tracked.json``.
    """
    target = get_tracked_file()
    target.parent.mkdir(parents=True, exist_ok=True)

    staging = target.with_suffix(".json.tmp")
    payload = json.dumps(tracked, indent=2)
    staging.write_text(payload)

    if target.exists():
        backup = target.with_suffix(".json.bak")
        shutil.copy2(target, backup)

    staging.replace(target)
# ---------------------------------------------------------------------------
# Categories
# ---------------------------------------------------------------------------
# Category labels accepted for tracked entries. ``track`` rewrites any label
# outside this set to "other" before storing the entry.
ALLOWED_CATEGORIES = {
    "temp", "test", "research", "download",
    "chrome-profile", "cron-output", "other",
}
def fmt_size(n: float) -> str:
    """Render a byte count with one decimal and a 1024-based unit (B … PB).

    The value is divided by 1024 until it drops below 1024 or the TB unit
    has been passed; anything still larger is reported in PB.
    """
    units = ("B", "KB", "MB", "GB", "TB")
    value = n
    idx = 0
    while idx < len(units):
        if value < 1024:
            return f"{value:.1f} {units[idx]}"
        value /= 1024
        idx += 1
    return f"{value:.1f} PB"
# ---------------------------------------------------------------------------
# Track / forget
# ---------------------------------------------------------------------------
def track(path_str: str, category: str, silent: bool = False) -> bool:
    """Add *path_str* to the tracked list under *category*.

    An unknown category is logged and replaced by ``"other"``. The path is
    resolved first; it is skipped (and logged) when it does not exist or is
    rejected by ``is_safe_path``, and skipped silently when already tracked.

    Args:
        path_str: Path of the file or directory to track.
        category: Category label for the entry.
        silent: When False, also print a confirmation line to stdout.

    Returns:
        True if a new entry was stored, False otherwise.
    """
    if category not in ALLOWED_CATEGORIES:
        _log(f"WARN: unknown category '{category}', using 'other'")
        category = "other"

    target = Path(path_str).resolve()
    if not target.exists():
        _log(f"SKIP: {target} (does not exist)")
        return False
    if not is_safe_path(target):
        _log(f"REJECT: {target} (outside HERMES_HOME)")
        return False

    # Directories are recorded with size 0.
    size = target.stat().st_size if target.is_file() else 0

    entries = load_tracked()
    key = str(target)
    # Deduplicate on the resolved path string.
    for entry in entries:
        if entry["path"] == key:
            return False

    entries.append({
        "path": key,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "category": category,
        "size": size,
    })
    save_tracked(entries)

    summary = f"{target} ({category}, {fmt_size(size)})"
    _log(f"TRACKED: {summary}")
    if not silent:
        print(f"Tracked: {summary}")
    return True
def forget(path_str: str) -> int:
    """Drop every tracked entry for *path_str*; the file itself is left alone.

    Entries are matched by comparing resolved paths.

    Returns:
        The number of entries removed (0 when the path was not tracked).
    """
    target = Path(path_str).resolve()
    entries = load_tracked()

    kept = []
    for entry in entries:
        if Path(entry["path"]).resolve() != target:
            kept.append(entry)

    removed = len(entries) - len(kept)
    if removed:
        save_tracked(kept)
        _log(f"FORGOT: {target} ({removed} entries)")
    return removed
# ---------------------------------------------------------------------------
# Dry run
# ---------------------------------------------------------------------------
def dry_run() -> Tuple[List[Dict], List[Dict]]:
    """Preview a cleanup without touching any file.

    Mirrors the rules applied by ``quick`` (auto-delete) and ``deep``
    (prompt), including deep's "keep the 10 newest" rule for research
    entries, so the preview matches what a real run would act on.

    Returns:
        ``(auto_delete_list, needs_prompt_list)``. Both hold tracked entries
        whose path still exists, in tracked order.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    auto: List[Dict] = []
    prompt: List[Dict] = []
    old_research: List[Dict] = []

    for item in tracked:
        if not Path(item["path"]).exists():
            continue
        age = (now - datetime.fromisoformat(item["timestamp"])).days
        cat = item["category"]
        size = item["size"]
        if cat == "test":
            auto.append(item)
        elif cat == "temp" and age > 7:
            auto.append(item)
        elif cat == "cron-output" and age > 14:
            auto.append(item)
        elif cat == "research" and age > 30:
            old_research.append(item)
            prompt.append(item)
        elif cat == "chrome-profile" and age > 14:
            prompt.append(item)
        elif size > 500 * 1024 * 1024:
            prompt.append(item)

    # deep() sorts the >30-day research entries newest-first and only asks
    # about those beyond the first 10; drop the same 10 from the preview.
    newest_first = sorted(old_research, key=lambda x: x["timestamp"], reverse=True)
    kept_ids = {id(entry) for entry in newest_first[:10]}
    if kept_ids:
        prompt = [entry for entry in prompt if id(entry) not in kept_ids]

    return auto, prompt
# ---------------------------------------------------------------------------
# Quick cleanup
# ---------------------------------------------------------------------------
def _remove_empty_dirs(hermes_home: Path) -> int:
    """Remove empty directories below *hermes_home*; return how many were removed."""
    # Leave HERMES_HOME itself and a short list of well-known top-level state
    # dirs alone — a fresh install has these empty, and deleting them would
    # surprise the user.
    protected_top_level = {
        "logs", "memories", "sessions", "cron", "cronjobs",
        "cache", "skills", "plugins", "disk-cleanup", "optional-skills",
        "hermes-agent", "backups", "profiles", ".worktrees",
    }
    removed = 0
    try:
        # Reverse-sorted so a directory is visited after its children; a
        # parent emptied by this sweep is then removed in the same pass.
        for dirpath in sorted(hermes_home.rglob("*"), reverse=True):
            if not dirpath.is_dir() or dirpath == hermes_home:
                continue
            try:
                rel_parts = dirpath.relative_to(hermes_home).parts
            except ValueError:
                continue
            # The module contract is "never touches ~/.hermes/logs/": skip
            # the whole logs subtree, not just the top-level directory.
            if rel_parts and rel_parts[0] == "logs":
                continue
            # Skip the well-known top-level state dirs themselves.
            if len(rel_parts) == 1 and rel_parts[0] in protected_top_level:
                continue
            try:
                if not any(dirpath.iterdir()):
                    dirpath.rmdir()
                    removed += 1
                    _log(f"DELETED: {dirpath} (empty dir)")
            except OSError:
                # Unreadable or concurrently modified directory: leave it.
                pass
    except OSError:
        pass
    return removed


def quick() -> Dict[str, Any]:
    """Safe deterministic cleanup — no prompts.

    Deletes tracked ``test`` entries, ``temp`` entries older than 7 days and
    ``cron-output`` entries older than 14 days. Entries whose path no longer
    exists are dropped from tracking. Entries that fail to delete stay
    tracked and are reported in ``errors``. Afterwards, empty directories
    under HERMES_HOME are removed (never inside ``logs/``) and the updated
    tracking list is saved.

    Returns:
        ``{"deleted": N, "empty_dirs": N, "freed": bytes,
        "errors": [str, ...]}``.
    """
    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    deleted = 0
    freed = 0
    new_tracked: List[Dict] = []
    errors: List[str] = []

    for item in tracked:
        p = Path(item["path"])
        cat = item["category"]
        if not p.exists():
            _log(f"STALE: {p} (removed from tracking)")
            continue
        age = (now - datetime.fromisoformat(item["timestamp"])).days
        should_delete = (
            cat == "test"
            or (cat == "temp" and age > 7)
            or (cat == "cron-output" and age > 14)
        )
        if not should_delete:
            new_tracked.append(item)
            continue
        try:
            if p.is_file():
                p.unlink()
            elif p.is_dir():
                shutil.rmtree(p)
            freed += item["size"]
            deleted += 1
            _log(f"DELETED: {p} ({cat}, {fmt_size(item['size'])})")
        except OSError as e:
            _log(f"ERROR deleting {p}: {e}")
            errors.append(f"{p}: {e}")
            # Keep the entry so a later run can retry the deletion.
            new_tracked.append(item)

    empty_removed = _remove_empty_dirs(get_hermes_home())

    save_tracked(new_tracked)
    _log(
        f"QUICK_SUMMARY: {deleted} files, {empty_removed} dirs, "
        f"{fmt_size(freed)}"
    )
    return {
        "deleted": deleted,
        "empty_dirs": empty_removed,
        "freed": freed,
        "errors": errors,
    }
# ---------------------------------------------------------------------------
# Deep cleanup (interactive — not called from plugin hooks)
# ---------------------------------------------------------------------------
def deep(
    confirm: Optional[callable] = None,
) -> Dict[str, Any]:
    """Run ``quick`` and then, per item, an opt-in cleanup of risky entries.

    After the quick pass, tracked entries that still exist are sorted into
    three groups: research older than 30 days (the 10 newest of those are
    kept), chrome-profile older than 14 days, and anything larger than
    500 MB. *confirm(item)* is called for each candidate and must return
    True for the item to be deleted. Without a *confirm* callable only the
    quick pass runs.

    Returns:
        ``{"quick": {...}, "deep_deleted": N, "deep_freed": bytes}``.
    """
    quick_result = quick()
    if confirm is None:
        # Nobody to ask, so nothing beyond the quick pass is deleted.
        return {"quick": quick_result, "deep_deleted": 0, "deep_freed": 0}

    tracked = load_tracked()
    now = datetime.now(timezone.utc)
    size_limit = 500 * 1024 * 1024

    research: List[Dict] = []
    chrome: List[Dict] = []
    large: List[Dict] = []
    for entry in tracked:
        if not Path(entry["path"]).exists():
            continue
        age = (now - datetime.fromisoformat(entry["timestamp"])).days
        category = entry["category"]
        if category == "research" and age > 30:
            research.append(entry)
        elif category == "chrome-profile" and age > 14:
            chrome.append(entry)
        elif entry["size"] > size_limit:
            large.append(entry)

    # Newest first; only entries past the first ten are offered for deletion.
    research.sort(key=lambda x: x["timestamp"], reverse=True)
    candidates = research[10:] + chrome + large

    count = 0
    freed = 0
    removed_items: List[Dict] = []
    for entry in candidates:
        if not confirm(entry):
            continue
        target = Path(entry["path"])
        try:
            if target.is_file():
                target.unlink()
            elif target.is_dir():
                shutil.rmtree(target)
        except OSError as e:
            _log(f"ERROR deleting {entry['path']}: {e}")
            continue
        removed_items.append(entry)
        freed += entry["size"]
        count += 1
        _log(
            f"DELETED: {target} ({entry['category']}, "
            f"{fmt_size(entry['size'])})"
        )

    if removed_items:
        gone = {entry["path"] for entry in removed_items}
        save_tracked([entry for entry in tracked if entry["path"] not in gone])

    return {"quick": quick_result, "deep_deleted": count, "deep_freed": freed}
# ---------------------------------------------------------------------------
# Status
# ---------------------------------------------------------------------------
def status() -> Dict[str, Any]:
    """Summarise the tracked entries.

    Returns:
        A dict with ``categories`` (per-category ``count`` and summed
        ``size``), ``top10`` (up to ten ``(path, size, category)`` tuples for
        entries whose path still exists, largest first) and
        ``total_tracked`` (number of tracked entries).
    """
    entries = load_tracked()

    per_category: Dict[str, Dict] = {}
    for entry in entries:
        bucket = per_category.setdefault(entry["category"], {"count": 0, "size": 0})
        bucket["count"] += 1
        bucket["size"] += entry["size"]

    present = []
    for entry in entries:
        if Path(entry["path"]).exists():
            present.append((entry["path"], entry["size"], entry["category"]))
    largest_first = sorted(present, key=lambda row: row[1], reverse=True)

    return {
        "categories": per_category,
        "top10": largest_first[:10],
        "total_tracked": len(entries),
    }
def format_status(s: Dict[str, Any]) -> str:
    """Render the dict produced by ``status`` as text for slash-command output.

    The output is a category table ordered by size (largest first) followed
    by the ranked list of largest tracked files; each section has a
    placeholder line when it is empty.
    """
    out = [f"{'Category':<20} {'Files':>6} {'Size':>10}", "-" * 40]

    categories = s["categories"]
    by_size = sorted(categories.items(), key=lambda kv: kv[1]["size"], reverse=True)
    for name, info in by_size:
        out.append(f"{name:<20} {info['count']:>6} {fmt_size(info['size']):>10}")
    if not categories:
        out.append("(nothing tracked yet)")

    out.append("")
    out.append("Top 10 largest tracked files:")
    largest = s["top10"]
    if largest:
        for rank, (path, size, cat) in enumerate(largest, 1):
            out.append(f" {rank:>2}. {fmt_size(size):>8} [{cat}] {path}")
    else:
        out.append(" (none)")

    return "\n".join(out)
# ---------------------------------------------------------------------------
# Auto-categorisation from tool-call inspection
# ---------------------------------------------------------------------------
# File-name prefixes and suffixes that ``guess_category`` labels as "test".
_TEST_PATTERNS = ("test_", "tmp_")
_TEST_SUFFIXES = (".test.py", ".test.js", ".test.ts", ".test.md")
def guess_category(path: Path) -> Optional[str]:
    """Pick a tracking category for *path*, or None when it must not be tracked.

    Paths rejected by ``is_safe_path`` are never tracked. For paths under
    HERMES_HOME the first path component decides: a fixed set of state and
    config entries is never tracked, ``cron``/``cronjobs`` map to
    ``"cron-output"`` and ``cache`` maps to ``"temp"``. Everything else
    (including ``/tmp/hermes-*`` paths) is tracked as ``"test"`` only when
    the file name matches the test prefixes or suffixes.
    """
    if not is_safe_path(path):
        return None

    hermes_home = get_hermes_home()
    try:
        rel_parts = path.resolve().relative_to(hermes_home).parts
    except ValueError:
        # Not under HERMES_HOME (e.g. /tmp/hermes-*): only name rules apply.
        rel_parts = None

    if rel_parts is not None:
        top = rel_parts[0] if rel_parts else ""
        # The state dir itself, logs, memory files, sessions, config.
        never_tracked = {
            "disk-cleanup", "logs", "memories", "sessions", "config.yaml",
            "skills", "plugins", ".env", "USER.md", "MEMORY.md", "SOUL.md",
            "auth.json", "hermes-agent",
        }
        if top in never_tracked:
            return None
        if top in ("cron", "cronjobs"):
            return "cron-output"
        if top == "cache":
            return "temp"

    name = path.name
    if name.startswith(_TEST_PATTERNS) or name.endswith(_TEST_SUFFIXES):
        return "test"
    return None
-7
View File
@@ -1,7 +0,0 @@
name: disk-cleanup
version: 2.0.0
description: "Auto-track and clean up ephemeral files (test scripts, temp outputs, cron logs) created during Hermes sessions. Runs via plugin hooks — no agent action required."
author: "@LVT382009 (original), NousResearch (plugin port)"
hooks:
- post_tool_call
- on_session_end
+33 -821
View File
@@ -100,6 +100,18 @@ from agent.subdirectory_hints import SubdirectoryHintTracker
from agent.prompt_caching import apply_anthropic_cache_control
from agent.prompt_builder import build_skills_system_prompt, build_context_files_prompt, build_environment_hints, load_soul_md, TOOL_USE_ENFORCEMENT_GUIDANCE, TOOL_USE_ENFORCEMENT_MODELS, DEVELOPER_ROLE_MODELS, GOOGLE_MODEL_OPERATIONAL_GUIDANCE, OPENAI_MODEL_EXECUTION_GUIDANCE
from agent.usage_pricing import estimate_usage_cost, normalize_usage
from agent.codex_responses_adapter import (
_chat_messages_to_responses_input as _codex_chat_messages_to_responses_input,
_derive_responses_function_call_id as _codex_derive_responses_function_call_id,
_deterministic_call_id as _codex_deterministic_call_id,
_extract_responses_message_text as _codex_extract_responses_message_text,
_extract_responses_reasoning_text as _codex_extract_responses_reasoning_text,
_normalize_codex_response as _codex_normalize_codex_response,
_preflight_codex_api_kwargs as _codex_preflight_codex_api_kwargs,
_preflight_codex_input_items as _codex_preflight_codex_input_items,
_responses_tools as _codex_responses_tools,
_split_responses_tool_id as _codex_split_responses_tool_id,
)
from agent.display import (
KawaiiSpinner, build_tool_preview as _build_tool_preview,
get_cute_tool_message as _get_cute_tool_message_impl,
@@ -371,89 +383,6 @@ def _sanitize_surrogates(text: str) -> str:
return text
def _chat_content_to_responses_parts(content: Any) -> List[Dict[str, Any]]:
"""Convert chat-style multimodal content to Responses API input parts.
Input: ``[{"type":"text"|"image_url", ...}]`` (native OpenAI Chat format)
Output: ``[{"type":"input_text"|"input_image", ...}]`` (Responses format)
Returns an empty list when ``content`` is not a list or contains no
recognized parts callers fall back to the string path.
"""
if not isinstance(content, list):
return []
converted: List[Dict[str, Any]] = []
for part in content:
if isinstance(part, str):
if part:
converted.append({"type": "input_text", "text": part})
continue
if not isinstance(part, dict):
continue
ptype = str(part.get("type") or "").strip().lower()
if ptype in {"text", "input_text", "output_text"}:
text = part.get("text")
if isinstance(text, str) and text:
converted.append({"type": "input_text", "text": text})
continue
if ptype in {"image_url", "input_image"}:
image_ref = part.get("image_url")
detail = part.get("detail")
if isinstance(image_ref, dict):
url = image_ref.get("url")
detail = image_ref.get("detail", detail)
else:
url = image_ref
if not isinstance(url, str) or not url:
continue
image_part: Dict[str, Any] = {"type": "input_image", "image_url": url}
if isinstance(detail, str) and detail.strip():
image_part["detail"] = detail.strip()
converted.append(image_part)
return converted
def _summarize_user_message_for_log(content: Any) -> str:
"""Return a short text summary of a user message for logging/trajectory.
Multimodal messages arrive as a list of ``{type:"text"|"image_url", ...}``
parts from the API server. Logging, spinner previews, and trajectory
files all want a plain string this helper extracts the first chunk of
text and notes any attached images. Returns an empty string for empty
lists and ``str(content)`` for unexpected scalar types.
"""
if content is None:
return ""
if isinstance(content, str):
return content
if isinstance(content, list):
text_bits: List[str] = []
image_count = 0
for part in content:
if isinstance(part, str):
if part:
text_bits.append(part)
continue
if not isinstance(part, dict):
continue
ptype = str(part.get("type") or "").strip().lower()
if ptype in {"text", "input_text", "output_text"}:
text = part.get("text")
if isinstance(text, str) and text:
text_bits.append(text)
elif ptype in {"image_url", "input_image"}:
image_count += 1
summary = " ".join(text_bits).strip()
if image_count:
note = f"[{image_count} image{'s' if image_count != 1 else ''}]"
summary = f"{note} {summary}" if summary else note
return summary
try:
return str(content)
except Exception:
return ""
def _sanitize_structure_surrogates(payload: Any) -> bool:
"""Replace surrogate code points in nested dict/list payloads in-place.
@@ -555,71 +484,6 @@ def _sanitize_messages_surrogates(messages: list) -> bool:
return found
def _repair_tool_call_arguments(raw_args: str, tool_name: str = "?") -> str:
"""Attempt to repair malformed tool_call argument JSON.
Models like GLM-5.1 via Ollama can produce truncated JSON, trailing
commas, Python ``None``, etc. The API proxy rejects these with HTTP 400
"invalid tool call arguments". This function applies common repairs;
if all fail it returns ``"{}"`` so the request succeeds (better than
crashing the session). All repairs are logged at WARNING level.
"""
raw_stripped = raw_args.strip() if isinstance(raw_args, str) else ""
# Fast-path: empty / whitespace-only -> empty object
if not raw_stripped:
logger.warning("Sanitized empty tool_call arguments for %s", tool_name)
return "{}"
# Python-literal None -> normalise to {}
if raw_stripped == "None":
logger.warning("Sanitized Python-None tool_call arguments for %s", tool_name)
return "{}"
# Attempt common JSON repairs
fixed = raw_stripped
# 1. Strip trailing commas before } or ]
fixed = re.sub(r',\s*([}\]])', r'\1', fixed)
# 2. Close unclosed structures
open_curly = fixed.count('{') - fixed.count('}')
open_bracket = fixed.count('[') - fixed.count(']')
if open_curly > 0:
fixed += '}' * open_curly
if open_bracket > 0:
fixed += ']' * open_bracket
# 3. Remove excess closing braces/brackets (bounded to 50 iterations)
for _ in range(50):
try:
json.loads(fixed)
break
except json.JSONDecodeError:
if fixed.endswith('}') and fixed.count('}') > fixed.count('{'):
fixed = fixed[:-1]
elif fixed.endswith(']') and fixed.count(']') > fixed.count('['):
fixed = fixed[:-1]
else:
break
try:
json.loads(fixed)
logger.warning(
"Repaired malformed tool_call arguments for %s: %s%s",
tool_name, raw_stripped[:80], fixed[:80],
)
return fixed
except json.JSONDecodeError:
pass
# Last resort: replace with empty object so the API request doesn't
# crash the entire session.
logger.warning(
"Unrepairable tool_call arguments for %s"
"replaced with empty object (was: %s)",
tool_name, raw_stripped[:80],
)
return "{}"
def _strip_non_ascii(text: str) -> str:
"""Remove non-ASCII characters, replacing with closest ASCII equivalent or removing.
@@ -4330,24 +4194,7 @@ class AIAgent:
def _responses_tools(self, tools: Optional[List[Dict[str, Any]]] = None) -> Optional[List[Dict[str, Any]]]:
"""Convert chat-completions tool schemas to Responses function-tool schemas."""
source_tools = tools if tools is not None else self.tools
if not source_tools:
return None
converted: List[Dict[str, Any]] = []
for item in source_tools:
fn = item.get("function", {}) if isinstance(item, dict) else {}
name = fn.get("name")
if not isinstance(name, str) or not name.strip():
continue
converted.append({
"type": "function",
"name": name,
"description": fn.get("description", ""),
"strict": False,
"parameters": fn.get("parameters", {"type": "object", "properties": {}}),
})
return converted or None
return _codex_responses_tools(tools if tools is not None else self.tools)
@staticmethod
def _deterministic_call_id(fn_name: str, arguments: str, index: int = 0) -> str:
@@ -4357,27 +4204,12 @@ class AIAgent:
Deterministic IDs prevent cache invalidation random UUIDs would
make every API call's prefix unique, breaking OpenAI's prompt cache.
"""
import hashlib
seed = f"{fn_name}:{arguments}:{index}"
digest = hashlib.sha256(seed.encode("utf-8", errors="replace")).hexdigest()[:12]
return f"call_{digest}"
return _codex_deterministic_call_id(fn_name, arguments, index)
@staticmethod
def _split_responses_tool_id(raw_id: Any) -> tuple[Optional[str], Optional[str]]:
"""Split a stored tool id into (call_id, response_item_id)."""
if not isinstance(raw_id, str):
return None, None
value = raw_id.strip()
if not value:
return None, None
if "|" in value:
call_id, response_item_id = value.split("|", 1)
call_id = call_id.strip() or None
response_item_id = response_item_id.strip() or None
return call_id, response_item_id
if value.startswith("fc_"):
return None, value
return value, None
return _codex_split_responses_tool_id(raw_id)
def _derive_responses_function_call_id(
self,
@@ -4385,284 +4217,14 @@ class AIAgent:
response_item_id: Optional[str] = None,
) -> str:
"""Build a valid Responses `function_call.id` (must start with `fc_`)."""
if isinstance(response_item_id, str):
candidate = response_item_id.strip()
if candidate.startswith("fc_"):
return candidate
source = (call_id or "").strip()
if source.startswith("fc_"):
return source
if source.startswith("call_") and len(source) > len("call_"):
return f"fc_{source[len('call_'):]}"
sanitized = re.sub(r"[^A-Za-z0-9_-]", "", source)
if sanitized.startswith("fc_"):
return sanitized
if sanitized.startswith("call_") and len(sanitized) > len("call_"):
return f"fc_{sanitized[len('call_'):]}"
if sanitized:
return f"fc_{sanitized[:48]}"
seed = source or str(response_item_id or "") or uuid.uuid4().hex
digest = hashlib.sha1(seed.encode("utf-8")).hexdigest()[:24]
return f"fc_{digest}"
return _codex_derive_responses_function_call_id(call_id, response_item_id)
def _chat_messages_to_responses_input(self, messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Convert internal chat-style messages to Responses input items."""
items: List[Dict[str, Any]] = []
seen_item_ids: set = set()
for msg in messages:
if not isinstance(msg, dict):
continue
role = msg.get("role")
if role == "system":
continue
if role in {"user", "assistant"}:
content = msg.get("content", "")
if isinstance(content, list):
content_parts = _chat_content_to_responses_parts(content)
content_text = "".join(
p.get("text", "") for p in content_parts if p.get("type") == "input_text"
)
else:
content_parts = []
content_text = str(content) if content is not None else ""
if role == "assistant":
# Replay encrypted reasoning items from previous turns
# so the API can maintain coherent reasoning chains.
codex_reasoning = msg.get("codex_reasoning_items")
has_codex_reasoning = False
if isinstance(codex_reasoning, list):
for ri in codex_reasoning:
if isinstance(ri, dict) and ri.get("encrypted_content"):
item_id = ri.get("id")
if item_id and item_id in seen_item_ids:
continue
# Strip the "id" field — with store=False the
# Responses API cannot look up items by ID and
# returns 404. The encrypted_content blob is
# self-contained for reasoning chain continuity.
replay_item = {k: v for k, v in ri.items() if k != "id"}
items.append(replay_item)
if item_id:
seen_item_ids.add(item_id)
has_codex_reasoning = True
if content_parts:
items.append({"role": "assistant", "content": content_parts})
elif content_text.strip():
items.append({"role": "assistant", "content": content_text})
elif has_codex_reasoning:
# The Responses API requires a following item after each
# reasoning item (otherwise: missing_following_item error).
# When the assistant produced only reasoning with no visible
# content, emit an empty assistant message as the required
# following item.
items.append({"role": "assistant", "content": ""})
tool_calls = msg.get("tool_calls")
if isinstance(tool_calls, list):
for tc in tool_calls:
if not isinstance(tc, dict):
continue
fn = tc.get("function", {})
fn_name = fn.get("name")
if not isinstance(fn_name, str) or not fn_name.strip():
continue
embedded_call_id, embedded_response_item_id = self._split_responses_tool_id(
tc.get("id")
)
call_id = tc.get("call_id")
if not isinstance(call_id, str) or not call_id.strip():
call_id = embedded_call_id
if not isinstance(call_id, str) or not call_id.strip():
if (
isinstance(embedded_response_item_id, str)
and embedded_response_item_id.startswith("fc_")
and len(embedded_response_item_id) > len("fc_")
):
call_id = f"call_{embedded_response_item_id[len('fc_'):]}"
else:
_raw_args = str(fn.get("arguments", "{}"))
call_id = self._deterministic_call_id(fn_name, _raw_args, len(items))
call_id = call_id.strip()
arguments = fn.get("arguments", "{}")
if isinstance(arguments, dict):
arguments = json.dumps(arguments, ensure_ascii=False)
elif not isinstance(arguments, str):
arguments = str(arguments)
arguments = arguments.strip() or "{}"
items.append({
"type": "function_call",
"call_id": call_id,
"name": fn_name,
"arguments": arguments,
})
continue
# Non-assistant (user) role: emit multimodal parts when present,
# otherwise fall back to the text payload.
if content_parts:
items.append({"role": role, "content": content_parts})
else:
items.append({"role": role, "content": content_text})
continue
if role == "tool":
raw_tool_call_id = msg.get("tool_call_id")
call_id, _ = self._split_responses_tool_id(raw_tool_call_id)
if not isinstance(call_id, str) or not call_id.strip():
if isinstance(raw_tool_call_id, str) and raw_tool_call_id.strip():
call_id = raw_tool_call_id.strip()
if not isinstance(call_id, str) or not call_id.strip():
continue
items.append({
"type": "function_call_output",
"call_id": call_id,
"output": str(msg.get("content", "") or ""),
})
return items
return _codex_chat_messages_to_responses_input(messages)
def _preflight_codex_input_items(self, raw_items: Any) -> List[Dict[str, Any]]:
if not isinstance(raw_items, list):
raise ValueError("Codex Responses input must be a list of input items.")
normalized: List[Dict[str, Any]] = []
seen_ids: set = set()
for idx, item in enumerate(raw_items):
if not isinstance(item, dict):
raise ValueError(f"Codex Responses input[{idx}] must be an object.")
item_type = item.get("type")
if item_type == "function_call":
call_id = item.get("call_id")
name = item.get("name")
if not isinstance(call_id, str) or not call_id.strip():
raise ValueError(f"Codex Responses input[{idx}] function_call is missing call_id.")
if not isinstance(name, str) or not name.strip():
raise ValueError(f"Codex Responses input[{idx}] function_call is missing name.")
arguments = item.get("arguments", "{}")
if isinstance(arguments, dict):
arguments = json.dumps(arguments, ensure_ascii=False)
elif not isinstance(arguments, str):
arguments = str(arguments)
arguments = arguments.strip() or "{}"
normalized.append(
{
"type": "function_call",
"call_id": call_id.strip(),
"name": name.strip(),
"arguments": arguments,
}
)
continue
if item_type == "function_call_output":
call_id = item.get("call_id")
if not isinstance(call_id, str) or not call_id.strip():
raise ValueError(f"Codex Responses input[{idx}] function_call_output is missing call_id.")
output = item.get("output", "")
if output is None:
output = ""
if not isinstance(output, str):
output = str(output)
normalized.append(
{
"type": "function_call_output",
"call_id": call_id.strip(),
"output": output,
}
)
continue
if item_type == "reasoning":
encrypted = item.get("encrypted_content")
if isinstance(encrypted, str) and encrypted:
item_id = item.get("id")
if isinstance(item_id, str) and item_id:
if item_id in seen_ids:
continue
seen_ids.add(item_id)
reasoning_item = {"type": "reasoning", "encrypted_content": encrypted}
# Do NOT include the "id" in the outgoing item — with
# store=False (our default) the API tries to resolve the
# id server-side and returns 404. The id is still used
# above for local deduplication via seen_ids.
summary = item.get("summary")
if isinstance(summary, list):
reasoning_item["summary"] = summary
else:
reasoning_item["summary"] = []
normalized.append(reasoning_item)
continue
role = item.get("role")
if role in {"user", "assistant"}:
content = item.get("content", "")
if content is None:
content = ""
if isinstance(content, list):
# Multimodal content from ``_chat_messages_to_responses_input``
# is already in Responses format (``input_text`` / ``input_image``).
# Validate each part and pass through.
validated: List[Dict[str, Any]] = []
for part_idx, part in enumerate(content):
if isinstance(part, str):
if part:
validated.append({"type": "input_text", "text": part})
continue
if not isinstance(part, dict):
raise ValueError(
f"Codex Responses input[{idx}].content[{part_idx}] must be an object or string."
)
ptype = str(part.get("type") or "").strip().lower()
if ptype in {"input_text", "text", "output_text"}:
text = part.get("text", "")
if not isinstance(text, str):
text = str(text or "")
validated.append({"type": "input_text", "text": text})
elif ptype in {"input_image", "image_url"}:
image_ref = part.get("image_url", "")
detail = part.get("detail")
if isinstance(image_ref, dict):
url = image_ref.get("url", "")
detail = image_ref.get("detail", detail)
else:
url = image_ref
if not isinstance(url, str):
url = str(url or "")
image_part: Dict[str, Any] = {"type": "input_image", "image_url": url}
if isinstance(detail, str) and detail.strip():
image_part["detail"] = detail.strip()
validated.append(image_part)
else:
raise ValueError(
f"Codex Responses input[{idx}].content[{part_idx}] has unsupported type {part.get('type')!r}."
)
normalized.append({"role": role, "content": validated})
continue
if not isinstance(content, str):
content = str(content)
normalized.append({"role": role, "content": content})
continue
raise ValueError(
f"Codex Responses input[{idx}] has unsupported item shape (type={item_type!r}, role={role!r})."
)
return normalized
return _codex_preflight_codex_input_items(raw_items)
def _preflight_codex_api_kwargs(
self,
@@ -4670,338 +4232,19 @@ class AIAgent:
*,
allow_stream: bool = False,
) -> Dict[str, Any]:
if not isinstance(api_kwargs, dict):
raise ValueError("Codex Responses request must be a dict.")
required = {"model", "instructions", "input"}
missing = [key for key in required if key not in api_kwargs]
if missing:
raise ValueError(f"Codex Responses request missing required field(s): {', '.join(sorted(missing))}.")
model = api_kwargs.get("model")
if not isinstance(model, str) or not model.strip():
raise ValueError("Codex Responses request 'model' must be a non-empty string.")
model = model.strip()
instructions = api_kwargs.get("instructions")
if instructions is None:
instructions = ""
if not isinstance(instructions, str):
instructions = str(instructions)
instructions = instructions.strip() or DEFAULT_AGENT_IDENTITY
normalized_input = self._preflight_codex_input_items(api_kwargs.get("input"))
tools = api_kwargs.get("tools")
normalized_tools = None
if tools is not None:
if not isinstance(tools, list):
raise ValueError("Codex Responses request 'tools' must be a list when provided.")
normalized_tools = []
for idx, tool in enumerate(tools):
if not isinstance(tool, dict):
raise ValueError(f"Codex Responses tools[{idx}] must be an object.")
if tool.get("type") != "function":
raise ValueError(f"Codex Responses tools[{idx}] has unsupported type {tool.get('type')!r}.")
name = tool.get("name")
parameters = tool.get("parameters")
if not isinstance(name, str) or not name.strip():
raise ValueError(f"Codex Responses tools[{idx}] is missing a valid name.")
if not isinstance(parameters, dict):
raise ValueError(f"Codex Responses tools[{idx}] is missing valid parameters.")
description = tool.get("description", "")
if description is None:
description = ""
if not isinstance(description, str):
description = str(description)
strict = tool.get("strict", False)
if not isinstance(strict, bool):
strict = bool(strict)
normalized_tools.append(
{
"type": "function",
"name": name.strip(),
"description": description,
"strict": strict,
"parameters": parameters,
}
)
store = api_kwargs.get("store", False)
if store is not False:
raise ValueError("Codex Responses contract requires 'store' to be false.")
allowed_keys = {
"model", "instructions", "input", "tools", "store",
"reasoning", "include", "max_output_tokens", "temperature",
"tool_choice", "parallel_tool_calls", "prompt_cache_key", "service_tier",
"extra_headers",
}
normalized: Dict[str, Any] = {
"model": model,
"instructions": instructions,
"input": normalized_input,
"store": False,
}
if normalized_tools is not None:
normalized["tools"] = normalized_tools
# Pass through reasoning config
reasoning = api_kwargs.get("reasoning")
if isinstance(reasoning, dict):
normalized["reasoning"] = reasoning
include = api_kwargs.get("include")
if isinstance(include, list):
normalized["include"] = include
service_tier = api_kwargs.get("service_tier")
if isinstance(service_tier, str) and service_tier.strip():
normalized["service_tier"] = service_tier.strip()
# Pass through max_output_tokens and temperature
max_output_tokens = api_kwargs.get("max_output_tokens")
if isinstance(max_output_tokens, (int, float)) and max_output_tokens > 0:
normalized["max_output_tokens"] = int(max_output_tokens)
temperature = api_kwargs.get("temperature")
if isinstance(temperature, (int, float)):
normalized["temperature"] = float(temperature)
# Pass through tool_choice, parallel_tool_calls, prompt_cache_key
for passthrough_key in ("tool_choice", "parallel_tool_calls", "prompt_cache_key"):
val = api_kwargs.get(passthrough_key)
if val is not None:
normalized[passthrough_key] = val
extra_headers = api_kwargs.get("extra_headers")
if extra_headers is not None:
if not isinstance(extra_headers, dict):
raise ValueError("Codex Responses request 'extra_headers' must be an object.")
normalized_headers: Dict[str, str] = {}
for key, value in extra_headers.items():
if not isinstance(key, str) or not key.strip():
raise ValueError("Codex Responses request 'extra_headers' keys must be non-empty strings.")
if value is None:
continue
normalized_headers[key.strip()] = str(value)
if normalized_headers:
normalized["extra_headers"] = normalized_headers
if allow_stream:
stream = api_kwargs.get("stream")
if stream is not None and stream is not True:
raise ValueError("Codex Responses 'stream' must be true when set.")
if stream is True:
normalized["stream"] = True
allowed_keys.add("stream")
elif "stream" in api_kwargs:
raise ValueError("Codex Responses stream flag is only allowed in fallback streaming requests.")
unexpected = sorted(key for key in api_kwargs if key not in allowed_keys)
if unexpected:
raise ValueError(
f"Codex Responses request has unsupported field(s): {', '.join(unexpected)}."
)
return normalized
return _codex_preflight_codex_api_kwargs(api_kwargs, allow_stream=allow_stream)
def _extract_responses_message_text(self, item: Any) -> str:
"""Extract assistant text from a Responses message output item."""
content = getattr(item, "content", None)
if not isinstance(content, list):
return ""
chunks: List[str] = []
for part in content:
ptype = getattr(part, "type", None)
if ptype not in {"output_text", "text"}:
continue
text = getattr(part, "text", None)
if isinstance(text, str) and text:
chunks.append(text)
return "".join(chunks).strip()
return _codex_extract_responses_message_text(item)
def _extract_responses_reasoning_text(self, item: Any) -> str:
"""Extract a compact reasoning text from a Responses reasoning item."""
summary = getattr(item, "summary", None)
if isinstance(summary, list):
chunks: List[str] = []
for part in summary:
text = getattr(part, "text", None)
if isinstance(text, str) and text:
chunks.append(text)
if chunks:
return "\n".join(chunks).strip()
text = getattr(item, "text", None)
if isinstance(text, str) and text:
return text.strip()
return ""
return _codex_extract_responses_reasoning_text(item)
def _normalize_codex_response(self, response: Any) -> tuple[Any, str]:
    """Normalize a Responses API object to an assistant_message-like object.

    Args:
        response: Responses API result. ``output``, ``output_text``,
            ``status`` and ``error`` are read via ``getattr``. When
            ``output`` is missing or empty but ``output_text`` holds text, a
            synthetic message item is built and also written back to
            ``response.output``.

    Returns:
        ``(assistant_message, finish_reason)``. ``assistant_message`` is a
        ``SimpleNamespace`` with ``content``, ``tool_calls``, ``reasoning``,
        ``reasoning_content``, ``reasoning_details`` and
        ``codex_reasoning_items``. ``finish_reason`` is ``"tool_calls"``,
        ``"incomplete"`` or ``"stop"``.

    Raises:
        RuntimeError: If there are no output items and no ``output_text``
            fallback, or if the response status is ``failed``/``cancelled``.
    """
    pending_statuses = {"queued", "in_progress", "incomplete"}

    output = getattr(response, "output", None)
    if not isinstance(output, list) or not output:
        # The Codex backend can return empty output when the answer was
        # delivered entirely via stream events. Check output_text as a
        # last-resort fallback before raising.
        out_text = getattr(response, "output_text", None)
        if isinstance(out_text, str) and out_text.strip():
            logger.debug(
                "Codex response has empty output but output_text is present (%d chars); "
                "synthesizing output item.", len(out_text.strip()),
            )
            output = [SimpleNamespace(
                type="message", role="assistant", status="completed",
                content=[SimpleNamespace(type="output_text", text=out_text.strip())],
            )]
            response.output = output
        else:
            raise RuntimeError("Responses API returned no output items")

    response_status = getattr(response, "status", None)
    if isinstance(response_status, str):
        response_status = response_status.strip().lower()
    else:
        response_status = None

    if response_status in {"failed", "cancelled"}:
        error_obj = getattr(response, "error", None)
        if isinstance(error_obj, dict):
            error_msg = error_obj.get("message") or str(error_obj)
        else:
            error_msg = str(error_obj) if error_obj else f"Responses API returned status '{response_status}'"
        raise RuntimeError(error_msg)

    content_parts: List[str] = []
    reasoning_parts: List[str] = []
    reasoning_items_raw: List[Dict[str, Any]] = []
    tool_calls: List[Any] = []
    has_incomplete_items = response_status in pending_statuses
    saw_commentary_phase = False
    saw_final_answer_phase = False

    for item in output:
        item_type = getattr(item, "type", None)
        item_status = getattr(item, "status", None)
        if isinstance(item_status, str):
            item_status = item_status.strip().lower()
        else:
            item_status = None

        if item_status in pending_statuses:
            has_incomplete_items = True

        if item_type == "message":
            item_phase = getattr(item, "phase", None)
            if isinstance(item_phase, str):
                normalized_phase = item_phase.strip().lower()
                if normalized_phase in {"commentary", "analysis"}:
                    saw_commentary_phase = True
                elif normalized_phase in {"final_answer", "final"}:
                    saw_final_answer_phase = True
            message_text = self._extract_responses_message_text(item)
            if message_text:
                content_parts.append(message_text)
        elif item_type == "reasoning":
            reasoning_text = self._extract_responses_reasoning_text(item)
            if reasoning_text:
                reasoning_parts.append(reasoning_text)
            # Capture the full reasoning item for multi-turn continuity.
            # encrypted_content is an opaque blob the API needs back on
            # subsequent turns to maintain coherent reasoning chains.
            encrypted = getattr(item, "encrypted_content", None)
            if isinstance(encrypted, str) and encrypted:
                raw_item = {"type": "reasoning", "encrypted_content": encrypted}
                item_id = getattr(item, "id", None)
                if isinstance(item_id, str) and item_id:
                    raw_item["id"] = item_id
                # Capture summary — required by the API when replaying reasoning items
                summary = getattr(item, "summary", None)
                if isinstance(summary, list):
                    raw_summary = []
                    for part in summary:
                        text = getattr(part, "text", None)
                        if isinstance(text, str):
                            raw_summary.append({"type": "summary_text", "text": text})
                    raw_item["summary"] = raw_summary
                reasoning_items_raw.append(raw_item)
        elif item_type in ("function_call", "custom_tool_call"):
            is_function_call = item_type == "function_call"
            # Only function calls are dropped while still pending; custom
            # tool calls are recorded regardless of their status.
            if is_function_call and item_status in pending_statuses:
                continue
            fn_name = getattr(item, "name", "") or ""
            # Function calls carry their payload in ``arguments``; custom
            # tool calls carry it in ``input``.
            arguments = getattr(item, "arguments" if is_function_call else "input", "{}")
            if not isinstance(arguments, str):
                arguments = json.dumps(arguments, ensure_ascii=False)
            raw_call_id = getattr(item, "call_id", None)
            raw_item_id = getattr(item, "id", None)
            embedded_call_id, _ = self._split_responses_tool_id(raw_item_id)
            call_id = raw_call_id if isinstance(raw_call_id, str) and raw_call_id.strip() else embedded_call_id
            if not isinstance(call_id, str) or not call_id.strip():
                call_id = self._deterministic_call_id(fn_name, arguments, len(tool_calls))
            call_id = call_id.strip()
            response_item_id = raw_item_id if isinstance(raw_item_id, str) else None
            response_item_id = self._derive_responses_function_call_id(call_id, response_item_id)
            tool_calls.append(SimpleNamespace(
                id=call_id,
                call_id=call_id,
                response_item_id=response_item_id,
                type="function",
                function=SimpleNamespace(name=fn_name, arguments=arguments),
            ))

    final_text = "\n".join([p for p in content_parts if p]).strip()
    if not final_text and hasattr(response, "output_text"):
        out_text = getattr(response, "output_text", "")
        if isinstance(out_text, str):
            final_text = out_text.strip()

    assistant_message = SimpleNamespace(
        content=final_text,
        tool_calls=tool_calls,
        reasoning="\n\n".join(reasoning_parts).strip() if reasoning_parts else None,
        reasoning_content=None,
        reasoning_details=None,
        codex_reasoning_items=reasoning_items_raw or None,
    )

    if tool_calls:
        finish_reason = "tool_calls"
    elif has_incomplete_items or (saw_commentary_phase and not saw_final_answer_phase):
        finish_reason = "incomplete"
    elif reasoning_items_raw and not final_text:
        # Response contains only reasoning (encrypted thinking state) with
        # no visible content or tool calls. The model is still thinking and
        # needs another turn to produce the actual answer. Marking this as
        # "stop" would send it into the empty-content retry loop which burns
        # 3 retries then fails — treat it as incomplete instead so the Codex
        # continuation path handles it correctly.
        finish_reason = "incomplete"
    else:
        finish_reason = "stop"
    return assistant_message, finish_reason
def _thread_identity(self) -> str:
thread = threading.current_thread()
@@ -9287,8 +8530,7 @@ class AIAgent:
self.iteration_budget = IterationBudget(self.max_iterations)
# Log conversation turn start for debugging/observability
_preview_text = _summarize_user_message_for_log(user_message)
_msg_preview = (_preview_text[:80] + "...") if len(_preview_text) > 80 else _preview_text
_msg_preview = (user_message[:80] + "...") if len(user_message) > 80 else user_message
_msg_preview = _msg_preview.replace("\n", " ")
logger.info(
"conversation turn: session=%s model=%s provider=%s platform=%s history=%d msg=%r",
@@ -9336,8 +8578,7 @@ class AIAgent:
self._persist_user_message_idx = current_turn_user_idx
if not self.quiet_mode:
_print_preview = _summarize_user_message_for_log(user_message)
self._safe_print(f"💬 Starting conversation: '{_print_preview[:60]}{'...' if len(_print_preview) > 60 else ''}'")
self._safe_print(f"💬 Starting conversation: '{user_message[:60]}{'...' if len(user_message) > 60 else ''}'")
# ── System prompt (cached per session for prefix caching) ──
# Built once on first call, reused for all subsequent calls.
@@ -9727,10 +8968,7 @@ class AIAgent:
),
}}
except Exception:
tc["function"]["arguments"] = _repair_tool_call_arguments(
tc["function"]["arguments"],
tc["function"].get("name", "?"),
)
pass
new_tcs.append(tc)
am["tool_calls"] = new_tcs
@@ -11373,33 +10611,10 @@ class AIAgent:
if self.api_mode == "codex_responses":
assistant_message, finish_reason = self._normalize_codex_response(response)
elif self.api_mode == "anthropic_messages":
from agent.anthropic_adapter import normalize_anthropic_response_v2
_nr = normalize_anthropic_response_v2(
from agent.anthropic_adapter import normalize_anthropic_response
assistant_message, finish_reason = normalize_anthropic_response(
response, strip_tool_prefix=self._is_anthropic_oauth
)
# Back-compat shim: downstream code expects SimpleNamespace with
# .content, .tool_calls, .reasoning, .reasoning_content,
# .reasoning_details attributes. This shim makes the cost of the
# old interface visible — it vanishes when the full transport
# wiring lands (PR 3+).
assistant_message = SimpleNamespace(
content=_nr.content,
tool_calls=[
SimpleNamespace(
id=tc.id,
type="function",
function=SimpleNamespace(name=tc.name, arguments=tc.arguments),
)
for tc in (_nr.tool_calls or [])
] or None,
reasoning=_nr.reasoning,
reasoning_content=None,
reasoning_details=(
_nr.provider_data.get("reasoning_details")
if _nr.provider_data else None
),
)
finish_reason = _nr.finish_reason
else:
assistant_message = response.choices[0].message
@@ -11827,12 +11042,10 @@ class AIAgent:
# should_compress(0) never fires. (#2153)
_compressor = self.context_compressor
if _compressor.last_prompt_tokens > 0:
# Only use prompt_tokens — completion/reasoning
# tokens don't consume context window space.
# Thinking models (GLM-5.1, QwQ, DeepSeek R1)
# inflate completion_tokens with reasoning,
# causing premature compression. (#12026)
_real_tokens = _compressor.last_prompt_tokens
_real_tokens = (
_compressor.last_prompt_tokens
+ _compressor.last_completion_tokens
)
else:
_real_tokens = estimate_messages_tokens_rough(messages)
@@ -12231,9 +11444,8 @@ class AIAgent:
# Determine if conversation completed successfully
completed = final_response is not None and api_call_count < self.max_iterations
# Save trajectory if enabled. ``user_message`` may be a multimodal
# list of parts; the trajectory format wants a plain string.
self._save_trajectory(messages, _summarize_user_message_for_log(user_message), completed)
# Save trajectory if enabled
self._save_trajectory(messages, user_message, completed)
# Clean up VM and browser for this task after conversation completes
self._cleanup_task_resources(effective_task_id)
+14 -2
View File
@@ -630,7 +630,7 @@ function Copy-ConfigTemplates {
New-Item -ItemType Directory -Force -Path "$HermesHome\audio_cache" | Out-Null
New-Item -ItemType Directory -Force -Path "$HermesHome\memories" | Out-Null
New-Item -ItemType Directory -Force -Path "$HermesHome\skills" | Out-Null
New-Item -ItemType Directory -Force -Path "$HermesHome\whatsapp\session" | Out-Null
# Create .env
$envPath = "$HermesHome\.env"
@@ -735,7 +735,19 @@ function Install-NodeDeps {
Pop-Location
}
# Install WhatsApp bridge dependencies
$bridgeDir = "$InstallDir\scripts\whatsapp-bridge"
if (Test-Path "$bridgeDir\package.json") {
Write-Info "Installing WhatsApp bridge dependencies..."
Push-Location $bridgeDir
try {
npm install --silent 2>&1 | Out-Null
Write-Success "WhatsApp bridge dependencies installed"
} catch {
Write-Warn "WhatsApp bridge npm install failed (WhatsApp may not work)"
}
Pop-Location
}
Pop-Location
}
+19 -10
View File
@@ -297,7 +297,7 @@ check_python() {
if command -v python >/dev/null 2>&1; then
PYTHON_PATH="$(command -v python)"
if "$PYTHON_PATH" -c 'import sys; raise SystemExit(0 if sys.version_info >= (3, 11) else 1)' 2>/dev/null; then
PYTHON_FOUND_VERSION="$("$PYTHON_PATH" --version 2>/dev/null)"
PYTHON_FOUND_VERSION=$($PYTHON_PATH --version 2>/dev/null)
log_success "Python found: $PYTHON_FOUND_VERSION"
return 0
fi
@@ -306,7 +306,7 @@ check_python() {
log_info "Installing Python via pkg..."
pkg install -y python >/dev/null
PYTHON_PATH="$(command -v python)"
PYTHON_FOUND_VERSION="$("$PYTHON_PATH" --version 2>/dev/null)"
PYTHON_FOUND_VERSION=$($PYTHON_PATH --version 2>/dev/null)
log_success "Python installed: $PYTHON_FOUND_VERSION"
return 0
fi
@@ -315,17 +315,18 @@ check_python() {
# Let uv handle Python — it can download and manage Python versions
# First check if a suitable Python is already available
if PYTHON_PATH="$("$UV_CMD" python find "$PYTHON_VERSION" 2>/dev/null)"; then
PYTHON_FOUND_VERSION="$("$PYTHON_PATH" --version 2>/dev/null)"
if $UV_CMD python find "$PYTHON_VERSION" &> /dev/null; then
PYTHON_PATH=$($UV_CMD python find "$PYTHON_VERSION")
PYTHON_FOUND_VERSION=$($PYTHON_PATH --version 2>/dev/null)
log_success "Python found: $PYTHON_FOUND_VERSION"
return 0
fi
# Python not found — use uv to install it (no sudo needed!)
log_info "Python $PYTHON_VERSION not found, installing via uv..."
if "$UV_CMD" python install "$PYTHON_VERSION"; then
PYTHON_PATH="$("$UV_CMD" python find "$PYTHON_VERSION")"
PYTHON_FOUND_VERSION="$("$PYTHON_PATH" --version 2>/dev/null)"
if $UV_CMD python install "$PYTHON_VERSION"; then
PYTHON_PATH=$($UV_CMD python find "$PYTHON_VERSION")
PYTHON_FOUND_VERSION=$($PYTHON_PATH --version 2>/dev/null)
log_success "Python installed: $PYTHON_FOUND_VERSION"
else
log_error "Failed to install Python $PYTHON_VERSION"
@@ -1051,7 +1052,7 @@ copy_config_templates() {
log_info "Setting up configuration files..."
# Create ~/.hermes directory structure (config at top level, code in subdir)
mkdir -p "$HERMES_HOME"/{cron,sessions,logs,pairing,hooks,image_cache,audio_cache,memories,skills}
mkdir -p "$HERMES_HOME"/{cron,sessions,logs,pairing,hooks,image_cache,audio_cache,memories,skills,whatsapp/session}
# Create .env at ~/.hermes/.env (top level, easy to find)
if [ ! -f "$HERMES_HOME/.env" ]; then
@@ -1121,7 +1122,7 @@ install_node_deps() {
if [ "$DISTRO" = "termux" ]; then
log_info "Skipping automatic Node/browser dependency setup on Termux"
log_info "Browser automation is not part of the tested Termux install path yet."
log_info "Browser automation and WhatsApp bridge are not part of the tested Termux install path yet."
log_info "If you want to experiment manually later, run: cd $INSTALL_DIR && npm install"
return 0
fi
@@ -1203,7 +1204,15 @@ install_node_deps() {
log_success "TUI dependencies installed"
fi
# Install WhatsApp bridge dependencies
if [ -f "$INSTALL_DIR/scripts/whatsapp-bridge/package.json" ]; then
log_info "Installing WhatsApp bridge dependencies..."
cd "$INSTALL_DIR/scripts/whatsapp-bridge"
npm install --silent 2>/dev/null || {
log_warn "WhatsApp bridge npm install failed (WhatsApp may not work)"
}
log_success "WhatsApp bridge dependencies installed"
fi
}
run_setup_wizard() {
-8
View File
@@ -66,8 +66,6 @@ AUTHOR_MAP = {
"104278804+Sertug17@users.noreply.github.com": "Sertug17",
"112503481+caentzminger@users.noreply.github.com": "caentzminger",
"258577966+voidborne-d@users.noreply.github.com": "voidborne-d",
"sir_even@icloud.com": "sirEven",
"36056348+sirEven@users.noreply.github.com": "sirEven",
"70424851+insecurejezza@users.noreply.github.com": "insecurejezza",
"254021826+dodo-reach@users.noreply.github.com": "dodo-reach",
"259807879+Bartok9@users.noreply.github.com": "Bartok9",
@@ -79,10 +77,8 @@ AUTHOR_MAP = {
"39405770+yyq4193@users.noreply.github.com": "yyq4193",
"Asunfly@users.noreply.github.com": "Asunfly",
"2500400+honghua@users.noreply.github.com": "honghua",
"462836+jplew@users.noreply.github.com": "jplew",
"nish3451@users.noreply.github.com": "nish3451",
"Mibayy@users.noreply.github.com": "Mibayy",
"mibayy@users.noreply.github.com": "Mibayy",
"135070653+sgaofen@users.noreply.github.com": "sgaofen",
"nocoo@users.noreply.github.com": "nocoo",
"30841158+n-WN@users.noreply.github.com": "n-WN",
@@ -111,7 +107,6 @@ AUTHOR_MAP = {
"linux2010@users.noreply.github.com": "Linux2010",
"elmatadorgh@users.noreply.github.com": "elmatadorgh",
"alexazzjjtt@163.com": "alexzhu0",
"1180176+Swift42@users.noreply.github.com": "Swift42",
"ruzzgarcn@gmail.com": "Ruzzgar",
"alireza78.crypto@gmail.com": "alireza78a",
"brooklyn.bb.nicholson@gmail.com": "brooklynnicholson",
@@ -178,9 +173,6 @@ AUTHOR_MAP = {
"1115117931@qq.com": "aaronagent",
"1506751656@qq.com": "hqhq1025",
"364939526@qq.com": "luyao618",
"hgk324@gmail.com": "houziershi",
"176644217+PStarH@users.noreply.github.com": "PStarH",
"51058514+Sanjays2402@users.noreply.github.com": "Sanjays2402",
"906014227@qq.com": "bingo906",
"aaronwong1999@icloud.com": "AaronWong1999",
"agents@kylefrench.dev": "DeployFaith",
+9 -30
View File
@@ -1,7 +1,7 @@
---
name: xurl
description: Interact with X/Twitter via xurl, the official X API CLI. Use for posting, replying, quoting, searching, timelines, mentions, likes, reposts, bookmarks, follows, DMs, media upload, and raw v2 endpoint access.
version: 1.1.0
version: 1.0.0
author: xdevplatform + openclaw + Hermes Agent
license: MIT
platforms: [linux, macos]
@@ -90,16 +90,12 @@ These steps must be performed by the user directly, NOT by the agent, because th
```bash
xurl auth apps add my-app --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET
```
5. Authenticate (specify `--app` to bind the token to your app):
5. Authenticate:
```bash
xurl auth oauth2 --app my-app
xurl auth oauth2
```
(This opens a browser for the OAuth 2.0 PKCE flow.)
6. Set the app as default so all commands use it:
```bash
xurl auth default my-app
```
7. Verify:
6. Verify:
```bash
xurl auth status
xurl whoami
@@ -107,8 +103,6 @@ These steps must be performed by the user directly, NOT by the agent, because th
After this, the agent can use any command below without further setup. OAuth 2.0 tokens auto-refresh.
> **Common pitfall:** If you omit `--app my-app` from `xurl auth oauth2`, the OAuth token is saved to the built-in `default` app profile — which has no client-id or client-secret. Commands will fail with auth errors even though the OAuth flow appeared to succeed. If you hit this, re-run `xurl auth oauth2 --app my-app` and `xurl auth default my-app`.
---
## Quick Reference
@@ -365,26 +359,11 @@ xurl --app staging /2/users/me # one-off against staging
## Agent Workflow
1. Verify prerequisites: `xurl --help` and `xurl auth status`.
2. **Check default app has credentials.** Parse the `auth status` output. The default app is marked with `▸`. If the default app shows `oauth2: (none)` but another app has a valid oauth2 user, tell the user to run `xurl auth default <that-app>` to fix it. This is the most common setup mistake — the user added an app with a custom name but never set it as default, so xurl keeps trying the empty `default` profile.
3. If auth is missing entirely, stop and direct the user to the "One-Time User Setup" section — do NOT attempt to register apps or pass secrets yourself.
4. Start with a cheap read (`xurl whoami`, `xurl user @handle`, `xurl search ... -n 3`) to confirm reachability.
5. Confirm the target post/user and the user's intent before any write action (post, reply, like, repost, DM, follow, block, delete).
6. Use JSON output directly — every response is already structured.
7. Never paste `~/.xurl` contents back into the conversation.
---
## Troubleshooting
| Symptom | Cause | Fix |
| --- | --- | --- |
| Auth errors after successful OAuth flow | Token saved to `default` app (no client-id/secret) instead of your named app | `xurl auth oauth2 --app my-app` then `xurl auth default my-app` |
| `unauthorized_client` during OAuth | App type set to "Native App" in X dashboard | Change to "Web app, automated app or bot" in User Authentication Settings |
| 401 on every request | Token expired or wrong default app | Check `xurl auth status` — verify `▸` points to an app with oauth2 tokens |
| `client-forbidden` / `client-not-enrolled` | X platform enrollment issue | Dashboard → Apps → Manage → Move to "Pay-per-use" package → Production environment |
| `CreditsDepleted` | $0 balance on X API | Buy credits (min $5) in Developer Console → Billing |
| `media processing failed` on image upload | Default category is `amplify_video` | Add `--category tweet_image --media-type image/png` |
| Two "Client Secret" values in X dashboard | UI bug — first is actually Client ID | Confirm on the "Keys and tokens" page; ID ends in `MTpjaQ` |
2. If auth is missing, stop and direct the user to the "One-Time User Setup" section — do NOT attempt to register apps or pass secrets yourself.
3. Start with a cheap read (`xurl whoami`, `xurl user @handle`, `xurl search ... -n 3`) to confirm reachability.
4. Confirm the target post/user and the user's intent before any write action (post, reply, like, repost, DM, follow, block, delete).
5. Use JSON output directly — every response is already structured.
6. Never paste `~/.xurl` contents back into the conversation.
---
-238
View File
@@ -1,238 +0,0 @@
"""Regression tests: normalize_anthropic_response_v2 vs v1.
Constructs mock Anthropic responses and asserts that the v2 function
(returning NormalizedResponse) produces identical field values to the
original v1 function (returning SimpleNamespace + finish_reason).
"""
import json
import pytest
from types import SimpleNamespace
from agent.anthropic_adapter import (
normalize_anthropic_response,
normalize_anthropic_response_v2,
)
from agent.transports.types import NormalizedResponse, ToolCall
# ---------------------------------------------------------------------------
# Helpers to build mock Anthropic SDK responses
# ---------------------------------------------------------------------------
def _text_block(text: str):
return SimpleNamespace(type="text", text=text)
def _thinking_block(thinking: str, signature: str = "sig_abc"):
return SimpleNamespace(type="thinking", thinking=thinking, signature=signature)
def _tool_use_block(id: str, name: str, input: dict):
return SimpleNamespace(type="tool_use", id=id, name=name, input=input)
def _response(content_blocks, stop_reason="end_turn"):
return SimpleNamespace(
content=content_blocks,
stop_reason=stop_reason,
usage=SimpleNamespace(
input_tokens=10,
output_tokens=5,
),
)
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestTextOnly:
    """A response holding a single text block: no tool use, no thinking."""

    def setup_method(self):
        # Normalize the same mock response through both code paths.
        self.resp = _response([_text_block("Hello world")])
        legacy_result = normalize_anthropic_response(self.resp)
        self.v1_msg, self.v1_finish = legacy_result
        self.v2 = normalize_anthropic_response_v2(self.resp)

    def test_type(self):
        normalized = self.v2
        assert isinstance(normalized, NormalizedResponse)

    def test_content_matches(self):
        expected = self.v1_msg.content
        assert self.v2.content == expected

    def test_finish_reason_matches(self):
        expected = self.v1_finish
        assert self.v2.finish_reason == expected

    def test_no_tool_calls(self):
        # v2 is checked first, then v1, as two separate assertions.
        new_calls = self.v2.tool_calls
        assert new_calls is None
        old_calls = self.v1_msg.tool_calls
        assert old_calls is None

    def test_no_reasoning(self):
        new_reasoning = self.v2.reasoning
        assert new_reasoning is None
        old_reasoning = self.v1_msg.reasoning
        assert old_reasoning is None
class TestWithToolCalls:
    """A text block followed by two tool-use blocks."""

    def setup_method(self):
        blocks = [
            _text_block("I'll check that"),
            _tool_use_block("toolu_abc", "terminal", {"command": "ls"}),
            _tool_use_block("toolu_def", "read_file", {"path": "/tmp"}),
        ]
        self.resp = _response(blocks, stop_reason="tool_use")
        legacy_result = normalize_anthropic_response(self.resp)
        self.v1_msg, self.v1_finish = legacy_result
        self.v2 = normalize_anthropic_response_v2(self.resp)

    def test_finish_reason(self):
        assert self.v2.finish_reason == "tool_calls"
        assert self.v1_finish == "tool_calls"

    def test_tool_call_count(self):
        assert len(self.v2.tool_calls) == 2
        assert len(self.v1_msg.tool_calls) == 2

    def test_tool_call_ids_match(self):
        # Index explicitly so a short list still fails loudly.
        for idx in (0, 1):
            new_call = self.v2.tool_calls[idx]
            old_call = self.v1_msg.tool_calls[idx]
            assert new_call.id == old_call.id

    def test_tool_call_names_match(self):
        assert self.v2.tool_calls[0].name == "terminal"
        assert self.v2.tool_calls[1].name == "read_file"
        for idx in (0, 1):
            new_call = self.v2.tool_calls[idx]
            old_call = self.v1_msg.tool_calls[idx]
            assert new_call.name == old_call.function.name

    def test_tool_call_arguments_match(self):
        for idx in (0, 1):
            new_call = self.v2.tool_calls[idx]
            old_call = self.v1_msg.tool_calls[idx]
            assert new_call.arguments == old_call.function.arguments

    def test_content_preserved(self):
        new_content = self.v2.content
        assert new_content == self.v1_msg.content
        assert "check that" in self.v2.content
class TestWithThinking:
    """A thinking block followed by a text block."""

    def setup_method(self):
        blocks = [
            _thinking_block("Let me think about this carefully..."),
            _text_block("The answer is 42."),
        ]
        self.resp = _response(blocks)
        legacy_result = normalize_anthropic_response(self.resp)
        self.v1_msg, self.v1_finish = legacy_result
        self.v2 = normalize_anthropic_response_v2(self.resp)

    def test_reasoning_matches(self):
        new_reasoning = self.v2.reasoning
        assert new_reasoning == self.v1_msg.reasoning
        assert "think about this" in self.v2.reasoning

    def test_reasoning_details_in_provider_data(self):
        v1_details = self.v1_msg.reasoning_details
        # v2 carries reasoning details inside provider_data, which may be None.
        if self.v2.provider_data:
            v2_details = self.v2.provider_data.get("reasoning_details")
        else:
            v2_details = None
        assert v1_details is not None
        assert v2_details is not None
        assert len(v2_details) == len(v1_details)

    def test_content_excludes_thinking(self):
        visible = self.v2.content
        assert visible == "The answer is 42."
class TestMixed:
    """Thinking, text and a tool-use block in one response."""

    def setup_method(self):
        blocks = [
            _thinking_block("Planning my approach..."),
            _text_block("I'll run the command"),
            _tool_use_block("toolu_xyz", "terminal", {"command": "pwd"}),
        ]
        self.resp = _response(blocks, stop_reason="tool_use")
        legacy_result = normalize_anthropic_response(self.resp)
        self.v1_msg, self.v1_finish = legacy_result
        self.v2 = normalize_anthropic_response_v2(self.resp)

    def test_all_fields_present(self):
        normalized = self.v2
        assert normalized.content is not None
        assert normalized.tool_calls is not None
        assert normalized.reasoning is not None
        assert normalized.finish_reason == "tool_calls"

    def test_content_matches(self):
        expected = self.v1_msg.content
        assert self.v2.content == expected

    def test_reasoning_matches(self):
        expected = self.v1_msg.reasoning
        assert self.v2.reasoning == expected

    def test_tool_call_matches(self):
        # Compare only the first tool call; the id is checked before the name.
        assert self.v2.tool_calls[0].id == self.v1_msg.tool_calls[0].id
        assert self.v2.tool_calls[0].name == self.v1_msg.tool_calls[0].function.name
class TestStopReasons:
    """Both normalizers must map each Anthropic stop_reason to the same finish_reason."""

    @pytest.mark.parametrize(
        "stop_reason,expected",
        [
            ("end_turn", "stop"),
            ("tool_use", "tool_calls"),
            ("max_tokens", "length"),
            ("stop_sequence", "stop"),
            ("refusal", "content_filter"),
            ("model_context_window_exceeded", "length"),
            ("unknown_future_reason", "stop"),
        ],
    )
    def test_stop_reason_mapping(self, stop_reason, expected):
        blocks = [_text_block("x")]
        resp = _response(blocks, stop_reason=stop_reason)
        v1_msg, v1_finish = normalize_anthropic_response(resp)
        v2 = normalize_anthropic_response_v2(resp)
        # Chained comparison: v2 equals v1, and v1 equals the expected value.
        assert v2.finish_reason == v1_finish == expected
class TestStripToolPrefix:
    """Check that the ``strip_tool_prefix`` flag affects v1 and v2 the same way."""

    @staticmethod
    def _prefixed_tool_response():
        """Build a response with a single ``mcp_terminal`` tool-use block."""
        return _response(
            [_tool_use_block("toolu_1", "mcp_terminal", {"cmd": "ls"})],
            stop_reason="tool_use",
        )

    def test_prefix_stripped(self):
        """With ``strip_tool_prefix=True`` both outputs name the tool ``terminal``."""
        resp = self._prefixed_tool_response()
        message_v1, _ = normalize_anthropic_response(resp, strip_tool_prefix=True)
        normalized_v2 = normalize_anthropic_response_v2(resp, strip_tool_prefix=True)
        assert message_v1.tool_calls[0].function.name == "terminal"
        assert normalized_v2.tool_calls[0].name == "terminal"

    def test_prefix_kept(self):
        """With ``strip_tool_prefix=False`` both outputs keep ``mcp_terminal``."""
        resp = self._prefixed_tool_response()
        message_v1, _ = normalize_anthropic_response(resp, strip_tool_prefix=False)
        normalized_v2 = normalize_anthropic_response_v2(resp, strip_tool_prefix=False)
        assert message_v1.tool_calls[0].function.name == "mcp_terminal"
        assert normalized_v2.tool_calls[0].name == "mcp_terminal"
class TestEdgeCases:
    """Boundary inputs: no content blocks, no reasoning details, return type."""

    def test_empty_content_blocks(self):
        """A response without blocks yields ``None`` content in both versions."""
        empty = _response([])
        message_v1, _ = normalize_anthropic_response(empty)
        normalized_v2 = normalize_anthropic_response_v2(empty)
        assert normalized_v2.content == message_v1.content
        assert normalized_v2.content is None

    def test_no_reasoning_details_means_none_provider_data(self):
        """A text-only response leaves ``provider_data`` unset."""
        text_only = _response([_text_block("hi")])
        normalized_v2 = normalize_anthropic_response_v2(text_only)
        assert normalized_v2.provider_data is None

    def test_v2_returns_dataclass_not_namespace(self):
        """v2 returns a ``NormalizedResponse`` rather than a ``SimpleNamespace``."""
        text_only = _response([_text_block("hi")])
        normalized_v2 = normalize_anthropic_response_v2(text_only)
        assert isinstance(normalized_v2, NormalizedResponse)
        assert not isinstance(normalized_v2, SimpleNamespace)
+2 -54
View File
@@ -51,12 +51,6 @@ def populated_db(db):
db.append_message("s1", role="assistant", content="I found the bug. Let me fix it.",
tool_calls=[{"function": {"name": "patch"}}])
db.append_message("s1", role="tool", content="patched successfully", tool_name="patch")
db.append_message(
"s1",
role="assistant",
content="Let me load the PR workflow skill.",
tool_calls=[{"function": {"name": "skill_view", "arguments": '{"name":"github-pr-workflow"}'}}],
)
db.append_message("s1", role="user", content="Thanks!")
db.append_message("s1", role="assistant", content="You're welcome!")
@@ -94,12 +88,6 @@ def populated_db(db):
db.append_message("s3", role="assistant", content="And search files",
tool_calls=[{"function": {"name": "search_files"}}])
db.append_message("s3", role="tool", content="found stuff", tool_name="search_files")
db.append_message(
"s3",
role="assistant",
content="Load the debugging skill.",
tool_calls=[{"function": {"name": "skill_view", "arguments": '{"name":"systematic-debugging"}'}}],
)
# Session 4: Discord, same model as s1, ended, 1 day ago
db.create_session(
@@ -112,15 +100,6 @@ def populated_db(db):
db.update_token_counts("s4", input_tokens=10000, output_tokens=5000)
db.append_message("s4", role="user", content="Quick question")
db.append_message("s4", role="assistant", content="Sure, go ahead")
db.append_message(
"s4",
role="assistant",
content="Load and update GitHub skills.",
tool_calls=[
{"function": {"name": "skill_view", "arguments": '{"name":"github-pr-workflow"}'}},
{"function": {"name": "skill_manage", "arguments": '{"name":"github-code-review"}'}},
],
)
# Session 5: Old session, 45 days ago (should be excluded from 30-day window)
db.create_session(
@@ -353,35 +332,6 @@ class TestInsightsPopulated:
total_pct = sum(t["percentage"] for t in tools)
assert total_pct == pytest.approx(100.0, abs=0.1)
def test_skill_breakdown(self, populated_db):
    """The 30-day report summarizes skill loads/edits and ranks the top skill."""
    report = InsightsEngine(populated_db).generate(days=30)
    skills = report["skills"]

    summary = skills["summary"]
    assert summary["distinct_skills_used"] == 3
    assert summary["total_skill_loads"] == 3
    assert summary["total_skill_edits"] == 1
    assert summary["total_skill_actions"] == 4

    leader = skills["top_skills"][0]
    assert leader["skill"] == "github-pr-workflow"
    assert leader["view_count"] == 2
    assert leader["manage_count"] == 0
    assert leader["total_count"] == 2
    assert leader["last_used_at"] is not None
def test_skill_breakdown_respects_days_filter(self, populated_db):
    """A 3-day window drops ``systematic-debugging`` from the skill breakdown."""
    report = InsightsEngine(populated_db).generate(days=3)
    skills = report["skills"]

    summary = skills["summary"]
    assert summary["distinct_skills_used"] == 2
    assert summary["total_skill_loads"] == 2
    assert summary["total_skill_edits"] == 1

    listed_names = [entry["skill"] for entry in skills["top_skills"]]
    assert "systematic-debugging" not in listed_names
def test_activity_patterns(self, populated_db):
engine = InsightsEngine(populated_db)
report = engine.generate(days=30)
@@ -451,7 +401,6 @@ class TestTerminalFormatting:
assert "Overview" in text
assert "Models Used" in text
assert "Top Tools" in text
assert "Top Skills" in text
assert "Activity Patterns" in text
assert "Notable Sessions" in text
@@ -520,9 +469,8 @@ class TestGatewayFormatting:
report = engine.generate(days=30)
text = engine.format_gateway(report)
assert "$" in text
assert "Top Skills" in text
assert "Est. cost" in text
assert "$" not in text
assert "Est. cost" not in text
assert "cache" not in text.lower()
def test_gateway_format_shows_models(self, populated_db):
View File
-151
View File
@@ -1,151 +0,0 @@
"""Tests for agent/transports/types.py — dataclass construction + helpers."""
import json
import pytest
from agent.transports.types import (
NormalizedResponse,
ToolCall,
Usage,
build_tool_call,
map_finish_reason,
)
# ---------------------------------------------------------------------------
# ToolCall
# ---------------------------------------------------------------------------
class TestToolCall:
    """Construction checks for the ``ToolCall`` dataclass."""

    def test_basic_construction(self):
        """Fields round-trip and ``provider_data`` defaults to ``None``."""
        call = ToolCall(id="call_abc", name="terminal", arguments='{"cmd": "ls"}')
        assert call.id == "call_abc"
        assert call.name == "terminal"
        assert call.arguments == '{"cmd": "ls"}'
        assert call.provider_data is None

    def test_none_id(self):
        """A ``None`` id is stored as given."""
        call = ToolCall(id=None, name="read_file", arguments="{}")
        assert call.id is None

    def test_provider_data(self):
        """An explicit ``provider_data`` mapping is kept and readable by key."""
        extra = {"call_id": "call_x", "response_item_id": "fc_x"}
        call = ToolCall(id="call_x", name="t", arguments="{}", provider_data=extra)
        assert call.provider_data["call_id"] == "call_x"
        assert call.provider_data["response_item_id"] == "fc_x"
# ---------------------------------------------------------------------------
# Usage
# ---------------------------------------------------------------------------
class TestUsage:
    """Construction checks for the ``Usage`` dataclass."""

    def test_defaults(self):
        """Every token counter is zero when nothing is passed."""
        usage = Usage()
        assert usage.prompt_tokens == 0
        assert usage.completion_tokens == 0
        assert usage.total_tokens == 0
        assert usage.cached_tokens == 0

    def test_explicit(self):
        """An explicitly supplied total is stored as given."""
        usage = Usage(
            prompt_tokens=100,
            completion_tokens=50,
            total_tokens=150,
            cached_tokens=80,
        )
        assert usage.total_tokens == 150
# ---------------------------------------------------------------------------
# NormalizedResponse
# ---------------------------------------------------------------------------
class TestNormalizedResponse:
    """Construction checks for the ``NormalizedResponse`` dataclass."""

    def test_text_only(self):
        """Optional fields default to ``None`` for a plain text response."""
        response = NormalizedResponse(content="hello", tool_calls=None, finish_reason="stop")
        assert response.content == "hello"
        assert response.tool_calls is None
        assert response.finish_reason == "stop"
        assert response.reasoning is None
        assert response.usage is None
        assert response.provider_data is None

    def test_with_tool_calls(self):
        """A supplied tool-call list is kept intact."""
        calls = [ToolCall(id="call_1", name="terminal", arguments='{"cmd":"pwd"}')]
        response = NormalizedResponse(content=None, tool_calls=calls, finish_reason="tool_calls")
        assert response.finish_reason == "tool_calls"
        assert len(response.tool_calls) == 1
        assert response.tool_calls[0].name == "terminal"

    def test_with_reasoning(self):
        """The reasoning text is stored as given."""
        response = NormalizedResponse(
            content="answer",
            tool_calls=None,
            finish_reason="stop",
            reasoning="I thought about it",
        )
        assert response.reasoning == "I thought about it"

    def test_with_provider_data(self):
        """Nested provider data stays reachable by key and index."""
        details = {"reasoning_details": [{"type": "thinking", "thinking": "hmm"}]}
        response = NormalizedResponse(
            content=None,
            tool_calls=None,
            finish_reason="stop",
            provider_data=details,
        )
        assert response.provider_data["reasoning_details"][0]["type"] == "thinking"
# ---------------------------------------------------------------------------
# build_tool_call
# ---------------------------------------------------------------------------
class TestBuildToolCall:
    """Checks for the ``build_tool_call`` helper."""

    def test_dict_arguments_serialized(self):
        """Dict arguments come back as their ``json.dumps`` string."""
        raw_args = {"cmd": "ls"}
        call = build_tool_call(id="call_1", name="terminal", arguments={"cmd": "ls"})
        assert call.arguments == json.dumps(raw_args)
        assert call.provider_data is None

    def test_string_arguments_passthrough(self):
        """String arguments are returned unchanged."""
        call = build_tool_call(id="call_2", name="read_file", arguments='{"path": "/tmp"}')
        assert call.arguments == '{"path": "/tmp"}'

    def test_provider_fields(self):
        """Extra keyword fields are collected into ``provider_data``."""
        call = build_tool_call(
            id="call_3",
            name="terminal",
            arguments="{}",
            call_id="call_3",
            response_item_id="fc_3",
        )
        expected = {"call_id": "call_3", "response_item_id": "fc_3"}
        assert call.provider_data == expected

    def test_none_id(self):
        """A ``None`` id is passed through."""
        call = build_tool_call(id=None, name="t", arguments="{}")
        assert call.id is None
# ---------------------------------------------------------------------------
# map_finish_reason
# ---------------------------------------------------------------------------
class TestMapFinishReason:
    """Checks for ``map_finish_reason`` using an Anthropic-style mapping."""

    ANTHROPIC_MAP = {
        "end_turn": "stop",
        "tool_use": "tool_calls",
        "max_tokens": "length",
        "stop_sequence": "stop",
        "refusal": "content_filter",
    }

    def test_known_reason(self):
        """Reasons present in the mapping translate to their mapped value."""
        known_pairs = (
            ("end_turn", "stop"),
            ("tool_use", "tool_calls"),
            ("max_tokens", "length"),
            ("refusal", "content_filter"),
        )
        for raw_reason, mapped in known_pairs:
            assert map_finish_reason(raw_reason, self.ANTHROPIC_MAP) == mapped

    def test_unknown_reason_defaults_to_stop(self):
        """A reason missing from the mapping falls back to ``stop``."""
        mapped = map_finish_reason("something_new", self.ANTHROPIC_MAP)
        assert mapped == "stop"

    def test_none_reason(self):
        """``None`` also falls back to ``stop``."""
        mapped = map_finish_reason(None, self.ANTHROPIC_MAP)
        assert mapped == "stop"
-1
View File
@@ -108,7 +108,6 @@ def make_restart_runner(
runner.hooks.emit = AsyncMock()
runner.pairing_store = MagicMock()
runner.session_store = MagicMock()
runner.session_store._entries = {}
runner.delivery_router = MagicMock()
platform_adapter = adapter or RestartTestAdapter()
-308
View File
@@ -1,308 +0,0 @@
"""End-to-end tests for inline image inputs on /v1/chat/completions and /v1/responses.
Covers the multimodal normalization path added to the API server. The
pure-function tests call ``_normalize_multimodal_content`` and
``_content_has_visible_payload`` directly. The HTTP tests stub the adapter's
``_run_agent`` and post to a real aiohttp app, so the handlers' request
parsing and content normalization run end to end up to the agent boundary.
"""
from unittest.mock import MagicMock, patch
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer
from gateway.config import PlatformConfig
from gateway.platforms.api_server import (
APIServerAdapter,
_content_has_visible_payload,
_normalize_multimodal_content,
cors_middleware,
security_headers_middleware,
)
# ---------------------------------------------------------------------------
# Pure-function tests for _normalize_multimodal_content
# ---------------------------------------------------------------------------
class TestNormalizeMultimodalContent:
    """Unit tests for ``_normalize_multimodal_content``.

    Covers pass-through of plain values, canonicalization of text and image
    parts, and the ``ValueError`` prefixes used for rejected content.
    """

    @staticmethod
    def _rejection_message(content):
        """Return the text of the ``ValueError`` raised when normalizing *content*."""
        with pytest.raises(ValueError) as raised:
            _normalize_multimodal_content(content)
        return str(raised.value)

    def test_string_passthrough(self):
        """A plain string is returned unchanged."""
        assert _normalize_multimodal_content("hello") == "hello"

    def test_none_returns_empty_string(self):
        """``None`` normalizes to the empty string."""
        assert _normalize_multimodal_content(None) == ""

    def test_text_only_list_collapses_to_string(self):
        """Text-only parts are joined with newlines into one string."""
        parts = [{"type": "text", "text": "hi"}, {"type": "text", "text": "there"}]
        assert _normalize_multimodal_content(parts) == "hi\nthere"

    def test_responses_input_text_canonicalized(self):
        """A lone ``input_text`` part collapses to its text."""
        parts = [{"type": "input_text", "text": "hello"}]
        assert _normalize_multimodal_content(parts) == "hello"

    def test_image_url_preserved_with_text(self):
        """Text plus ``image_url`` stays a list with the image part intact."""
        parts = [
            {"type": "text", "text": "describe this"},
            {"type": "image_url", "image_url": {"url": "https://example.com/cat.png", "detail": "high"}},
        ]
        normalized = _normalize_multimodal_content(parts)
        assert isinstance(normalized, list)
        # Compare against a fresh literal rather than the input object.
        assert normalized == [
            {"type": "text", "text": "describe this"},
            {"type": "image_url", "image_url": {"url": "https://example.com/cat.png", "detail": "high"}},
        ]

    def test_input_image_converted_to_canonical_shape(self):
        """``input_text``/``input_image`` parts become ``text``/``image_url`` parts."""
        parts = [
            {"type": "input_text", "text": "hi"},
            {"type": "input_image", "image_url": "https://example.com/cat.png"},
        ]
        normalized = _normalize_multimodal_content(parts)
        assert normalized == [
            {"type": "text", "text": "hi"},
            {"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}},
        ]

    def test_data_image_url_accepted(self):
        """A ``data:image/...`` URL is accepted as-is."""
        parts = [{"type": "image_url", "image_url": {"url": "data:image/png;base64,AAAA"}}]
        normalized = _normalize_multimodal_content(parts)
        assert normalized == [{"type": "image_url", "image_url": {"url": "data:image/png;base64,AAAA"}}]

    def test_non_image_data_url_rejected(self):
        """A non-image ``data:`` URL is rejected as an unsupported content type."""
        parts = [{"type": "image_url", "image_url": {"url": "data:text/plain;base64,SGVsbG8="}}]
        assert self._rejection_message(parts).startswith("unsupported_content_type:")

    def test_file_part_rejected(self):
        """A ``file`` part is rejected as an unsupported content type."""
        parts = [{"type": "file", "file": {"file_id": "f_1"}}]
        assert self._rejection_message(parts).startswith("unsupported_content_type:")

    def test_input_file_part_rejected(self):
        """An ``input_file`` part is rejected as an unsupported content type."""
        parts = [{"type": "input_file", "file_id": "f_1"}]
        assert self._rejection_message(parts).startswith("unsupported_content_type:")

    def test_missing_url_rejected(self):
        """An ``image_url`` part without a URL is rejected as an invalid image URL."""
        parts = [{"type": "image_url", "image_url": {}}]
        assert self._rejection_message(parts).startswith("invalid_image_url:")

    def test_bad_scheme_rejected(self):
        """An ``ftp://`` image URL is rejected as an invalid image URL."""
        parts = [{"type": "image_url", "image_url": {"url": "ftp://example.com/x.png"}}]
        assert self._rejection_message(parts).startswith("invalid_image_url:")

    def test_unknown_part_type_rejected(self):
        """An ``audio`` part is rejected as an unsupported content type."""
        parts = [{"type": "audio", "audio": {}}]
        assert self._rejection_message(parts).startswith("unsupported_content_type:")
class TestContentHasVisiblePayload:
    """Unit tests for ``_content_has_visible_payload``."""

    def test_non_empty_string(self):
        """A non-empty string counts as visible."""
        visible = _content_has_visible_payload("hello")
        assert visible

    def test_whitespace_only_string(self):
        """A whitespace-only string does not count as visible."""
        visible = _content_has_visible_payload(" ")
        assert not visible

    def test_list_with_image_only(self):
        """A list holding only an image part counts as visible."""
        image_only = [{"type": "image_url", "image_url": {"url": "x"}}]
        assert _content_has_visible_payload(image_only)

    def test_list_with_only_empty_text(self):
        """A list holding only an empty text part does not count as visible."""
        empty_text = [{"type": "text", "text": ""}]
        assert not _content_has_visible_payload(empty_text)
# ---------------------------------------------------------------------------
# HTTP integration — real aiohttp client hitting the adapter handlers
# ---------------------------------------------------------------------------
def _make_adapter() -> APIServerAdapter:
    """Build an ``APIServerAdapter`` from an enabled ``PlatformConfig``."""
    config = PlatformConfig(enabled=True)
    return APIServerAdapter(config)
def _create_app(adapter: APIServerAdapter) -> web.Application:
    """Assemble an aiohttp app that routes the tested endpoints to *adapter*."""
    # Keep only the middlewares that are actually defined (not ``None``).
    middlewares = []
    for candidate in (cors_middleware, security_headers_middleware):
        if candidate is not None:
            middlewares.append(candidate)

    app = web.Application(middlewares=middlewares)
    app["api_server_adapter"] = adapter

    router = app.router
    router.add_post("/v1/chat/completions", adapter._handle_chat_completions)
    router.add_post("/v1/responses", adapter._handle_responses)
    router.add_get("/v1/responses/{response_id}", adapter._handle_get_response)
    return app
@pytest.fixture
def adapter():
    """Provide a newly built API server adapter to each test."""
    fresh_adapter = _make_adapter()
    return fresh_adapter
class TestChatCompletionsMultimodalHTTP:
    """HTTP tests for multimodal content on ``/v1/chat/completions``.

    The adapter's ``_run_agent`` is replaced with a stub that records its
    keyword arguments, so only request handling is exercised.
    """

    @pytest.mark.asyncio
    async def test_inline_image_preserved_to_run_agent(self, adapter):
        """Multimodal user content reaches _run_agent as a list of parts."""
        image_payload = [
            {"type": "text", "text": "What's in this image?"},
            {"type": "image_url", "image_url": {"url": "https://example.com/cat.png", "detail": "high"}},
        ]
        request_body = {
            "model": "hermes-agent",
            "messages": [{"role": "user", "content": image_payload}],
        }
        app = _create_app(adapter)
        async with TestClient(TestServer(app)) as cli:
            fake_run = MagicMock()

            async def _stub(**kwargs):
                # Record what the handler passed so the test can inspect it.
                fake_run.captured = kwargs
                return (
                    {"final_response": "A cat.", "messages": [], "api_calls": 1},
                    {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
                )

            fake_run.side_effect = _stub
            with patch.object(adapter, "_run_agent", new=fake_run):
                resp = await cli.post("/v1/chat/completions", json=request_body)
                assert resp.status == 200, await resp.text()
                assert fake_run.captured["user_message"] == image_payload

    @pytest.mark.asyncio
    async def test_text_only_array_collapses_to_string(self, adapter):
        """Text-only array becomes a plain string so logging stays unchanged."""
        request_body = {
            "model": "hermes-agent",
            "messages": [
                {"role": "user", "content": [{"type": "text", "text": "hello"}]},
            ],
        }
        app = _create_app(adapter)
        async with TestClient(TestServer(app)) as cli:
            fake_run = MagicMock()

            async def _stub(**kwargs):
                fake_run.captured = kwargs
                return (
                    {"final_response": "ok", "messages": [], "api_calls": 1},
                    {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
                )

            fake_run.side_effect = _stub
            with patch.object(adapter, "_run_agent", new=fake_run):
                resp = await cli.post("/v1/chat/completions", json=request_body)
                assert resp.status == 200, await resp.text()
                assert fake_run.captured["user_message"] == "hello"

    @pytest.mark.asyncio
    async def test_file_part_returns_400(self, adapter):
        """A ``file`` content part yields a 400 that points at the offending message."""
        request_body = {
            "model": "hermes-agent",
            "messages": [
                {"role": "user", "content": [{"type": "file", "file": {"file_id": "f_1"}}]},
            ],
        }
        app = _create_app(adapter)
        async with TestClient(TestServer(app)) as cli:
            resp = await cli.post("/v1/chat/completions", json=request_body)
            assert resp.status == 400
            error = (await resp.json())["error"]
            assert error["code"] == "unsupported_content_type"
            assert error["param"] == "messages[0].content"

    @pytest.mark.asyncio
    async def test_non_image_data_url_returns_400(self, adapter):
        """A non-image ``data:`` URL yields a 400 with ``unsupported_content_type``."""
        bad_part = {
            "type": "image_url",
            "image_url": {"url": "data:text/plain;base64,SGVsbG8="},
        }
        request_body = {
            "model": "hermes-agent",
            "messages": [
                {
                    "role": "user",
                    "content": [bad_part],
                },
            ],
        }
        app = _create_app(adapter)
        async with TestClient(TestServer(app)) as cli:
            resp = await cli.post("/v1/chat/completions", json=request_body)
            assert resp.status == 400
            body = await resp.json()
            assert body["error"]["code"] == "unsupported_content_type"
class TestResponsesMultimodalHTTP:
    """HTTP tests for multimodal content on ``/v1/responses``."""

    @pytest.mark.asyncio
    async def test_input_image_canonicalized_and_forwarded(self, adapter):
        """``input_text``/``input_image`` parts arrive at ``_run_agent`` in canonical form."""
        request_body = {
            "model": "hermes-agent",
            "input": [
                {
                    "role": "user",
                    "content": [
                        {"type": "input_text", "text": "Describe."},
                        {
                            "type": "input_image",
                            "image_url": "https://example.com/cat.png",
                        },
                    ],
                }
            ],
        }
        app = _create_app(adapter)
        async with TestClient(TestServer(app)) as cli:
            fake_run = MagicMock()

            async def _stub(**kwargs):
                # Record what the handler passed so the test can inspect it.
                fake_run.captured = kwargs
                return (
                    {"final_response": "ok", "messages": [], "api_calls": 1},
                    {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0},
                )

            fake_run.side_effect = _stub
            with patch.object(adapter, "_run_agent", new=fake_run):
                resp = await cli.post("/v1/responses", json=request_body)
                assert resp.status == 200, await resp.text()
                expected = [
                    {"type": "text", "text": "Describe."},
                    {"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}},
                ]
                assert fake_run.captured["user_message"] == expected

    @pytest.mark.asyncio
    async def test_input_file_returns_400(self, adapter):
        """An ``input_file`` part yields a 400 with ``unsupported_content_type``."""
        request_body = {
            "model": "hermes-agent",
            "input": [
                {
                    "role": "user",
                    "content": [{"type": "input_file", "file_id": "f_1"}],
                }
            ],
        }
        app = _create_app(adapter)
        async with TestClient(TestServer(app)) as cli:
            resp = await cli.post("/v1/responses", json=request_body)
            assert resp.status == 400
            body = await resp.json()
            assert body["error"]["code"] == "unsupported_content_type"
+1 -30
View File
@@ -1,7 +1,6 @@
import asyncio
import shutil
import subprocess
from datetime import datetime
from unittest.mock import AsyncMock, MagicMock
import pytest
@@ -9,7 +8,7 @@ import pytest
import gateway.run as gateway_run
from gateway.platforms.base import MessageEvent, MessageType
from gateway.restart import DEFAULT_GATEWAY_RESTART_DRAIN_TIMEOUT
from gateway.session import SessionEntry, build_session_key
from gateway.session import build_session_key
from tests.gateway.restart_test_helpers import make_restart_runner, make_restart_source
@@ -243,31 +242,3 @@ async def test_shutdown_notification_send_failure_does_not_block():
# Should not raise
await runner._notify_active_sessions_of_shutdown()
@pytest.mark.asyncio
async def test_shutdown_notification_uses_persisted_origin_for_colon_ids():
    """Shutdown notifications should route from persisted origin, not reparsed keys."""
    runner, adapter = make_restart_runner()
    adapter.send = AsyncMock()

    # The chat id itself contains a colon.
    source = make_restart_source(chat_id="!room123:example.org", chat_type="group")
    source.platform = gateway_run.Platform.MATRIX
    session_key = build_session_key(source)
    runner._running_agents[session_key] = MagicMock()

    persisted_entry = SessionEntry(
        session_key=session_key,
        session_id="sess-1",
        created_at=datetime.now(),
        updated_at=datetime.now(),
        origin=source,
        platform=source.platform,
        chat_type=source.chat_type,
    )
    runner.session_store._entries = {session_key: persisted_entry}
    runner.adapters = {gateway_run.Platform.MATRIX: adapter}

    await runner._notify_active_sessions_of_shutdown()

    assert adapter.send.await_count == 1
    first_positional = adapter.send.await_args.args[0]
    assert first_positional == "!room123:example.org"
+1 -42
View File
@@ -23,7 +23,6 @@ from gateway.platforms.base import (
MessageType,
SendResult,
SUPPORTED_DOCUMENT_TYPES,
SUPPORTED_VIDEO_TYPES,
)
@@ -118,12 +117,6 @@ def _make_update(msg):
return update
def _make_video(file_obj=None):
    """Return a mock video whose async ``get_file`` resolves to *file_obj*.

    When *file_obj* is falsy, a default file object built from
    ``b"video-bytes"`` is used instead.
    """
    if not file_obj:
        file_obj = _make_file_obj(b"video-bytes")
    mock_video = MagicMock()
    mock_video.get_file = AsyncMock(return_value=file_obj)
    return mock_video
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@@ -139,13 +132,10 @@ def adapter():
@pytest.fixture(autouse=True)
def _redirect_cache(tmp_path, monkeypatch):
"""Point document/video cache to tmp_path so tests don't touch ~/.hermes."""
"""Point document cache to tmp_path so tests don't touch ~/.hermes."""
monkeypatch.setattr(
"gateway.platforms.base.DOCUMENT_CACHE_DIR", tmp_path / "doc_cache"
)
monkeypatch.setattr(
"gateway.platforms.base.VIDEO_CACHE_DIR", tmp_path / "video_cache"
)
# ---------------------------------------------------------------------------
@@ -358,37 +348,6 @@ class TestDocumentDownloadBlock:
adapter.handle_message.assert_called_once()
class TestVideoDownloadBlock:
    """Video handling in ``_handle_media_message``: native videos and mp4 documents."""

    @staticmethod
    def _assert_cached_mp4_video(event):
        """Check *event* is a VIDEO with one existing local path typed as mp4."""
        assert event.message_type == MessageType.VIDEO
        assert len(event.media_urls) == 1
        assert os.path.exists(event.media_urls[0])
        assert event.media_types == [SUPPORTED_VIDEO_TYPES[".mp4"]]

    @pytest.mark.asyncio
    async def test_native_video_is_cached(self, adapter):
        """A message with a native video produces a VIDEO event with a cached file."""
        video_file = _make_file_obj(b"fake-mp4")
        video_file.file_path = "videos/clip.mp4"
        message = _make_message()
        message.video = _make_video(video_file)
        update = _make_update(message)

        await adapter._handle_media_message(update, MagicMock())

        dispatched = adapter.handle_message.call_args[0][0]
        self._assert_cached_mp4_video(dispatched)

    @pytest.mark.asyncio
    async def test_mp4_document_is_treated_as_video(self, adapter):
        """An mp4 sent as a document is also dispatched as a VIDEO event."""
        video_file = _make_file_obj(b"fake-mp4-doc")
        document = _make_document(
            file_name="good.mp4",
            mime_type="video/mp4",
            file_size=1024,
            file_obj=video_file,
        )
        update = _make_update(_make_message(document=document))

        await adapter._handle_media_message(update, MagicMock())

        dispatched = adapter.handle_message.call_args[0][0]
        self._assert_cached_mp4_video(dispatched)
# ---------------------------------------------------------------------------
# TestMediaGroups — media group (album) buffering
# ---------------------------------------------------------------------------
+2 -2
View File
@@ -54,7 +54,7 @@ def test_get_codex_model_ids_falls_back_to_curated_defaults(tmp_path, monkeypatc
assert models[: len(DEFAULT_CODEX_MODELS)] == DEFAULT_CODEX_MODELS
assert "gpt-5.4" in models
assert "gpt-5.3-codex-spark" not in models
assert "gpt-5.3-codex-spark" in models
def test_get_codex_model_ids_adds_forward_compat_models_from_templates(monkeypatch):
@@ -65,7 +65,7 @@ def test_get_codex_model_ids_adds_forward_compat_models_from_templates(monkeypat
models = get_codex_model_ids(access_token="codex-access-token")
assert models == ["gpt-5.2-codex", "gpt-5.4-mini", "gpt-5.4", "gpt-5.3-codex"]
assert models == ["gpt-5.2-codex", "gpt-5.4-mini", "gpt-5.4", "gpt-5.3-codex", "gpt-5.3-codex-spark"]
def test_model_command_uses_runtime_access_token_for_codex_list(monkeypatch):
-26
View File
@@ -688,32 +688,6 @@ class TestTelegramMenuCommands:
f"Command '{name}' is {len(name)} chars (limit {_TG_NAME_LIMIT})"
)
def test_includes_plugin_commands_via_lazy_discovery(self, tmp_path, monkeypatch):
    """Telegram menu generation should discover plugin slash commands on first access."""
    from unittest.mock import patch
    import hermes_cli.plugins as plugins_mod

    manifest_text = "name: cmd-plugin\nversion: 0.1.0\ndescription: Test plugin\n"
    init_text = (
        "def register(ctx):\n"
        " ctx.register_command('lcm', lambda args: 'ok', description='LCM status and diagnostics')\n"
    )
    # Opt-in: plugins are opt-in by default, so enable in config.yaml
    config_text = "plugins:\n enabled:\n - cmd-plugin\n"

    plugin_dir = tmp_path / "plugins" / "cmd-plugin"
    plugin_dir.mkdir(parents=True, exist_ok=True)
    (plugin_dir / "plugin.yaml").write_text(manifest_text)
    (plugin_dir / "__init__.py").write_text(init_text)
    (tmp_path / "config.yaml").write_text(config_text)
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    # Clear the module-level manager for the duration of the call.
    with patch.object(plugins_mod, "_plugin_manager", None):
        menu, _ = telegram_menu_commands(max_commands=100)

    menu_names = set()
    for command_name, _description in menu:
        menu_names.add(command_name)
    assert "lcm" in menu_names
def test_excludes_telegram_disabled_skills(self, tmp_path, monkeypatch):
"""Skills disabled for telegram should not appear in the menu."""
from unittest.mock import patch, MagicMock
+3 -3
View File
@@ -459,7 +459,7 @@ class TestCustomProviderCompatibility:
migrate_config(interactive=False, quiet=True)
raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
assert raw["_config_version"] == 21
assert raw["_config_version"] == 20
assert raw["providers"]["openai-direct"] == {
"api": "https://api.openai.com/v1",
"api_key": "test-key",
@@ -606,7 +606,7 @@ class TestInterimAssistantMessageConfig:
migrate_config(interactive=False, quiet=True)
raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
assert raw["_config_version"] == 21
assert raw["_config_version"] == 20
assert raw["display"]["tool_progress"] == "off"
assert raw["display"]["interim_assistant_messages"] is True
@@ -626,7 +626,7 @@ class TestDiscordChannelPromptsConfig:
migrate_config(interactive=False, quiet=True)
raw = yaml.safe_load(config_path.read_text(encoding="utf-8"))
assert raw["_config_version"] == 21
assert raw["_config_version"] == 20
assert raw["discord"]["auto_thread"] is True
assert raw["discord"]["channel_prompts"] == {}
-60
View File
@@ -540,63 +540,3 @@ class TestValidateCodexAutoCorrection:
assert result["recognized"] is False
assert result.get("corrected_model") is None
assert "not found" in result["message"]
# -- probe_api_models — Cloudflare UA mitigation --------------------------------
class TestProbeApiModelsUserAgent:
    """Probing custom /v1/models must send a Hermes User-Agent.

    Some custom Claude proxies (e.g. ``packyapi.com``) sit behind Cloudflare
    with Browser Integrity Check enabled, which rejects the default
    ``Python-urllib/3.x`` signature with HTTP 403 ``error code: 1010``.
    ``probe_api_models`` swallowed that into ``{"models": None}``, so users
    saw a misleading "Could not reach the ... API to validate ..." error even
    though the endpoint was reachable and the listing existed.
    """

    def _make_mock_response(self, body: bytes):
        """Build a context-manager mock whose ``read()`` returns *body*."""
        from unittest.mock import MagicMock

        response = MagicMock()
        # ``with urlopen(...) as resp`` must yield the mock itself.
        response.__enter__ = MagicMock(return_value=response)
        response.__exit__ = MagicMock(return_value=False)
        response.read = MagicMock(return_value=body)
        return response

    def test_probe_sends_hermes_user_agent(self):
        """The probe request carries a ``hermes-cli/`` User-Agent header."""
        from unittest.mock import patch

        payload = b'{"data":[{"id":"claude-opus-4.7"}]}'
        fake_response = self._make_mock_response(payload)
        with patch(
            "hermes_cli.models.urllib.request.urlopen",
            return_value=fake_response,
        ) as mock_urlopen:
            result = probe_api_models("sk-test", "https://example.com/v1")

        assert result["models"] == ["claude-opus-4.7"]

        # The urlopen call receives a Request object as its first positional arg
        req = mock_urlopen.call_args[0][0]
        ua = req.get_header("User-agent")  # urllib title-cases header names
        assert ua, "probe_api_models must send a User-Agent header"
        assert ua.startswith("hermes-cli/"), (
            f"User-Agent must advertise hermes-cli, got {ua!r}"
        )
        # Must not fall back to urllib's default — that's what Cloudflare 1010 blocks.
        assert not ua.startswith("Python-urllib")

    def test_probe_user_agent_sent_without_api_key(self):
        """UA must be present even for endpoints that don't need auth."""
        from unittest.mock import patch

        fake_response = self._make_mock_response(b'{"data":[]}')
        with patch(
            "hermes_cli.models.urllib.request.urlopen",
            return_value=fake_response,
        ) as mock_urlopen:
            probe_api_models(None, "https://example.com/v1")

        req = mock_urlopen.call_args[0][0]
        ua = req.get_header("User-agent")
        assert ua and ua.startswith("hermes-cli/")
        # No Authorization was set, but UA must still be present.
        assert req.get_header("Authorization") is None
+11 -157
View File
@@ -30,19 +30,8 @@ from hermes_cli.plugins import (
def _make_plugin_dir(base: Path, name: str, *, register_body: str = "pass",
manifest_extra: dict | None = None,
auto_enable: bool = True) -> Path:
"""Create a minimal plugin directory with plugin.yaml + __init__.py.
If *auto_enable* is True (default), also write the plugin's name into
``<hermes_home>/config.yaml`` under ``plugins.enabled``. Plugins are
opt-in by default, so tests that expect the plugin to actually load
need this. Pass ``auto_enable=False`` for tests that exercise the
unenabled path.
*base* is expected to be ``<hermes_home>/plugins/``; we derive
``<hermes_home>`` from it by walking one level up.
"""
manifest_extra: dict | None = None) -> Path:
"""Create a minimal plugin directory with plugin.yaml + __init__.py."""
plugin_dir = base / name
plugin_dir.mkdir(parents=True, exist_ok=True)
@@ -54,31 +43,6 @@ def _make_plugin_dir(base: Path, name: str, *, register_body: str = "pass",
(plugin_dir / "__init__.py").write_text(
f"def register(ctx):\n {register_body}\n"
)
if auto_enable:
# Write/merge plugins.enabled in <HERMES_HOME>/config.yaml.
# Config is always read from HERMES_HOME (not from the project
# dir for project plugins), so that's where we opt in.
import os
hermes_home_str = os.environ.get("HERMES_HOME")
if hermes_home_str:
hermes_home = Path(hermes_home_str)
else:
hermes_home = base.parent
hermes_home.mkdir(parents=True, exist_ok=True)
cfg_path = hermes_home / "config.yaml"
cfg: dict = {}
if cfg_path.exists():
try:
cfg = yaml.safe_load(cfg_path.read_text()) or {}
except Exception:
cfg = {}
plugins_cfg = cfg.setdefault("plugins", {})
enabled = plugins_cfg.setdefault("enabled", [])
if isinstance(enabled, list) and name not in enabled:
enabled.append(name)
cfg_path.write_text(yaml.safe_dump(cfg))
return plugin_dir
@@ -138,12 +102,7 @@ class TestPluginDiscovery:
mgr.discover_and_load()
mgr.discover_and_load() # second call should no-op
# Filter out bundled plugins — they're always discovered.
non_bundled = {
n: p for n, p in mgr._plugins.items()
if p.manifest.source != "bundled"
}
assert len(non_bundled) == 1
assert len(mgr._plugins) == 1
def test_discover_skips_dir_without_manifest(self, tmp_path, monkeypatch):
"""Directories without plugin.yaml are silently skipped."""
@@ -154,12 +113,7 @@ class TestPluginDiscovery:
mgr = PluginManager()
mgr.discover_and_load()
# Filter out bundled plugins — they're always discovered.
non_bundled = {
n: p for n, p in mgr._plugins.items()
if p.manifest.source != "bundled"
}
assert len(non_bundled) == 0
assert len(mgr._plugins) == 0
def test_entry_points_scanned(self, tmp_path, monkeypatch):
"""Entry-point based plugins are discovered (mocked)."""
@@ -198,13 +152,7 @@ class TestPluginLoading:
plugin_dir = plugins_dir / "bad_plugin"
plugin_dir.mkdir(parents=True)
(plugin_dir / "plugin.yaml").write_text(yaml.dump({"name": "bad_plugin"}))
# Explicitly enable so the loader tries to import it and hits the
# missing-init error.
hermes_home = tmp_path / "hermes_test"
(hermes_home / "config.yaml").write_text(
yaml.safe_dump({"plugins": {"enabled": ["bad_plugin"]}})
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
@@ -212,8 +160,6 @@ class TestPluginLoading:
assert "bad_plugin" in mgr._plugins
assert not mgr._plugins["bad_plugin"].enabled
assert mgr._plugins["bad_plugin"].error is not None
# Should be the missing-init error, not "not enabled".
assert "not enabled" not in mgr._plugins["bad_plugin"].error
def test_load_missing_register_fn(self, tmp_path, monkeypatch):
"""Plugin without register() function records an error."""
@@ -222,12 +168,7 @@ class TestPluginLoading:
plugin_dir.mkdir(parents=True)
(plugin_dir / "plugin.yaml").write_text(yaml.dump({"name": "no_reg"}))
(plugin_dir / "__init__.py").write_text("# no register function\n")
# Explicitly enable it so the loader actually tries to import.
hermes_home = tmp_path / "hermes_test"
(hermes_home / "config.yaml").write_text(
yaml.safe_dump({"plugins": {"enabled": ["no_reg"]}})
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
@@ -463,11 +404,7 @@ class TestPluginContext:
' handler=lambda args, **kw: "echo",\n'
' )\n'
)
hermes_home = tmp_path / "hermes_test"
(hermes_home / "config.yaml").write_text(
yaml.safe_dump({"plugins": {"enabled": ["tool_plugin"]}})
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
@@ -501,11 +438,7 @@ class TestPluginToolVisibility:
' handler=lambda args, **kw: "ok",\n'
' )\n'
)
hermes_home = tmp_path / "hermes_test"
(hermes_home / "config.yaml").write_text(
yaml.safe_dump({"plugins": {"enabled": ["vis_plugin"]}})
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
@@ -795,81 +728,6 @@ class TestPluginCommands:
assert "cmd-b" in cmds
assert cmds["cmd-a"]["description"] == "A"
def test_get_plugin_command_handler_discovers_plugins_lazily(self, tmp_path, monkeypatch):
"""Handler lookup should work before any explicit discover_plugins() call."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(
plugins_dir,
"cmd-plugin",
register_body='ctx.register_command("lazycmd", lambda a: f"ok:{a}", description="Lazy")',
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
import hermes_cli.plugins as plugins_mod
with patch.object(plugins_mod, "_plugin_manager", None):
handler = get_plugin_command_handler("lazycmd")
assert handler is not None
assert handler("x") == "ok:x"
def test_get_plugin_commands_discovers_plugins_lazily(self, tmp_path, monkeypatch):
"""Command listing should trigger plugin discovery on first access."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
_make_plugin_dir(
plugins_dir,
"cmd-plugin",
register_body='ctx.register_command("lazycmd", lambda a: a, description="Lazy")',
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
import hermes_cli.plugins as plugins_mod
with patch.object(plugins_mod, "_plugin_manager", None):
cmds = get_plugin_commands()
assert "lazycmd" in cmds
assert cmds["lazycmd"]["description"] == "Lazy"
def test_get_plugin_context_engine_discovers_plugins_lazily(self, tmp_path, monkeypatch):
"""Context engine lookup should work before any explicit discover_plugins() call."""
hermes_home = tmp_path / "hermes_test"
plugins_dir = hermes_home / "plugins"
plugin_dir = plugins_dir / "engine-plugin"
plugin_dir.mkdir(parents=True, exist_ok=True)
(plugin_dir / "plugin.yaml").write_text(
yaml.dump({
"name": "engine-plugin",
"version": "0.1.0",
"description": "Test engine plugin",
})
)
(plugin_dir / "__init__.py").write_text(
"from agent.context_engine import ContextEngine\n\n"
"class StubEngine(ContextEngine):\n"
" @property\n"
" def name(self):\n"
" return 'stub-engine'\n\n"
" def update_from_response(self, usage):\n"
" return None\n\n"
" def should_compress(self, prompt_tokens):\n"
" return False\n\n"
" def compress(self, messages, current_tokens):\n"
" return messages\n\n"
"def register(ctx):\n"
" ctx.register_context_engine(StubEngine())\n"
)
# Opt-in: plugins are opt-in by default, so enable in config.yaml
(hermes_home / "config.yaml").write_text(
yaml.safe_dump({"plugins": {"enabled": ["engine-plugin"]}})
)
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
import hermes_cli.plugins as plugins_mod
with patch.object(plugins_mod, "_plugin_manager", None):
engine = plugins_mod.get_plugin_context_engine()
assert engine is not None
assert engine.name == "stub-engine"
def test_commands_tracked_on_loaded_plugin(self, tmp_path, monkeypatch):
"""Commands registered during discover_and_load() are tracked on LoadedPlugin."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
@@ -891,24 +749,20 @@ class TestPluginCommands:
def test_commands_in_list_plugins_output(self, tmp_path, monkeypatch):
"""list_plugins() includes command count."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
# Set HERMES_HOME BEFORE _make_plugin_dir so auto-enable targets
# the right config.yaml.
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
_make_plugin_dir(
plugins_dir, "cmd-plugin",
register_body=(
'ctx.register_command("mycmd", lambda a: "ok", description="Test")'
),
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
info = mgr.list_plugins()
# Filter out bundled plugins — they're always discovered.
cmd_info = [p for p in info if p["name"] == "cmd-plugin"]
assert len(cmd_info) == 1
assert cmd_info[0]["commands"] == 1
assert len(info) == 1
assert info[0]["commands"] == 1
def test_handler_receives_raw_args(self):
"""The handler is called with the raw argument string."""
@@ -1,137 +0,0 @@
"""Tests for providers config entry validation and normalization.
Covers Issue #9332: camelCase keys silently ignored, non-URL strings
accepted as base_url, and unknown keys go unreported.
"""
import logging
from unittest.mock import patch
import pytest
from hermes_cli.config import _normalize_custom_provider_entry
class TestNormalizeCustomProviderEntry:
    """Exercise ``_normalize_custom_provider_entry`` on valid and invalid entries."""

    def test_valid_entry_snake_case(self):
        """A snake_case entry keeps its URL and key and takes the provider key as name."""
        normalized = _normalize_custom_provider_entry(
            {
                "base_url": "https://api.example.com/v1",
                "api_key": "sk-test-key",
            },
            provider_key="myhost",
        )
        assert normalized is not None
        assert normalized["name"] == "myhost"
        assert normalized["base_url"] == "https://api.example.com/v1"
        assert normalized["api_key"] == "sk-test-key"

    def test_camel_case_api_key_mapped(self):
        """``apiKey`` is surfaced as ``api_key`` in the normalized entry."""
        normalized = _normalize_custom_provider_entry(
            {
                "base_url": "https://api.example.com/v1",
                "apiKey": "sk-test-key",
            },
            provider_key="myhost",
        )
        assert normalized is not None
        assert normalized["api_key"] == "sk-test-key"

    def test_camel_case_base_url_mapped(self):
        """``baseUrl`` is surfaced as ``base_url`` in the normalized entry."""
        normalized = _normalize_custom_provider_entry(
            {
                "baseUrl": "https://api.example.com/v1",
                "api_key": "sk-test-key",
            },
            provider_key="myhost",
        )
        assert normalized is not None
        assert normalized["base_url"] == "https://api.example.com/v1"

    def test_non_url_api_field_rejected(self):
        """A non-URL string in ``api`` leaves the entry without a URL, so it is dropped."""
        normalized = _normalize_custom_provider_entry(
            {
                "api": "openai-reverse-proxy",
                "api_key": "sk-test-key",
            },
            provider_key="nvidia",
        )
        # No valid URL anywhere in the entry -> nothing to normalize.
        assert normalized is None

    def test_valid_url_in_api_field_accepted(self):
        """A real URL in ``api`` is used as the ``base_url``."""
        normalized = _normalize_custom_provider_entry(
            {
                "api": "https://integrate.api.nvidia.com/v1",
                "api_key": "sk-test-key",
            },
            provider_key="nvidia",
        )
        assert normalized is not None
        assert normalized["base_url"] == "https://integrate.api.nvidia.com/v1"

    def test_base_url_preferred_over_api(self):
        """When both are present, ``base_url`` wins over ``api``."""
        normalized = _normalize_custom_provider_entry(
            {
                "base_url": "https://correct.example.com/v1",
                "api": "https://wrong.example.com/v1",
                "api_key": "sk-test-key",
            },
            provider_key="test",
        )
        assert normalized is not None
        assert normalized["base_url"] == "https://correct.example.com/v1"

    def test_unknown_keys_logged(self, caplog):
        """Unrecognised keys do not reject the entry but emit a warning."""
        raw_entry = {
            "base_url": "https://api.example.com/v1",
            "api_key": "sk-test-key",
            "unknownField": "value",
            "anotherBad": 42,
        }
        with caplog.at_level(logging.WARNING):
            normalized = _normalize_custom_provider_entry(raw_entry, provider_key="test")
        assert normalized is not None
        assert any(
            "unknown config keys" in record.message.lower()
            for record in caplog.records
        )

    def test_camel_case_warning_logged(self, caplog):
        """Mapping camelCase aliases emits at least one warning."""
        raw_entry = {
            "baseUrl": "https://api.example.com/v1",
            "apiKey": "sk-test-key",
        }
        with caplog.at_level(logging.WARNING):
            normalized = _normalize_custom_provider_entry(raw_entry, provider_key="test")
        assert normalized is not None
        alias_warnings = []
        for record in caplog.records:
            text = record.message.lower()
            if "camelcase" in text or "auto-mapped" in text:
                alias_warnings.append(record)
        assert len(alias_warnings) >= 1

    def test_snake_case_takes_precedence_over_camel(self):
        """With both spellings present, the snake_case value is kept."""
        normalized = _normalize_custom_provider_entry(
            {
                "api_key": "snake-key",
                "apiKey": "camel-key",
                "base_url": "https://api.example.com/v1",
            },
            provider_key="test",
        )
        assert normalized is not None
        assert normalized["api_key"] == "snake-key"

    def test_non_dict_returns_none(self):
        """Anything that is not a dict normalizes to ``None``."""
        for bogus in ("not-a-dict", 42, None):
            assert _normalize_custom_provider_entry(bogus) is None

    def test_no_url_returns_none(self):
        """An entry without any URL field normalizes to ``None``."""
        normalized = _normalize_custom_provider_entry(
            {"api_key": "sk-test-key"},
            provider_key="test",
        )
        assert normalized is None

    def test_no_name_returns_none(self):
        """An entry with neither a name nor a provider key normalizes to ``None``."""
        normalized = _normalize_custom_provider_entry(
            {"base_url": "https://api.example.com/v1"},
            provider_key="",
        )
        assert normalized is None
+3 -80
View File
@@ -101,19 +101,14 @@ class TestWebServerEndpoints:
"""Test the FastAPI REST endpoints using Starlette TestClient."""
@pytest.fixture(autouse=True)
def _setup_test_client(self, monkeypatch, _isolate_hermes_home):
"""Create a TestClient and isolate the state DB under the test HERMES_HOME."""
def _setup_test_client(self):
"""Create a TestClient — import is deferred to avoid requiring fastapi."""
try:
from starlette.testclient import TestClient
except ImportError:
pytest.skip("fastapi/starlette not installed")
import hermes_state
from hermes_constants import get_hermes_home
from hermes_cli.web_server import app, _SESSION_TOKEN
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", get_hermes_home() / "state.db")
self.client = TestClient(app)
self.client.headers["Authorization"] = f"Bearer {_SESSION_TOKEN}"
@@ -516,18 +511,12 @@ class TestNewEndpoints:
"""Tests for session detail, logs, cron, skills, tools, raw config, analytics."""
@pytest.fixture(autouse=True)
def _setup(self, monkeypatch, _isolate_hermes_home):
def _setup(self):
try:
from starlette.testclient import TestClient
except ImportError:
pytest.skip("fastapi/starlette not installed")
import hermes_state
from hermes_constants import get_hermes_home
from hermes_cli.web_server import app, _SESSION_TOKEN
monkeypatch.setattr(hermes_state, "DEFAULT_DB_PATH", get_hermes_home() / "state.db")
self.client = TestClient(app)
self.client.headers["Authorization"] = f"Bearer {_SESSION_TOKEN}"
@@ -703,74 +692,8 @@ class TestNewEndpoints:
assert "daily" in data
assert "by_model" in data
assert "totals" in data
assert "skills" in data
assert isinstance(data["daily"], list)
assert "total_sessions" in data["totals"]
assert data["skills"] == {
"summary": {
"total_skill_loads": 0,
"total_skill_edits": 0,
"total_skill_actions": 0,
"distinct_skills_used": 0,
},
"top_skills": [],
}
def test_analytics_usage_includes_skill_breakdown(self):
from hermes_state import SessionDB
db = SessionDB()
try:
db.create_session(
session_id="skills-analytics-test",
source="cli",
model="anthropic/claude-sonnet-4",
)
db.update_token_counts(
"skills-analytics-test",
input_tokens=120,
output_tokens=45,
)
db.append_message(
"skills-analytics-test",
role="assistant",
content="Loading and updating skills.",
tool_calls=[
{
"function": {
"name": "skill_view",
"arguments": '{"name":"github-pr-workflow"}',
}
},
{
"function": {
"name": "skill_manage",
"arguments": '{"name":"github-code-review"}',
}
},
],
)
finally:
db.close()
resp = self.client.get("/api/analytics/usage?days=7")
assert resp.status_code == 200
data = resp.json()
assert data["skills"]["summary"] == {
"total_skill_loads": 1,
"total_skill_edits": 1,
"total_skill_actions": 2,
"distinct_skills_used": 2,
}
assert len(data["skills"]["top_skills"]) == 2
top_skill = data["skills"]["top_skills"][0]
assert top_skill["skill"] == "github-pr-workflow"
assert top_skill["view_count"] == 1
assert top_skill["manage_count"] == 0
assert top_skill["total_count"] == 1
assert top_skill["last_used_at"] is not None
def test_session_token_endpoint_removed(self):
"""GET /api/auth/session-token no longer exists."""
-427
View File
@@ -1,427 +0,0 @@
"""Tests for the disk-cleanup plugin.
Covers the bundled plugin at ``plugins/disk-cleanup/``:
* ``disk_cleanup`` library: track / forget / dry_run / quick / status,
``is_safe_path`` and ``guess_category`` filtering.
* Plugin ``__init__``: ``post_tool_call`` hook auto-tracks files created
by ``write_file`` / ``terminal``; ``on_session_end`` hook runs quick
cleanup when anything was tracked during the turn.
* Slash command handler: status / dry-run / quick / track / forget /
unknown subcommand behaviours.
* Bundled-plugin discovery via ``PluginManager.discover_and_load``.
"""
import importlib
import json
import sys
from pathlib import Path
import pytest
@pytest.fixture(autouse=True)
def _isolate_env(tmp_path, monkeypatch):
    """Give every test its own HERMES_HOME at ``<tmp_path>/.hermes``.

    The directory is created up front and exported through the
    ``HERMES_HOME`` environment variable; the fixture yields its path so
    tests can build files beneath a predictable location.
    """
    home = tmp_path / ".hermes"
    home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(home))
    yield home
def _load_lib():
    """Import the plugin's library module directly from the repo path.

    A fresh module object is created on every call under the name
    ``disk_cleanup_under_test``; it is not registered in ``sys.modules``.

    Returns:
        The executed ``disk_cleanup`` module object.

    Raises:
        ImportError: If no import spec/loader can be built for the file.
    """
    # ``import importlib`` alone does not guarantee that the ``util``
    # submodule is loaded; import it explicitly so this helper does not
    # depend on some other module having imported it first.
    import importlib.util

    repo_root = Path(__file__).resolve().parents[2]
    lib_path = repo_root / "plugins" / "disk-cleanup" / "disk_cleanup.py"
    spec = importlib.util.spec_from_file_location(
        "disk_cleanup_under_test", lib_path
    )
    if spec is None or spec.loader is None:
        # Fail with a clear message instead of an AttributeError further down.
        raise ImportError(f"Cannot build an import spec for {lib_path}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod
def _load_plugin_init():
    """Import the plugin's __init__.py (which depends on the library).

    The module is registered in ``sys.modules`` as
    ``hermes_plugins.disk_cleanup`` (creating an empty ``hermes_plugins``
    parent module if none is registered) so the plugin's relative imports
    resolve.

    Returns:
        The executed plugin package module.

    Raises:
        ImportError: If no import spec/loader can be built for the file.
    """
    # ``import importlib`` alone does not guarantee that the ``util``
    # submodule is loaded; import it explicitly.
    import importlib.util
    import types

    module_name = "hermes_plugins.disk_cleanup"
    repo_root = Path(__file__).resolve().parents[2]
    plugin_dir = repo_root / "plugins" / "disk-cleanup"
    # Use the PluginManager's module naming convention so relative imports work.
    spec = importlib.util.spec_from_file_location(
        module_name,
        plugin_dir / "__init__.py",
        submodule_search_locations=[str(plugin_dir)],
    )
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot build an import spec for {plugin_dir}")
    # Ensure parent namespace package exists for the relative `. import disk_cleanup`
    if "hermes_plugins" not in sys.modules:
        ns = types.ModuleType("hermes_plugins")
        ns.__path__ = []
        sys.modules["hermes_plugins"] = ns
    mod = importlib.util.module_from_spec(spec)
    mod.__package__ = module_name
    mod.__path__ = [str(plugin_dir)]
    sys.modules[module_name] = mod
    try:
        spec.loader.exec_module(mod)
    except BaseException:
        # Do not leave a half-initialised module registered for later tests.
        sys.modules.pop(module_name, None)
        raise
    return mod
# ---------------------------------------------------------------------------
# Library tests
# ---------------------------------------------------------------------------
class TestIsSafePath:
    """``is_safe_path`` checked against representative locations."""

    def test_accepts_path_under_hermes_home(self, _isolate_env):
        lib = _load_lib()
        target = _isolate_env / "subdir" / "file.txt"
        target.parent.mkdir()
        target.write_text("x")
        assert lib.is_safe_path(target) is True

    def test_rejects_outside_hermes_home(self, _isolate_env):
        lib = _load_lib()
        system_file = Path("/etc/passwd")
        assert lib.is_safe_path(system_file) is False

    def test_accepts_tmp_hermes_prefix(self, _isolate_env, tmp_path):
        lib = _load_lib()
        hermes_tmp = Path("/tmp/hermes-abc/x.log")
        assert lib.is_safe_path(hermes_tmp) is True

    def test_rejects_plain_tmp(self, _isolate_env):
        lib = _load_lib()
        plain_tmp = Path("/tmp/other.log")
        assert lib.is_safe_path(plain_tmp) is False

    def test_rejects_windows_mount(self, _isolate_env):
        lib = _load_lib()
        mounted = Path("/mnt/c/Users/x/test.txt")
        assert lib.is_safe_path(mounted) is False
class TestGuessCategory:
    """``guess_category`` results for files created under HERMES_HOME."""

    def test_test_prefix(self, _isolate_env):
        lib = _load_lib()
        candidate = _isolate_env / "test_foo.py"
        candidate.write_text("x")
        assert lib.guess_category(candidate) == "test"

    def test_tmp_prefix(self, _isolate_env):
        lib = _load_lib()
        candidate = _isolate_env / "tmp_foo.log"
        candidate.write_text("x")
        assert lib.guess_category(candidate) == "test"

    def test_dot_test_suffix(self, _isolate_env):
        lib = _load_lib()
        candidate = _isolate_env / "mything.test.js"
        candidate.write_text("x")
        assert lib.guess_category(candidate) == "test"

    def test_skips_protected_top_level(self, _isolate_env):
        lib = _load_lib()
        protected_dir = _isolate_env / "logs"
        protected_dir.mkdir()
        candidate = protected_dir / "test_log.txt"
        candidate.write_text("x")
        # The name matches the test_* pattern, yet logs/ is excluded.
        assert lib.guess_category(candidate) is None

    def test_cron_subtree_categorised(self, _isolate_env):
        lib = _load_lib()
        cron_root = _isolate_env / "cron"
        cron_root.mkdir()
        candidate = cron_root / "job_output.md"
        candidate.write_text("x")
        assert lib.guess_category(candidate) == "cron-output"

    def test_ordinary_file_returns_none(self, _isolate_env):
        lib = _load_lib()
        candidate = _isolate_env / "notes.md"
        candidate.write_text("x")
        assert lib.guess_category(candidate) is None
class TestTrackForgetQuick:
    """Round-trips through ``track`` / ``forget`` / ``quick``."""

    def test_track_then_quick_deletes_test(self, _isolate_env):
        lib = _load_lib()
        victim = _isolate_env / "test_a.py"
        victim.write_text("x")
        assert lib.track(str(victim), "test", silent=True) is True
        outcome = lib.quick()
        assert outcome["deleted"] == 1
        assert not victim.exists()

    def test_track_dedup(self, _isolate_env):
        lib = _load_lib()
        tracked = _isolate_env / "test_a.py"
        tracked.write_text("x")
        first_attempt = lib.track(str(tracked), "test", silent=True)
        assert first_attempt is True
        # Tracking the same path again reports False (already tracked).
        second_attempt = lib.track(str(tracked), "test", silent=True)
        assert second_attempt is False

    def test_track_rejects_outside_home(self, _isolate_env):
        lib = _load_lib()
        # /etc/hostname exists on most Linux boxes; fall back if not.
        if Path("/etc/hostname").exists():
            outside = "/etc/hostname"
        else:
            outside = "/etc/passwd"
        assert lib.track(outside, "test", silent=True) is False

    def test_track_skips_missing(self, _isolate_env):
        lib = _load_lib()
        absent = _isolate_env / "nope.txt"
        assert lib.track(str(absent), "test", silent=True) is False

    def test_forget_removes_entry(self, _isolate_env):
        lib = _load_lib()
        kept = _isolate_env / "keep.tmp"
        kept.write_text("x")
        lib.track(str(kept), "temp", silent=True)
        assert lib.forget(str(kept)) == 1
        assert kept.exists()  # forget does NOT delete the file

    def test_quick_preserves_unexpired_temp(self, _isolate_env):
        lib = _load_lib()
        fresh = _isolate_env / "fresh.tmp"
        fresh.write_text("x")
        lib.track(str(fresh), "temp", silent=True)
        outcome = lib.quick()
        assert outcome["deleted"] == 0
        assert fresh.exists()

    def test_quick_preserves_protected_top_level_dirs(self, _isolate_env):
        lib = _load_lib()
        protected = ("logs", "memories", "sessions", "cron", "cache")
        for dirname in protected:
            (_isolate_env / dirname).mkdir()
        lib.quick()
        for d in protected:
            assert (_isolate_env / d).exists(), f"{d}/ should be preserved"
class TestStatus:
    """``status`` summaries and their ``format_status`` rendering."""

    def test_empty_status(self, _isolate_env):
        lib = _load_lib()
        snapshot = lib.status()
        assert snapshot["total_tracked"] == 0
        assert snapshot["top10"] == []

    def test_status_with_entries(self, _isolate_env):
        lib = _load_lib()
        tracked = _isolate_env / "big.tmp"
        tracked.write_text("y" * 100)
        lib.track(str(tracked), "temp", silent=True)
        snapshot = lib.status()
        assert snapshot["total_tracked"] == 1
        assert len(snapshot["top10"]) == 1
        text = lib.format_status(snapshot)
        assert "temp" in text
        assert "big.tmp" in text
class TestDryRun:
    """``dry_run`` classification of tracked entries."""

    def test_classifies_by_category(self, _isolate_env):
        lib = _load_lib()
        test_file = _isolate_env / "test_x.py"
        test_file.write_text("x")
        other_file = _isolate_env / "big.bin"
        other_file.write_bytes(b"z" * 10)
        lib.track(str(test_file), "test", silent=True)
        lib.track(str(other_file), "other", silent=True)
        auto_items, _prompt_items = lib.dry_run()
        # test → auto, other → neither (doesn't hit any rule)
        assert any(item["path"] == str(test_file) for item in auto_items)
# ---------------------------------------------------------------------------
# Plugin hooks tests
# ---------------------------------------------------------------------------
class TestPostToolCallHook:
    """``_on_post_tool_call`` and what it writes to ``disk-cleanup/tracked.json``."""

    def test_write_file_test_pattern_tracked(self, _isolate_env):
        plugin = _load_plugin_init()
        created = _isolate_env / "test_created.py"
        created.write_text("x")
        plugin._on_post_tool_call(
            tool_name="write_file",
            args={"path": str(created), "content": "x"},
            result="OK",
            task_id="t1",
            session_id="s1",
        )
        registry = _isolate_env / "disk-cleanup" / "tracked.json"
        entries = json.loads(registry.read_text())
        assert len(entries) == 1
        assert entries[0]["category"] == "test"

    def test_write_file_non_test_not_tracked(self, _isolate_env):
        plugin = _load_plugin_init()
        created = _isolate_env / "notes.md"
        created.write_text("x")
        plugin._on_post_tool_call(
            tool_name="write_file",
            args={"path": str(created), "content": "x"},
            result="OK",
            task_id="t2",
            session_id="s2",
        )
        registry = _isolate_env / "disk-cleanup" / "tracked.json"
        assert not registry.exists() or registry.read_text().strip() == "[]"

    def test_terminal_command_picks_up_paths(self, _isolate_env):
        plugin = _load_plugin_init()
        created = _isolate_env / "tmp_created.log"
        created.write_text("x")
        plugin._on_post_tool_call(
            tool_name="terminal",
            args={"command": f"touch {created}"},
            result=f"created {created}\n",
            task_id="t3",
            session_id="s3",
        )
        registry = _isolate_env / "disk-cleanup" / "tracked.json"
        entries = json.loads(registry.read_text())
        assert any(Path(entry["path"]) == created.resolve() for entry in entries)

    def test_ignores_unrelated_tool(self, _isolate_env):
        plugin = _load_plugin_init()
        plugin._on_post_tool_call(
            tool_name="read_file",
            args={"path": str(_isolate_env / "test_x.py")},
            result="contents",
            task_id="t4",
            session_id="s4",
        )
        # read_file should never trigger tracking.
        registry = _isolate_env / "disk-cleanup" / "tracked.json"
        assert not registry.exists() or registry.read_text().strip() == "[]"
class TestOnSessionEndHook:
    """``_on_session_end`` after files were (or were not) tracked."""

    def test_runs_quick_when_test_files_tracked(self, _isolate_env):
        plugin = _load_plugin_init()
        scratch = _isolate_env / "test_cleanup.py"
        scratch.write_text("x")
        plugin._on_post_tool_call(
            tool_name="write_file",
            args={"path": str(scratch), "content": "x"},
            result="OK",
            task_id="",
            session_id="s1",
        )
        # Still present until the session actually ends.
        assert scratch.exists()
        plugin._on_session_end(session_id="s1", completed=True, interrupted=False)
        assert not scratch.exists(), "test file should be auto-deleted"

    def test_noop_when_no_test_tracked(self, _isolate_env):
        plugin = _load_plugin_init()
        # Nothing tracked → on_session_end should not raise.
        plugin._on_session_end(session_id="empty", completed=True, interrupted=False)
# ---------------------------------------------------------------------------
# Slash command
# ---------------------------------------------------------------------------
class TestSlashCommand:
    """Text returned by ``_handle_slash`` for each subcommand."""

    def test_help(self, _isolate_env):
        plugin = _load_plugin_init()
        reply = plugin._handle_slash("help")
        assert "disk-cleanup" in reply
        assert "status" in reply

    def test_status_empty(self, _isolate_env):
        plugin = _load_plugin_init()
        reply = plugin._handle_slash("status")
        assert "nothing tracked" in reply

    def test_track_rejects_missing(self, _isolate_env):
        plugin = _load_plugin_init()
        absent = _isolate_env / "nope.txt"
        reply = plugin._handle_slash(f"track {absent} temp")
        assert "Not tracked" in reply

    def test_track_rejects_bad_category(self, _isolate_env):
        plugin = _load_plugin_init()
        target = _isolate_env / "a.tmp"
        target.write_text("x")
        reply = plugin._handle_slash(f"track {target} banana")
        assert "Unknown category" in reply

    def test_track_and_forget(self, _isolate_env):
        plugin = _load_plugin_init()
        target = _isolate_env / "a.tmp"
        target.write_text("x")
        track_reply = plugin._handle_slash(f"track {target} temp")
        assert "Tracked" in track_reply
        forget_reply = plugin._handle_slash(f"forget {target}")
        assert "Removed 1" in forget_reply

    def test_unknown_subcommand(self, _isolate_env):
        plugin = _load_plugin_init()
        reply = plugin._handle_slash("foobar")
        assert "Unknown subcommand" in reply

    def test_quick_on_empty(self, _isolate_env):
        plugin = _load_plugin_init()
        reply = plugin._handle_slash("quick")
        assert "Cleaned 0 files" in reply
# ---------------------------------------------------------------------------
# Bundled-plugin discovery
# ---------------------------------------------------------------------------
class TestBundledDiscovery:
    """``PluginManager.discover_and_load`` behaviour for the bundled plugin."""

    def _write_enabled_config(self, hermes_home, names):
        """Write a ``plugins.enabled`` allow-list into ``config.yaml``."""
        import yaml

        payload = {"plugins": {"enabled": list(names)}}
        (hermes_home / "config.yaml").write_text(yaml.safe_dump(payload))

    def test_disk_cleanup_discovered_but_not_loaded_by_default(self, _isolate_env):
        """Without opt-in the bundled plugin is registered but left disabled."""
        from hermes_cli import plugins as pmod

        manager = pmod.PluginManager()
        manager.discover_and_load()
        # Discovered: present in the registry and marked as bundled.
        assert "disk-cleanup" in manager._plugins
        record = manager._plugins["disk-cleanup"]
        assert record.manifest.source == "bundled"
        # Not enabled: the error explains why it was not loaded.
        assert not record.enabled
        assert record.error and "not enabled" in record.error

    def test_disk_cleanup_loads_when_enabled(self, _isolate_env):
        """Listing the plugin in ``plugins.enabled`` activates it."""
        self._write_enabled_config(_isolate_env, ["disk-cleanup"])
        from hermes_cli import plugins as pmod

        manager = pmod.PluginManager()
        manager.discover_and_load()
        record = manager._plugins["disk-cleanup"]
        assert record.enabled
        assert "post_tool_call" in record.hooks_registered
        assert "on_session_end" in record.hooks_registered
        assert "disk-cleanup" in record.commands_registered

    def test_disabled_beats_enabled(self, _isolate_env):
        """``plugins.disabled`` wins when a plugin appears in both lists."""
        import yaml

        conflicting = {
            "plugins": {
                "enabled": ["disk-cleanup"],
                "disabled": ["disk-cleanup"],
            }
        }
        (_isolate_env / "config.yaml").write_text(yaml.safe_dump(conflicting))
        from hermes_cli import plugins as pmod

        manager = pmod.PluginManager()
        manager.discover_and_load()
        record = manager._plugins["disk-cleanup"]
        assert not record.enabled
        assert record.error == "disabled via config"

    def test_memory_and_context_engine_subdirs_skipped(self, _isolate_env):
        """``plugins/memory`` and ``plugins/context_engine`` are not registered
        as top-level plugins, even when listed in ``plugins.enabled``."""
        self._write_enabled_config(
            _isolate_env, ["memory", "context_engine", "disk-cleanup"]
        )
        from hermes_cli import plugins as pmod

        manager = pmod.PluginManager()
        manager.discover_and_load()
        for reserved in ("memory", "context_engine"):
            assert reserved not in manager._plugins
@@ -1,61 +0,0 @@
"""Verify compression trigger excludes reasoning/completion tokens (#12026).
Thinking models (GLM-5.1, QwQ, DeepSeek R1) inflate completion_tokens with
reasoning tokens that don't consume context window space. The compression
trigger must use only prompt_tokens so sessions aren't prematurely split.
"""
import types
import pytest
from unittest.mock import MagicMock, patch
def _make_agent_stub(prompt_tokens, completion_tokens, threshold_tokens):
"""Create a minimal stub that exercises the compression check path."""
compressor = types.SimpleNamespace(
last_prompt_tokens=prompt_tokens,
last_completion_tokens=completion_tokens,
threshold_tokens=threshold_tokens,
)
# Replicate the fixed logic from run_agent.py ~line 11273
if compressor.last_prompt_tokens > 0:
real_tokens = compressor.last_prompt_tokens # Fixed: no completion
else:
real_tokens = 0
return real_tokens, compressor
class TestCompressionTriggerExcludesReasoning:
    """The compared token count must come from prompt tokens alone."""

    def test_high_reasoning_tokens_should_not_trigger_compression(self):
        """40k prompt + 80k reasoning stays below a 100k threshold.

        Summing both would give 120k and cross the threshold; with only the
        40k prompt tokens counted, no compression is triggered.
        """
        counted, compressor = _make_agent_stub(
            prompt_tokens=40_000,
            completion_tokens=80_000,  # reasoning-heavy model
            threshold_tokens=100_000,
        )
        assert counted == 40_000
        assert counted < compressor.threshold_tokens, (
            "Should NOT trigger compression — only prompt tokens matter"
        )

    def test_high_prompt_tokens_should_trigger_compression(self):
        """Prompt tokens above the threshold do trigger compression."""
        counted, compressor = _make_agent_stub(
            prompt_tokens=110_000,
            completion_tokens=5_000,
            threshold_tokens=100_000,
        )
        assert counted == 110_000
        assert counted >= compressor.threshold_tokens, (
            "Should trigger compression — prompt tokens exceed threshold"
        )

    def test_zero_prompt_tokens_falls_back(self):
        """A reported prompt count of 0 yields a counted value of 0."""
        counted, _unused = _make_agent_stub(
            prompt_tokens=0,
            completion_tokens=50_000,
            threshold_tokens=100_000,
        )
        assert counted == 0
@@ -1,107 +0,0 @@
"""Tests for _repair_tool_call_arguments — malformed JSON repair pipeline."""
import json
import pytest
from run_agent import _repair_tool_call_arguments
class TestRepairToolCallArguments:
"""Verify each repair stage in the pipeline."""
# -- Stage 1: empty / whitespace-only --
def test_empty_string_returns_empty_object(self):
assert _repair_tool_call_arguments("", "t") == "{}"
def test_whitespace_only_returns_empty_object(self):
assert _repair_tool_call_arguments(" \n\t ", "t") == "{}"
def test_none_type_returns_empty_object(self):
"""Non-string input (e.g. None from a broken model response)."""
assert _repair_tool_call_arguments(None, "t") == "{}"
# -- Stage 2: Python None literal --
def test_python_none_literal(self):
assert _repair_tool_call_arguments("None", "t") == "{}"
def test_python_none_with_whitespace(self):
assert _repair_tool_call_arguments(" None ", "t") == "{}"
# -- Stage 3: trailing comma repair --
def test_trailing_comma_in_object(self):
result = _repair_tool_call_arguments('{"key": "value",}', "t")
assert json.loads(result) == {"key": "value"}
def test_trailing_comma_in_array(self):
result = _repair_tool_call_arguments('{"a": [1, 2,]}', "t")
parsed = json.loads(result)
assert parsed == {"a": [1, 2]}
def test_multiple_trailing_commas(self):
result = _repair_tool_call_arguments('{"a": 1, "b": 2,}', "t")
parsed = json.loads(result)
assert parsed["a"] == 1
assert parsed["b"] == 2
# -- Stage 4: unclosed brackets --
def test_unclosed_brace(self):
result = _repair_tool_call_arguments('{"key": "value"', "t")
parsed = json.loads(result)
assert parsed == {"key": "value"}
def test_unclosed_bracket_and_brace(self):
result = _repair_tool_call_arguments('{"a": [1, 2', "t")
# Bracket counting adds ']' then '}', producing {"a": [1, 2]}
# which is valid JSON. But the naive count can't always recover
# complex nesting — verify we at least get valid JSON.
json.loads(result)
# -- Stage 5: excess closing delimiters --
def test_extra_closing_brace(self):
result = _repair_tool_call_arguments('{"key": "value"}}', "t")
parsed = json.loads(result)
assert parsed == {"key": "value"}
def test_extra_closing_bracket(self):
result = _repair_tool_call_arguments('{"a": [1]]}', "t")
# Should produce valid JSON
json.loads(result)
# -- Stage 6: last resort --
def test_unrepairable_garbage_returns_empty_object(self):
assert _repair_tool_call_arguments("totally not json", "t") == "{}"
def test_unrepairable_partial_returns_empty_object(self):
# Truncated in the middle of a string key — bracket closing won't help
assert _repair_tool_call_arguments('{"truncated": "val', "t") == "{}"
# -- Valid JSON passthrough (this path is via except, but still works) --
def test_already_valid_json_passes_through(self):
    """Well-formed JSON given to the repair function still yields parseable
    output that keeps the original ``path`` value."""
    well_formed = '{"path": "/tmp/foo", "content": "hello"}'
    repaired = _repair_tool_call_arguments(well_formed, "t")
    decoded = json.loads(repaired)
    assert decoded["path"] == "/tmp/foo"
# -- Combined repairs --
def test_trailing_comma_plus_unclosed_brace(self):
    """Input with a trailing comma and no closing brace must come back as parseable JSON."""
    damaged = '{"a": 1, "b": 2,'
    repaired = _repair_tool_call_arguments(damaged, "t")
    # The trailing comma is stripped first and the closing brace added after.
    # Full recovery is not guaranteed, so only validity is checked.
    json.loads(repaired)
def test_real_world_glm_truncation(self):
    """Simulates GLM-5.1 truncating mid-argument."""
    truncated = '{"command": "ls -la /tmp", "timeout": 30, "background":'
    repaired = _repair_tool_call_arguments(truncated, "terminal")
    # Losing the "background" value is acceptable; the output must still parse.
    json.loads(repaired)
@@ -1,103 +0,0 @@
"""Regression tests for run_conversation's prologue handling of multimodal content.
PR #5621 and earlier multimodal PRs hit an ``AttributeError`` in
``run_agent.run_conversation`` because the prologue unconditionally called
``user_message[:80] + "..."`` / ``.replace()`` / ``_safe_print(f"...{user_message[:60]}")``
on what was now a list. These tests cover the two fixes:
1. ``_summarize_user_message_for_log`` accepts strings, lists, and ``None``.
2. ``_chat_content_to_responses_parts`` converts chat-style content to the
Responses API ``input_text`` / ``input_image`` shape.
They do NOT boot the full AIAgent — the prologue-fix guarantees are pure
function contracts at module scope.
"""
from run_agent import _chat_content_to_responses_parts, _summarize_user_message_for_log
class TestSummarizeUserMessageForLog:
    """Contract tests for ``_summarize_user_message_for_log``."""

    def test_plain_string_passthrough(self):
        """A plain string is returned unchanged."""
        summary = _summarize_user_message_for_log("hello world")
        assert summary == "hello world"

    def test_none_returns_empty_string(self):
        """``None`` is summarised as the empty string."""
        summary = _summarize_user_message_for_log(None)
        assert summary == ""

    def test_text_only_list(self):
        """Text parts are joined with a single space."""
        parts = [
            {"type": "text", "text": "hi"},
            {"type": "text", "text": "there"},
        ]
        assert _summarize_user_message_for_log(parts) == "hi there"

    def test_list_with_image_only(self):
        """An image-only message yields just the image marker."""
        parts = [{"type": "image_url", "image_url": {"url": "https://x"}}]
        summary = _summarize_user_message_for_log(parts)
        # Image-only: "[1 image]" marker, no trailing space.
        assert summary == "[1 image]"

    def test_list_with_text_and_image(self):
        """Mixed content keeps both the image marker and the text."""
        parts = [
            {"type": "text", "text": "describe this"},
            {"type": "image_url", "image_url": {"url": "https://x"}},
        ]
        summary = _summarize_user_message_for_log(parts)
        assert "[1 image]" in summary
        assert "describe this" in summary

    def test_list_with_multiple_images(self):
        """Two images are reported with a pluralised marker."""
        parts = [
            {"type": "text", "text": "compare these"},
            {"type": "image_url", "image_url": {"url": "a"}},
            {"type": "image_url", "image_url": {"url": "b"}},
        ]
        summary = _summarize_user_message_for_log(parts)
        assert "[2 images]" in summary

    def test_scalar_fallback(self):
        """A non-string scalar is summarised as its string form."""
        summary = _summarize_user_message_for_log(42)
        assert summary == "42"

    def test_list_supports_slice_and_replace(self):
        """The whole point of this helper: its output must be a plain str."""
        parts = [
            {"type": "text", "text": "x" * 200},
            {"type": "image_url", "image_url": {"url": "y"}},
        ]
        summary = _summarize_user_message_for_log(parts)
        # The run_conversation prologue slices and calls .replace() on the
        # summary; both must work without raising.
        _truncated = summary[:80] + "..."
        _flattened = summary.replace("\n", " ")
class TestChatContentToResponsesParts:
    """Contract tests for ``_chat_content_to_responses_parts``."""

    def test_non_list_returns_empty(self):
        """Anything that is not a list converts to an empty list."""
        for not_a_list in ("hi", None):
            assert _chat_content_to_responses_parts(not_a_list) == []

    def test_text_parts_become_input_text(self):
        """A chat ``text`` part becomes an ``input_text`` part."""
        converted = _chat_content_to_responses_parts([{"type": "text", "text": "hello"}])
        assert converted == [{"type": "input_text", "text": "hello"}]

    def test_image_url_object_becomes_input_image(self):
        """An ``image_url`` object is flattened and keeps its ``detail``."""
        chat_parts = [
            {"type": "image_url", "image_url": {"url": "https://x", "detail": "high"}},
        ]
        expected = [
            {"type": "input_image", "image_url": "https://x", "detail": "high"},
        ]
        assert _chat_content_to_responses_parts(chat_parts) == expected

    def test_bare_string_image_url(self):
        """An ``image_url`` given as a bare string is accepted as the URL."""
        chat_parts = [{"type": "image_url", "image_url": "https://x"}]
        expected = [{"type": "input_image", "image_url": "https://x"}]
        assert _chat_content_to_responses_parts(chat_parts) == expected

    def test_responses_format_passthrough(self):
        """Input already in Responses format should round-trip cleanly."""
        already_converted = [
            {"type": "input_text", "text": "hi"},
            {"type": "input_image", "image_url": "https://x"},
        ]
        expected = [
            {"type": "input_text", "text": "hi"},
            {"type": "input_image", "image_url": "https://x"},
        ]
        assert _chat_content_to_responses_parts(already_converted) == expected

    def test_unknown_parts_skipped(self):
        """Unknown types shouldn't crash — filtered silently at this level
        (the API server's normalizer rejects them earlier)."""
        chat_parts = [
            {"type": "text", "text": "ok"},
            {"type": "audio", "x": "y"},
        ]
        converted = _chat_content_to_responses_parts(chat_parts)
        assert converted == [{"type": "input_text", "text": "ok"}]

    def test_empty_url_image_skipped(self):
        """An image part whose URL is empty produces no output part."""
        chat_parts = [{"type": "image_url", "image_url": {"url": ""}}]
        assert _chat_content_to_responses_parts(chat_parts) == []
+1 -30
View File
@@ -12,7 +12,7 @@ import pytest
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from batch_runner import BatchRunner, _process_batch_worker
from batch_runner import BatchRunner
@pytest.fixture
@@ -157,32 +157,3 @@ class TestResumePreservesProgress:
assert checkpoint_data["completed_prompts"] == []
assert checkpoint_data["run_name"] == "test_run"
class TestBatchWorkerResumeBehavior:
    """Checks what the batch worker reports for prompts whose results are discarded."""

    def test_discarded_no_reasoning_prompts_are_marked_completed(self, tmp_path, monkeypatch):
        """A result without reasoning is counted as discarded and as completed."""
        output_path = tmp_path / "batch_1.jsonl"
        canned_result = {
            "success": True,
            "trajectory": [{"role": "assistant", "content": "x"}],
            "reasoning_stats": {"has_any_reasoning": False},
            "tool_stats": {},
            "metadata": {},
            "completed": True,
            "api_calls": 1,
            "toolsets_used": [],
        }

        def fake_process_single_prompt(*args, **kwargs):
            # Stand-in for the real per-prompt processing: always the canned result.
            return canned_result

        monkeypatch.setattr("batch_runner._process_single_prompt", fake_process_single_prompt)

        worker_args = (
            1,
            [(0, {"prompt": "hi"})],
            tmp_path,
            set(),
            {"verbose": False},
        )
        outcome = _process_batch_worker(worker_args)

        assert outcome["discarded_no_reasoning"] == 1
        assert outcome["completed_prompts"] == [0]
        # The batch file is either absent or empty.
        assert not output_path.exists() or output_path.read_text() == ""
-19
View File
@@ -365,25 +365,6 @@ class TestFTS5Search:
assert isinstance(results[0]["context"], list)
assert len(results[0]["context"]) > 0
def test_search_context_uses_session_neighbors_when_ids_are_interleaved(self, db):
    """Context for a hit is drawn from the hit's own session, even when
    messages from another session were appended in between."""
    for session_id in ("s1", "s2"):
        db.create_session(session_id=session_id, source="cli")

    # Alternate between the two sessions so their messages interleave.
    interleaved_messages = [
        ("s1", "user", "before needle"),
        ("s2", "user", "other session message"),
        ("s1", "assistant", "needle match"),
        ("s2", "assistant", "another other session message"),
        ("s1", "user", "after needle"),
    ]
    for session_id, role, content in interleaved_messages:
        db.append_message(session_id, role=role, content=content)

    hits = db.search_messages('"needle match"')
    needle_hit = next(
        hit
        for hit in hits
        if hit["session_id"] == "s1" and "needle match" in hit["snippet"]
    )

    context_contents = [message["content"] for message in needle_hit["context"]]
    assert context_contents == [
        "before needle",
        "needle match",
        "after needle",
    ]
def test_search_special_chars_do_not_crash(self, db):
"""FTS5 special characters in queries must not raise OperationalError."""
db.create_session(session_id="s1", source="cli")
-64
View File
@@ -245,67 +245,3 @@ class TestTranscribeAudio:
result = transcribe_audio("/nonexistent/file.ogg")
assert result["success"] is False
assert "not found" in result["error"]
# ---------------------------------------------------------------------------
# Model name normalisation for local providers
# ---------------------------------------------------------------------------
class TestNormalizeLocalModel:
    """_normalize_local_model() maps cloud-only names to the local default."""

    def test_openai_model_name_maps_to_default(self):
        """The OpenAI name ``whisper-1`` falls back to the local default."""
        from tools.transcription_tools import DEFAULT_LOCAL_MODEL, _normalize_local_model

        normalized = _normalize_local_model("whisper-1")
        assert normalized == DEFAULT_LOCAL_MODEL

    def test_groq_model_name_maps_to_default(self):
        """The Groq name ``whisper-large-v3-turbo`` falls back to the local default."""
        from tools.transcription_tools import DEFAULT_LOCAL_MODEL, _normalize_local_model

        normalized = _normalize_local_model("whisper-large-v3-turbo")
        assert normalized == DEFAULT_LOCAL_MODEL

    def test_valid_local_model_preserved(self):
        """Recognised local model sizes are returned unchanged."""
        from tools.transcription_tools import _normalize_local_model

        local_sizes = ("tiny", "base", "small", "medium", "large-v3")
        for size in local_sizes:
            assert _normalize_local_model(size) == size

    def test_none_maps_to_default(self):
        """A missing model name resolves to the local default."""
        from tools.transcription_tools import DEFAULT_LOCAL_MODEL, _normalize_local_model

        normalized = _normalize_local_model(None)
        assert normalized == DEFAULT_LOCAL_MODEL

    def test_warning_emitted_for_cloud_model(self, caplog):
        """Normalising a cloud-only name logs a warning that mentions it."""
        import logging

        from tools.transcription_tools import _normalize_local_model

        with caplog.at_level(logging.WARNING, logger="tools.transcription_tools"):
            _normalize_local_model("whisper-1")

        messages = [record.message for record in caplog.records]
        assert any("whisper-1" in message for message in messages)

    def test_local_transcribe_normalises_model(self):
        """transcribe_audio with local provider must not pass 'whisper-1' to WhisperModel."""
        import os
        import tempfile
        from unittest.mock import MagicMock, patch

        # A real file on disk is needed; one byte of content is enough.
        with tempfile.NamedTemporaryFile(suffix=".ogg", delete=False) as handle:
            handle.write(b"x")
            audio_file = handle.name

        try:
            fake_model = MagicMock()
            fake_info = MagicMock(language="en", duration=1.0)
            fake_model.transcribe.return_value = (iter([]), fake_info)

            stt_config = {
                "enabled": True,
                "provider": "local",
                "local": {"model": "whisper-1"},
            }

            with patch("tools.transcription_tools._HAS_FASTER_WHISPER", True), \
                 patch("tools.transcription_tools._load_stt_config", return_value=stt_config), \
                 patch("tools.transcription_tools._local_model", None), \
                 patch("tools.transcription_tools._local_model_name", None), \
                 patch("faster_whisper.WhisperModel", return_value=fake_model) as mock_cls:
                from tools.transcription_tools import transcribe_audio

                transcribe_audio(audio_file)

                # WhisperModel must NOT have been called with "whisper-1"
                call_args = mock_cls.call_args
                assert call_args is not None
                positional_args = call_args[0]
                assert positional_args[0] != "whisper-1", (
                    "WhisperModel was called with the cloud-only name 'whisper-1'"
                )
        finally:
            os.unlink(audio_file)
+2 -23
View File
@@ -154,31 +154,12 @@ def _has_local_command() -> bool:
return _get_local_command_template() is not None
def _normalize_local_model(model_name: Optional[str]) -> str:
"""Return a valid faster-whisper model size, mapping cloud-only names to the default.
Cloud providers like OpenAI use names such as ``whisper-1`` which are not
valid for faster-whisper (which expects ``tiny``, ``base``, ``small``,
``medium``, or ``large-v*``). When such a name is detected we fall back to
the default local model and emit a warning so the user knows what happened.
"""
def _normalize_local_command_model(model_name: Optional[str]) -> str:
if not model_name or model_name in OPENAI_MODELS or model_name in GROQ_MODELS:
if model_name and (model_name in OPENAI_MODELS or model_name in GROQ_MODELS):
logger.warning(
"STT model '%s' is a cloud-only name and cannot be used with the local "
"provider. Falling back to '%s'. Set stt.local.model to a valid "
"faster-whisper size (tiny, base, small, medium, large-v3).",
model_name,
DEFAULT_LOCAL_MODEL,
)
return DEFAULT_LOCAL_MODEL
return model_name
def _normalize_local_command_model(model_name: Optional[str]) -> str:
return _normalize_local_model(model_name)
def _get_provider(stt_config: dict) -> str:
"""Determine which STT provider to use.
@@ -615,9 +596,7 @@ def transcribe_audio(file_path: str, model: Optional[str] = None) -> Dict[str, A
if provider == "local":
local_cfg = stt_config.get("local", {})
model_name = _normalize_local_model(
model or local_cfg.get("model", DEFAULT_LOCAL_MODEL)
)
model_name = model or local_cfg.get("model", DEFAULT_LOCAL_MODEL)
return _transcribe_local(file_path, model_name)
if provider == "local_command":
-69
View File
@@ -1,69 +0,0 @@
import { describe, expect, it, vi } from 'vitest'
import { readClipboardText, writeClipboardText } from '../lib/clipboard.js'
describe('readClipboardText', () => {
  // A non-darwin platform must return null without invoking the runner.
  it('does nothing off macOS', async () => {
    const runner = vi.fn()
    const pending = readClipboardText('linux', runner)

    await expect(pending).resolves.toBeNull()
    expect(runner).not.toHaveBeenCalled()
  })

  // On darwin the text comes from `pbpaste` and is returned untrimmed.
  it('reads text from pbpaste on macOS', async () => {
    const runner = vi.fn().mockResolvedValue({ stdout: 'hello world\n' })
    const pending = readClipboardText('darwin', runner)

    await expect(pending).resolves.toBe('hello world\n')

    const expectedOptions = expect.objectContaining({ encoding: 'utf8', windowsHide: true })
    expect(runner).toHaveBeenCalledWith('pbpaste', [], expectedOptions)
  })

  // A rejected runner is reported as null rather than as a thrown error.
  it('returns null when pbpaste fails', async () => {
    const runner = vi.fn().mockRejectedValue(new Error('pbpaste failed'))
    const pending = readClipboardText('darwin', runner)

    await expect(pending).resolves.toBeNull()
  })
})
describe('writeClipboardText', () => {
  // Builds a fake child process whose `once` immediately invokes the handler
  // registered for `firedEvent`, passing it `code`.
  const makeChild = (firedEvent: string, code?: number) => {
    const child = {
      once: vi.fn((event: string, handler: (code?: number) => void) => {
        if (event === firedEvent) {
          handler(code)
        }

        return child
      }),
      stdin: { end: vi.fn() }
    }

    return child
  }

  // A non-darwin platform must return false without spawning anything.
  it('does nothing off macOS', async () => {
    const spawner = vi.fn()
    const pending = writeClipboardText('hello', 'linux', spawner)

    await expect(pending).resolves.toBe(false)
    expect(spawner).not.toHaveBeenCalled()
  })

  // On darwin the text is piped to `pbcopy`; a zero exit code means success.
  it('writes text to pbcopy on macOS', async () => {
    const child = makeChild('close', 0)
    const spawner = vi.fn().mockReturnValue(child)

    await expect(writeClipboardText('hello world', 'darwin', spawner as any)).resolves.toBe(true)

    const expectedOptions = expect.objectContaining({ stdio: ['pipe', 'ignore', 'ignore'], windowsHide: true })
    expect(spawner).toHaveBeenCalledWith('pbcopy', [], expectedOptions)
    expect(child.stdin.end).toHaveBeenCalledWith('hello world')
  })

  // An `error` event from the child is reported as false.
  it('returns false when pbcopy fails', async () => {
    const child = makeChild('error')
    const spawner = vi.fn().mockReturnValue(child)

    await expect(writeClipboardText('hello world', 'darwin', spawner as any)).resolves.toBe(false)
  })
})
+12 -18
View File
@@ -7,8 +7,7 @@ import type {
SudoRespondResponse,
VoiceRecordResponse
} from '../gatewayTypes.js'
import { isAction, isMac } from '../lib/platform.js'
import { writeOsc52Clipboard } from '../lib/osc52.js'
import { getInputSelection } from './inputSelectionStore.js'
import type { InputHandlerContext, InputHandlerResult } from './interfaces.js'
@@ -28,8 +27,6 @@ export function useInputHandlers(ctx: InputHandlerContext): InputHandlerResult {
const pagerPageSize = Math.max(5, (terminal.stdout?.rows ?? 24) - 6)
const copySelection = () => {
// ink's copySelection() already calls setClipboard() which handles
// pbcopy (macOS), wl-copy/xclip (Linux), tmux, and OSC 52 fallback.
const text = terminal.selection.copySelection()
if (text) {
@@ -227,6 +224,10 @@ export function useInputHandlers(ctx: InputHandlerContext): InputHandlerResult {
return terminal.scrollWithSelection(key.pageUp ? -step : step)
}
if (key.ctrl && key.shift && ch.toLowerCase() === 'c') {
return copySelection()
}
if (key.escape && terminal.hasSelection) {
return clearSelection()
}
@@ -243,7 +244,7 @@ export function useInputHandlers(ctx: InputHandlerContext): InputHandlerResult {
return
}
if (isAction(key, ch, 'c')) {
if (isCtrl(key, ch, 'c')) {
if (terminal.hasSelection) {
return copySelection()
}
@@ -251,19 +252,12 @@ export function useInputHandlers(ctx: InputHandlerContext): InputHandlerResult {
const inputSel = getInputSelection()
if (inputSel && inputSel.end > inputSel.start) {
writeOsc52Clipboard(inputSel.value.slice(inputSel.start, inputSel.end))
inputSel.clear()
return
}
// On macOS, Cmd+C with no selection is a no-op (Ctrl+C below handles interrupt).
// On non-macOS, isAction uses Ctrl, so fall through to interrupt/clear/exit.
if (isMac) {
return
}
}
if (key.ctrl && ch.toLowerCase() === 'c') {
if (live.busy && live.sid) {
return turnController.interruptTurn({
appendMessage: actions.appendMessage,
@@ -280,11 +274,11 @@ export function useInputHandlers(ctx: InputHandlerContext): InputHandlerResult {
return actions.die()
}
if (isAction(key, ch, 'd')) {
if (isCtrl(key, ch, 'd')) {
return actions.die()
}
if (isAction(key, ch, 'l')) {
if (isCtrl(key, ch, 'l')) {
if (actions.guardBusySessionSwitch()) {
return
}
@@ -294,11 +288,11 @@ export function useInputHandlers(ctx: InputHandlerContext): InputHandlerResult {
return actions.newSession()
}
if (isAction(key, ch, 'b')) {
if (isCtrl(key, ch, 'b')) {
return voice.recording ? voiceStop() : voiceStart()
}
if (isAction(key, ch, 'g')) {
if (isCtrl(key, ch, 'g')) {
return cActions.openEditor()
}
@@ -317,7 +311,7 @@ export function useInputHandlers(ctx: InputHandlerContext): InputHandlerResult {
return
}
if (isAction(key, ch, 'k') && cRefs.queueRef.current.length && live.sid) {
if (isCtrl(key, ch, 'k') && cRefs.queueRef.current.length && live.sid) {
const next = cActions.dequeue()
if (next) {
+1 -4
View File
@@ -5,7 +5,6 @@ import type { Theme } from '../theme.js'
import type { ApprovalReq, ClarifyReq, ConfirmReq } from '../types.js'
import { TextInput } from './textInput.js'
import { isMac } from '../lib/platform.js'
const OPTS = ['once', 'session', 'always', 'deny'] as const
const LABELS = { always: 'Always allow', deny: 'Deny', once: 'Allow once', session: 'Allow this session' } as const
@@ -129,9 +128,7 @@ export function ClarifyPrompt({ cols = 80, onAnswer, onCancel, req, t }: Clarify
<TextInput columns={Math.max(20, cols - 6)} onChange={setCustom} onSubmit={onAnswer} value={custom} />
</Box>
<Text color={t.color.dim}>
Enter send · Esc {choices.length ? 'back' : 'cancel'} · {isMac ? 'Cmd+C copy · Cmd+V paste · Ctrl+C cancel' : 'Ctrl+C cancel'}
</Text>
<Text color={t.color.dim}>Enter send · Esc {choices.length ? 'back' : 'cancel'} · Ctrl+C cancel</Text>
</Box>
)
}
+12 -54
View File
@@ -3,8 +3,6 @@ import * as Ink from '@hermes/ink'
import { useEffect, useMemo, useRef, useState } from 'react'
import { setInputSelection } from '../app/inputSelectionStore.js'
import { readClipboardText, writeClipboardText } from '../lib/clipboard.js'
import { isActionMod, isMac } from '../lib/platform.js'
type InkExt = typeof Ink & {
stringWidth: (s: string) => number
@@ -486,52 +484,12 @@ export function TextInput({
const ins = (v: string, c: number, s: string) => v.slice(0, c) + s + v.slice(c)
// Insert pasted text at the cursor, or over the current selection if there is one.
const pastePlainText = (text: string) => {
  // Normalise CRLF and lone CR line endings to LF before inserting.
  const cleaned = text.replace(/\r\n/g, '\n').replace(/\r/g, '\n')

  if (cleaned) {
    const range = selRange()
    // With a selection the paste replaces it; otherwise both edges are the cursor.
    const start = range ? range.start : curRef.current
    const end = range ? range.end : curRef.current
    const current = vRef.current

    commit(current.slice(0, start) + cleaned + current.slice(end), start + cleaned.length)
  }
}
useInput(
(inp: string, k: Key, event: InputEvent) => {
const eventRaw = event.keypress.raw
if (eventRaw === '\x1bv' || eventRaw === '\x1bV' || eventRaw === '\x16' || (isMac && k.meta && inp.toLowerCase() === 'v')) {
if (cbPaste.current) {
return void emitPaste({ cursor: curRef.current, hotkey: true, text: '', value: vRef.current })
}
if (isMac) {
void readClipboardText().then(text => {
if (text) {
pastePlainText(text)
}
})
}
return
}
if (isMac && k.meta && inp.toLowerCase() === 'c') {
const range = selRange()
if (range) {
const text = vRef.current.slice(range.start, range.end)
void writeClipboardText(text)
}
return
if (eventRaw === '\x1bv' || eventRaw === '\x1bV' || eventRaw === '\x16') {
return void emitPaste({ cursor: curRef.current, hotkey: true, text: '', value: vRef.current })
}
if (
@@ -557,26 +515,26 @@ export function TextInput({
let c = curRef.current
let v = vRef.current
const mod = isActionMod(k)
const mod = k.ctrl || k.meta
const range = selRange()
const delFwd = k.delete || fwdDel.current
if (mod && inp === 'z') {
if (k.ctrl && inp === 'z') {
return swap(undo, redo)
}
if ((mod && inp === 'y') || (mod && k.shift && inp === 'z')) {
if ((k.ctrl && inp === 'y') || (k.meta && k.shift && inp === 'z')) {
return swap(redo, undo)
}
if (mod && inp === 'a') {
if (k.ctrl && inp === 'a') {
return selectAll()
}
if (k.home) {
clearSel()
c = 0
} else if (k.end || (mod && inp === 'e')) {
} else if (k.end || (k.ctrl && inp === 'e')) {
clearSel()
c = v.length
} else if (k.leftArrow) {
@@ -595,10 +553,10 @@ export function TextInput({
clearSel()
c = mod ? wordRight(v, c) : nextPos(v, c)
}
} else if (mod && inp === 'b') {
} else if (k.meta && inp === 'b') {
clearSel()
c = wordLeft(v, c)
} else if (mod && inp === 'f') {
} else if (k.meta && inp === 'f') {
clearSel()
c = wordRight(v, c)
} else if (range && (k.backspace || delFwd)) {
@@ -621,7 +579,7 @@ export function TextInput({
} else {
v = v.slice(0, c) + v.slice(nextPos(v, c))
}
} else if (mod && inp === 'w') {
} else if (k.ctrl && inp === 'w') {
if (range) {
v = v.slice(0, range.start) + v.slice(range.end)
c = range.start
@@ -633,7 +591,7 @@ export function TextInput({
} else {
return
}
} else if (mod && inp === 'u') {
} else if (k.ctrl && inp === 'u') {
if (range) {
v = v.slice(0, range.start) + v.slice(range.end)
c = range.start
@@ -641,7 +599,7 @@ export function TextInput({
v = v.slice(c)
c = 0
}
} else if (mod && inp === 'k') {
} else if (k.ctrl && inp === 'k') {
if (range) {
v = v.slice(0, range.start) + v.slice(range.end)
c = range.start
+10 -22
View File
@@ -1,28 +1,16 @@
import { isMac } from '../lib/platform.js'
const action = isMac ? 'Cmd' : 'Ctrl'
const paste = isMac ? 'Cmd' : 'Alt'
export const HOTKEYS: [string, string][] = [
...(
isMac
? ([
['Cmd+C', 'copy selection'],
['Ctrl+C', 'interrupt / clear draft / exit']
] as [string, string][])
: ([['Ctrl+C', 'copy selection / interrupt / clear draft / exit']] as [string, string][])
),
[action + '+D', 'exit'],
[action + '+G', 'open $EDITOR for prompt'],
[action + '+L', 'new session (clear)'],
[paste + '+V / /paste', 'paste clipboard image'],
['Ctrl+C', 'interrupt / clear draft / exit'],
['Ctrl+D', 'exit'],
['Ctrl+G', 'open $EDITOR for prompt'],
['Ctrl+L', 'new session (clear)'],
['Alt+V / /paste', 'paste clipboard image'],
['Tab', 'apply completion'],
['↑/↓', 'completions / queue edit / history'],
[action + '+A/E', 'home / end of line'],
[action + '+Z / ' + action + '+Y', 'undo / redo input edits'],
[action + '+W', 'delete word'],
[action + '+U/K', 'delete to start / end'],
[action + '+←/→', 'jump word'],
['Ctrl+A/E', 'home / end of line'],
['Ctrl+Z / Ctrl+Y', 'undo / redo input edits'],
['Ctrl+W', 'delete word'],
['Ctrl+U/K', 'delete to start / end'],
['Ctrl+←/→', 'jump word'],
['Home/End', 'start / end of line'],
['Shift+Enter / Alt+Enter', 'insert newline'],
['\\+Enter', 'multi-line continuation (fallback)'],
-58
View File
@@ -1,58 +0,0 @@
import { execFile, spawn } from 'node:child_process'
import { promisify } from 'node:util'
const execFileAsync = promisify(execFile)
/**
 * Fetch the system clipboard contents as plain text.
 *
 * Only macOS is handled, via `pbpaste`. Every other platform resolves to
 * null, as does any failure of the `pbpaste` call or a non-string stdout.
 *
 * @param platform Platform identifier; defaults to `process.platform`.
 * @param run Promise-returning command runner; defaults to the promisified `execFile`.
 * @returns The `pbpaste` stdout, or null when nothing could be read.
 */
export async function readClipboardText(
  platform: NodeJS.Platform = process.platform,
  run: typeof execFileAsync = execFileAsync
): Promise<string | null> {
  if (platform === 'darwin') {
    try {
      const { stdout } = await run('pbpaste', [], { encoding: 'utf8', windowsHide: true })

      return typeof stdout === 'string' ? stdout : null
    } catch {
      // Reading is best-effort: a failed pbpaste is reported as "no text".
      return null
    }
  }

  return null
}
/**
 * Put plain text on the system clipboard.
 *
 * Only macOS is handled, by piping the text into `pbcopy`. Every other
 * platform resolves to false without spawning anything.
 *
 * @param text Text to place on the clipboard.
 * @param platform Platform identifier; defaults to `process.platform`.
 * @param start Process spawner; defaults to `spawn` from `node:child_process`.
 * @returns True when `pbcopy` closes with exit code 0, false otherwise.
 */
export async function writeClipboardText(
  text: string,
  platform: NodeJS.Platform = process.platform,
  start: typeof spawn = spawn
): Promise<boolean> {
  if (platform !== 'darwin') {
    return false
  }

  // Spawns pbcopy, settles on its first `error` or `close` event, then writes the text to stdin.
  const pipeToPbcopy = (settle: (ok: boolean) => void) => {
    const child = start('pbcopy', [], { stdio: ['pipe', 'ignore', 'ignore'], windowsHide: true })

    child.once('error', () => settle(false))
    child.once('close', code => settle(code === 0))
    child.stdin.end(text)
  }

  try {
    // `await` here so a rejection is caught below instead of escaping.
    return await new Promise<boolean>(pipeToPbcopy)
  } catch {
    return false
  }
}
-15
View File
@@ -1,15 +0,0 @@
/** Platform-aware keybinding helpers.
 *
 * On macOS the "action" modifier is Cmd (key.meta in Ink); on other platforms
 * it is Ctrl. Ctrl+C is ALWAYS the interrupt key regardless of platform — it
 * must never be remapped to copy.
 */

/** The two modifier flags these helpers look at. */
type ModifierState = { ctrl: boolean; meta: boolean }

/** True when the current process is running on macOS. */
export const isMac = process.platform === 'darwin'

/** True when the platform action-modifier is pressed (Cmd on macOS, Ctrl elsewhere). */
export const isActionMod = (key: ModifierState): boolean => {
  if (isMac) {
    return key.meta
  }

  return key.ctrl
}

/** Match action-modifier + a single character (case-insensitive). */
export const isAction = (key: ModifierState, ch: string, target: string): boolean => {
  if (!isActionMod(key)) {
    return false
  }

  // Only `ch` is lower-cased; `target` is compared exactly as given.
  return ch.toLowerCase() === target
}
-5
View File
@@ -115,11 +115,6 @@ export const en: Translations = {
dailyTokenUsage: "Daily Token Usage",
dailyBreakdown: "Daily Breakdown",
perModelBreakdown: "Per-Model Breakdown",
topSkills: "Top Skills",
skill: "Skill",
loads: "Agent Loaded",
edits: "Agent Managed",
lastUsed: "Last Used",
input: "Input",
output: "Output",
total: "Total",
-5
View File
@@ -120,11 +120,6 @@ export interface Translations {
dailyTokenUsage: string;
dailyBreakdown: string;
perModelBreakdown: string;
topSkills: string;
skill: string;
loads: string;
edits: string;
lastUsed: string;
input: string;
output: string;
total: string;
-5
View File
@@ -115,11 +115,6 @@ export const zh: Translations = {
dailyTokenUsage: "每日 Token 用量",
dailyBreakdown: "每日明细",
perModelBreakdown: "模型用量明细",
topSkills: "常用技能",
skill: "技能",
loads: "代理加载",
edits: "代理管理",
lastUsed: "最近使用",
input: "输入",
output: "输出",
total: "总计",
-20
View File
@@ -300,22 +300,6 @@ export interface AnalyticsModelEntry {
sessions: number;
}
/** One per-skill row of the analytics response (an element of `skills.top_skills`). */
export interface AnalyticsSkillEntry {
// Skill identifier.
skill: string;
// NOTE(review): presumably the number of times the skill was loaded/viewed — confirm against the API.
view_count: number;
// NOTE(review): presumably the number of times the skill was edited/managed — confirm against the API.
manage_count: number;
// NOTE(review): presumably view_count + manage_count — confirm against the API.
total_count: number;
// NOTE(review): presumably this skill's share of all skill actions; scale (0-1 vs 0-100) not visible here — confirm.
percentage: number;
// Last-use timestamp, or null when there is none. NOTE(review): unit (seconds vs ms) not visible here — confirm.
last_used_at: number | null;
}
/** Skill-usage totals carried in the analytics response as `skills.summary`. */
// NOTE(review): field meanings below are inferred from their names — confirm against the API.
export interface AnalyticsSkillsSummary {
// Presumably the total number of skill loads.
total_skill_loads: number;
// Presumably the total number of skill edits.
total_skill_edits: number;
// Presumably loads + edits combined.
total_skill_actions: number;
// Presumably the count of distinct skills with at least one action.
distinct_skills_used: number;
}
export interface AnalyticsResponse {
daily: AnalyticsDailyEntry[];
by_model: AnalyticsModelEntry[];
@@ -328,10 +312,6 @@ export interface AnalyticsResponse {
total_actual_cost: number;
total_sessions: number;
};
skills: {
summary: AnalyticsSkillsSummary;
top_skills: AnalyticsSkillEntry[];
};
}
export interface CronJob {
+2 -51
View File
@@ -1,14 +1,12 @@
import { useEffect, useState, useCallback } from "react";
import {
BarChart3,
Brain,
Cpu,
Hash,
TrendingUp,
} from "lucide-react";
import { api } from "@/lib/api";
import type { AnalyticsResponse, AnalyticsDailyEntry, AnalyticsModelEntry, AnalyticsSkillEntry } from "@/lib/api";
import { timeAgo } from "@/lib/utils";
import type { AnalyticsResponse, AnalyticsDailyEntry, AnalyticsModelEntry } from "@/lib/api";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Button } from "@/components/ui/button";
import { useI18n } from "@/i18n";
@@ -229,52 +227,6 @@ function ModelTable({ models }: { models: AnalyticsModelEntry[] }) {
);
}
/**
 * Card with a table of per-skill usage: one row per entry, showing its
 * view, manage and total counts plus when it was last used.
 * Renders nothing when `skills` is empty.
 */
function SkillTable({ skills }: { skills: AnalyticsSkillEntry[] }) {
  const { t } = useI18n();

  if (skills.length === 0) {
    return null;
  }

  const rows = skills.map(({ skill, view_count, manage_count, total_count, last_used_at }) => (
    <tr key={skill} className="border-b border-border/50 hover:bg-secondary/20 transition-colors">
      <td className="py-2 pr-4">
        <span className="font-mono-ui text-xs">{skill}</span>
      </td>
      <td className="text-right py-2 px-4 text-muted-foreground">{view_count}</td>
      <td className="text-right py-2 px-4 text-muted-foreground">{manage_count}</td>
      <td className="text-right py-2 px-4">{total_count}</td>
      <td className="text-right py-2 pl-4 text-muted-foreground">
        {/* A falsy timestamp is shown as a dash. */}
        {last_used_at ? timeAgo(last_used_at) : "—"}
      </td>
    </tr>
  ));

  return (
    <Card>
      <CardHeader>
        <div className="flex items-center gap-2">
          <Brain className="h-5 w-5 text-muted-foreground" />
          <CardTitle className="text-base">{t.analytics.topSkills}</CardTitle>
        </div>
      </CardHeader>
      <CardContent>
        <div className="overflow-x-auto">
          <table className="w-full text-sm">
            <thead>
              <tr className="border-b border-border text-muted-foreground text-xs">
                <th className="text-left py-2 pr-4 font-medium">{t.analytics.skill}</th>
                <th className="text-right py-2 px-4 font-medium">{t.analytics.loads}</th>
                <th className="text-right py-2 px-4 font-medium">{t.analytics.edits}</th>
                <th className="text-right py-2 px-4 font-medium">{t.analytics.total}</th>
                <th className="text-right py-2 pl-4 font-medium">{t.analytics.lastUsed}</th>
              </tr>
            </thead>
            <tbody>{rows}</tbody>
          </table>
        </div>
      </CardContent>
    </Card>
  );
}
export default function AnalyticsPage() {
const [days, setDays] = useState(30);
const [data, setData] = useState<AnalyticsResponse | null>(null);
@@ -358,11 +310,10 @@ export default function AnalyticsPage() {
{/* Tables */}
<DailyTable daily={data.daily} />
<ModelTable models={data.by_model} />
<SkillTable skills={data.skills.top_skills} />
</>
)}
{data && data.daily.length === 0 && data.by_model.length === 0 && data.skills.top_skills.length === 0 && (
{data && data.daily.length === 0 && data.by_model.length === 0 && (
<Card>
<CardContent className="py-12">
<div className="flex flex-col items-center text-muted-foreground">
@@ -13,15 +13,12 @@ description: "Build an automated AI code reviewer that monitors your repos, revi
**What you'll build:**
```
┌───────────────────────────────────────────────────────────────────┐
│   Cron Timer  ──▶  Hermes Agent  ──▶  GitHub API  ──▶  Review     │
│   (every 2h)       + gh CLI           (PR diffs)       delivery   │
│                    + skill                             (Telegram, │
│                    + memory                            Discord,   │
│                                                        local)     │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
┌──────────────┐     ┌───────────────┐     ┌──────────────┐     ┌──────────────┐
│  Cron Timer  │────▶│ Hermes Agent  │────▶│  GitHub API  │────▶│  Review to   │
│  (every 2h)  │     │  + gh CLI     │     │  (PR diffs)  │     │  Telegram/   │
│              │     │  + skill      │     │              │     │  Discord/    │
│              │     │  + memory     │     │              │     │  local file  │
└──────────────┘     └───────────────┘     └──────────────┘     └──────────────┘
```
This guide uses **cron jobs** to poll for PRs on a schedule — no server or public endpoint needed. Works behind NAT and firewalls.
@@ -110,7 +110,7 @@ The largest optional category — covers the full ML pipeline from data curation
| **llava** | Large Language and Vision Assistant — visual instruction tuning and image-based conversations combining CLIP vision with LLaMA language models. |
| **modal** | Serverless GPU cloud platform for running ML workloads. On-demand GPU access without infrastructure management, ML model deployment as APIs, or batch jobs with automatic scaling. |
| **nemo-curator** | GPU-accelerated data curation for LLM training. Fuzzy deduplication (16x faster), quality filtering (30+ heuristics), semantic dedup, PII redaction. Scales with RAPIDS. |
| **peft-fine-tuning** | Parameter-efficient fine-tuning for LLMs using LoRA, QLoRA, and 25+ methods. Train `<1%` of parameters with minimal accuracy loss for 7B–70B models on limited GPU memory. HuggingFace's official PEFT library. |
| **peft-fine-tuning** | Parameter-efficient fine-tuning for LLMs using LoRA, QLoRA, and 25+ methods. Train <1% of parameters with minimal accuracy loss for 7B–70B models on limited GPU memory. HuggingFace's official PEFT library. |
| **pinecone** | Managed vector database for production AI. Auto-scaling, hybrid search (dense + sparse), metadata filtering, and low latency (under 100ms p95). |
| **pytorch-fsdp** | Expert guidance for Fully Sharded Data Parallel training with PyTorch FSDP — parameter sharding, mixed precision, CPU offloading, FSDP2. |
| **pytorch-lightning** | High-level PyTorch framework with Trainer class, automatic distributed training (DDP/FSDP/DeepSpeed), callbacks, and minimal boilerplate. |
+1 -39
View File
@@ -83,25 +83,6 @@ Standard OpenAI Chat Completions format. Stateless — the full conversation is
}
```
**Inline image input:** user messages may send `content` as an array of `text` and `image_url` parts. Both remote `http(s)` URLs and `data:image/...` URLs are supported:
```json
{
"model": "hermes-agent",
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "What is in this image?"},
{"type": "image_url", "image_url": {"url": "https://example.com/cat.png", "detail": "high"}}
]
}
]
}
```
Uploaded files (`file` / `input_file` / `file_id`) and non-image `data:` URLs return `400 unsupported_content_type`.
**Streaming** (`"stream": true`): Returns Server-Sent Events (SSE) with token-by-token response chunks. For **Chat Completions**, the stream uses standard `chat.completion.chunk` events plus Hermes' custom `hermes.tool.progress` event for tool-start UX. For **Responses**, the stream uses OpenAI Responses event types such as `response.created`, `response.output_text.delta`, `response.output_item.added`, `response.output_item.done`, and `response.completed`.
**Tool progress in streams**:
@@ -138,25 +119,6 @@ OpenAI Responses API format. Supports server-side conversation state via `previo
}
```
**Inline image input:** `input[].content` can contain `input_text` and `input_image` parts. Both remote URLs and `data:image/...` URLs are supported:
```json
{
"model": "hermes-agent",
"input": [
{
"role": "user",
"content": [
{"type": "input_text", "text": "Describe this screenshot."},
{"type": "input_image", "image_url": "data:image/png;base64,iVBORw0K..."}
]
}
]
}
```
Uploaded files (`input_file` / `file_id`) and non-image `data:` URLs return `400 unsupported_content_type`.
#### Multi-turn with previous_response_id
Chain responses to maintain full context (including tool calls) across turns:
@@ -368,7 +330,7 @@ In Open WebUI, add each as a separate connection. The model dropdown shows `alic
## Limitations
- **Response storage** — stored responses (for `previous_response_id`) are persisted in SQLite and survive gateway restarts. Max 100 stored responses (LRU eviction).
- **No file upload** — inline images are supported on both `/v1/chat/completions` and `/v1/responses`, but uploaded files (`file`, `input_file`, `file_id`) and non-image document inputs are not supported through the API.
- **No file upload** — vision/document analysis via uploaded files is not yet supported through the API.
- **Model field is cosmetic** — the `model` field in requests is accepted but the actual LLM model used is configured server-side in config.yaml.
## Proxy Mode
@@ -1,117 +0,0 @@
---
sidebar_position: 12
sidebar_label: "Built-in Plugins"
title: "Built-in Plugins"
description: "Plugins shipped with Hermes Agent that run automatically via lifecycle hooks — disk-cleanup and friends"
---
# Built-in Plugins
Hermes ships a small set of plugins bundled with the repository. They live under `<repo>/plugins/<name>/` and load automatically alongside user-installed plugins in `~/.hermes/plugins/`. They use the same plugin surface as third-party plugins — hooks, tools, slash commands — just maintained in-tree.
See the [Plugins](/docs/user-guide/features/plugins) page for the general plugin system, and [Build a Hermes Plugin](/docs/guides/build-a-hermes-plugin) to write your own.
## How discovery works
The `PluginManager` scans four sources, in order:
1. **Bundled** — `<repo>/plugins/<name>/` (what this page documents)
2. **User** — `~/.hermes/plugins/<name>/`
3. **Project** — `./.hermes/plugins/<name>/` (requires `HERMES_ENABLE_PROJECT_PLUGINS=1`)
4. **Pip entry points** — `hermes_agent.plugins`
On name collision, later sources win — a user plugin named `disk-cleanup` would replace the bundled one.
`plugins/memory/` and `plugins/context_engine/` are deliberately excluded from bundled scanning. Those directories use their own discovery paths because memory providers and context engines are single-select providers configured through `hermes memory setup` / `context.engine` in config.
## Bundled plugins are opt-in
Bundled plugins ship disabled. Discovery finds them (they appear in `hermes plugins list` and the interactive `hermes plugins` UI), but none load until you explicitly enable them:
```bash
hermes plugins enable disk-cleanup
```
Or via `~/.hermes/config.yaml`:
```yaml
plugins:
enabled:
- disk-cleanup
```
This is the same mechanism user-installed plugins use. Bundled plugins are never auto-enabled — not on fresh install, not for existing users upgrading to a newer Hermes. You always opt in explicitly.
To turn a bundled plugin off again:
```bash
hermes plugins disable disk-cleanup
# or: remove it from plugins.enabled in config.yaml
```
## Currently shipped
### disk-cleanup
Auto-tracks and removes ephemeral files created during sessions — test scripts, temp outputs, cron logs, stale chrome profiles — without requiring the agent to remember to call a tool.
**How it works:**
| Hook | Behaviour |
|---|---|
| `post_tool_call` | When `write_file` / `terminal` / `patch` creates a file matching `test_*`, `tmp_*`, or `*.test.*` inside `HERMES_HOME` or `/tmp/hermes-*`, track it silently as `test` / `temp` / `cron-output`. |
| `on_session_end` | If any test files were auto-tracked during the session, run the safe `quick` cleanup and log a one-line summary. Stays silent otherwise. |
**Deletion rules:**
| Category | Threshold | Confirmation |
|---|---|---|
| `test` | every session end | Never |
| `temp` | >7 days since tracked | Never |
| `cron-output` | >14 days since tracked | Never |
| empty dirs under HERMES_HOME | always | Never |
| `research` | >30 days, beyond 10 newest | Always (deep only) |
| `chrome-profile` | >14 days since tracked | Always (deep only) |
| files >500 MB | never auto | Always (deep only) |
**Slash command** — `/disk-cleanup` available in both CLI and gateway sessions:
```
/disk-cleanup status # breakdown + top-10 largest
/disk-cleanup dry-run # preview without deleting
/disk-cleanup quick # run safe cleanup now
/disk-cleanup deep # quick + list items needing confirmation
/disk-cleanup track <path> <category> # manual tracking
/disk-cleanup forget <path> # stop tracking (does not delete)
```
**State** — everything lives at `$HERMES_HOME/disk-cleanup/`:
| File | Contents |
|---|---|
| `tracked.json` | Tracked paths with category, size, and timestamp |
| `tracked.json.bak` | Atomic-write backup of the above |
| `cleanup.log` | Append-only audit trail of every track / skip / reject / delete |
**Safety** — cleanup only ever touches paths under `HERMES_HOME` or `/tmp/hermes-*`. Windows mounts (`/mnt/c/...`) are rejected. Well-known top-level state dirs (`logs/`, `memories/`, `sessions/`, `cron/`, `cache/`, `skills/`, `plugins/`, `disk-cleanup/` itself) are never removed even when empty — a fresh install does not get gutted on first session end.
**Enabling:** `hermes plugins enable disk-cleanup` (or check the box in `hermes plugins`).
**Disabling again:** `hermes plugins disable disk-cleanup`.
## Adding a bundled plugin
Bundled plugins are written exactly like any other Hermes plugin — see [Build a Hermes Plugin](/docs/guides/build-a-hermes-plugin). The only differences are:
- Directory lives at `<repo>/plugins/<name>/` instead of `~/.hermes/plugins/<name>/`
- Manifest source is reported as `bundled` in `hermes plugins list`
- User plugins with the same name override the bundled version
A plugin is a good candidate for bundling when:
- It has no optional dependencies (or they're already `pip install .[all]` deps)
- The behaviour benefits most users once enabled (bundled plugins still ship disabled and must be opted into explicitly)
- The logic ties into lifecycle hooks that the agent would otherwise have to remember to invoke
- It complements a core capability without expanding the model-visible tool surface
Counter-examples — things that should stay as user-installable plugins, not bundled: third-party integrations with API keys, niche workflows, large dependency trees, anything that would meaningfully change agent behaviour by default.
+15 -51
View File
@@ -95,40 +95,10 @@ Project-local plugins under `./.hermes/plugins/` are disabled by default. Enable
| Source | Path | Use case |
|--------|------|----------|
| Bundled | `<repo>/plugins/` | Ships with Hermes — see [Built-in Plugins](/docs/user-guide/features/built-in-plugins) |
| User | `~/.hermes/plugins/` | Personal plugins |
| Project | `.hermes/plugins/` | Project-specific plugins (requires `HERMES_ENABLE_PROJECT_PLUGINS=true`) |
| pip | `hermes_agent.plugins` entry_points | Distributed packages |
Later sources override earlier ones on name collision, so a user plugin with the same name as a bundled plugin replaces it.
## Plugins are opt-in
**Every plugin — user-installed, bundled, or pip — is disabled by default.** Discovery finds them (so they show up in `hermes plugins` and `/plugins`), but nothing loads until you add the plugin's name to `plugins.enabled` in `~/.hermes/config.yaml`. This stops anything with hooks or tools from running without your explicit consent.
```yaml
plugins:
enabled:
- my-tool-plugin
- disk-cleanup
disabled: # optional deny-list — always wins if a name appears in both
- noisy-plugin
```
Three ways to flip state:
```bash
hermes plugins # interactive toggle (space to check/uncheck)
hermes plugins enable <name> # add to allow-list
hermes plugins disable <name> # remove from allow-list + add to disabled
```
After `hermes plugins install owner/repo`, you're asked `Enable 'name' now? [y/N]` — defaults to no. Skip the prompt for scripted installs with `--enable` or `--no-enable`.
### Migration for existing users
When you upgrade to a version of Hermes that has opt-in plugins (config schema v21+), any user plugins already installed under `~/.hermes/plugins/` that weren't already in `plugins.disabled` are **automatically grandfathered** into `plugins.enabled`. Your existing setup keeps working. Bundled plugins are NOT grandfathered — even existing users have to opt in explicitly.
## Available hooks
Plugins can register callbacks for these lifecycle events. See the **[Event Hooks page](/docs/user-guide/features/hooks#plugin-hooks)** for full details, callback signatures, and examples.
@@ -157,15 +127,13 @@ Memory providers and context engines are **provider plugins** — only one of ea
## Managing plugins
```bash
hermes plugins # unified interactive UI
hermes plugins list # table: enabled / disabled / not enabled
hermes plugins install user/repo # install from Git, then prompt Enable? [y/N]
hermes plugins install user/repo --enable # install AND enable (no prompt)
hermes plugins install user/repo --no-enable # install but leave disabled (no prompt)
hermes plugins update my-plugin # pull latest
hermes plugins remove my-plugin # uninstall
hermes plugins enable my-plugin # add to allow-list
hermes plugins disable my-plugin # remove from allow-list + add to disabled
hermes plugins # unified interactive UI
hermes plugins list # table view with enabled/disabled status
hermes plugins install user/repo # install from Git
hermes plugins update my-plugin # pull latest
hermes plugins remove my-plugin # uninstall
hermes plugins enable my-plugin # re-enable a disabled plugin
hermes plugins disable my-plugin # disable without removing
```
### Interactive UI
@@ -179,16 +147,14 @@ Plugins
General Plugins
→ [✓] my-tool-plugin — Custom search tool
[ ] webhook-notifier — Event hooks
[ ] disk-cleanup — Auto-cleanup of ephemeral files [bundled]
Provider Plugins
Memory Provider ▸ honcho
Context Engine ▸ compressor
```
- **General Plugins section** — checkboxes, toggle with SPACE. Checked = in `plugins.enabled`, unchecked = in `plugins.disabled` (explicit off).
- **General Plugins section** — checkboxes, toggle with SPACE
- **Provider Plugins section** — shows current selection. Press ENTER to drill into a radio picker where you choose one active provider.
- Bundled plugins appear in the same list with a `[bundled]` tag.
Provider plugin selections are saved to `config.yaml`:
@@ -200,17 +166,15 @@ context:
engine: "compressor" # default built-in compressor
```
### Enabled vs. disabled vs. neither
### Disabling general plugins
Plugins occupy one of three states:
Disabled plugins remain installed but are skipped during loading. The disabled list is stored in `config.yaml` under `plugins.disabled`:
| State | Meaning | In `plugins.enabled`? | In `plugins.disabled`? |
|---|---|---|---|
| `enabled` | Loaded on next session | Yes | No |
| `disabled` | Explicitly off — won't load even if also in `enabled` | (irrelevant) | Yes |
| `not enabled` | Discovered but never opted in | No | No |
The default for a newly-installed or bundled plugin is `not enabled`. `hermes plugins list` shows all three distinct states so you can tell what's been explicitly turned off vs. what's just waiting to be enabled.
```yaml
plugins:
disabled:
- my-noisy-plugin
```
In a running session, `/plugins` shows which plugins are currently loaded.
-1
View File
@@ -51,7 +51,6 @@ const sidebars: SidebarsConfig = {
'user-guide/features/personality',
'user-guide/features/skins',
'user-guide/features/plugins',
'user-guide/features/built-in-plugins',
],
},
{