Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cc66b666e5 | |||
| 2556cfdab1 | |||
| d86be33161 | |||
| 569e9f9670 | |||
| 28e1e210ee | |||
| 93aa01c71c | |||
| 5d0f55cac4 | |||
| e09e48567e | |||
| 2aa3f199cb | |||
| 6367e1c4c0 |
@@ -697,6 +697,25 @@ def _read_main_model() -> str:
|
||||
return ""
|
||||
|
||||
|
||||
def _read_main_provider() -> str:
|
||||
"""Read the user's configured main provider from config.yaml.
|
||||
|
||||
Returns the lowercase provider id (e.g. "alibaba", "openrouter") or ""
|
||||
if not configured.
|
||||
"""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
cfg = load_config()
|
||||
model_cfg = cfg.get("model", {})
|
||||
if isinstance(model_cfg, dict):
|
||||
provider = model_cfg.get("provider", "")
|
||||
if isinstance(provider, str) and provider.strip():
|
||||
return provider.strip().lower()
|
||||
except Exception:
|
||||
pass
|
||||
return ""
|
||||
|
||||
|
||||
def _resolve_custom_runtime() -> Tuple[Optional[str], Optional[str]]:
|
||||
"""Resolve the active custom/main endpoint the same way the main CLI does.
|
||||
|
||||
@@ -855,10 +874,35 @@ _AUTO_PROVIDER_LABELS = {
|
||||
}
|
||||
|
||||
|
||||
_AGGREGATOR_PROVIDERS = frozenset({"openrouter", "nous"})
|
||||
|
||||
|
||||
def _resolve_auto() -> Tuple[Optional[OpenAI], Optional[str]]:
|
||||
"""Full auto-detection chain: OpenRouter → Nous → custom → Codex → API-key → None."""
|
||||
"""Full auto-detection chain.
|
||||
|
||||
Priority:
|
||||
1. If the user's main provider is NOT an aggregator (OpenRouter / Nous),
|
||||
use their main provider + main model directly. This ensures users on
|
||||
Alibaba, DeepSeek, ZAI, etc. get auxiliary tasks handled by the same
|
||||
provider they already have credentials for — no OpenRouter key needed.
|
||||
2. OpenRouter → Nous → custom → Codex → API-key providers (original chain).
|
||||
"""
|
||||
global auxiliary_is_nous
|
||||
auxiliary_is_nous = False # Reset — _try_nous() will set True if it wins
|
||||
|
||||
# ── Step 1: non-aggregator main provider → use main model directly ──
|
||||
main_provider = _read_main_provider()
|
||||
main_model = _read_main_model()
|
||||
if (main_provider and main_model
|
||||
and main_provider not in _AGGREGATOR_PROVIDERS
|
||||
and main_provider not in ("auto", "custom", "")):
|
||||
client, resolved = resolve_provider_client(main_provider, main_model)
|
||||
if client is not None:
|
||||
logger.info("Auxiliary auto-detect: using main provider %s (%s)",
|
||||
main_provider, resolved or main_model)
|
||||
return client, resolved or main_model
|
||||
|
||||
# ── Step 2: aggregator / fallback chain ──────────────────────────────
|
||||
tried = []
|
||||
for try_fn in (_try_openrouter, _try_nous, _try_custom_endpoint,
|
||||
_try_codex, _resolve_api_key_provider):
|
||||
|
||||
+1
-2
@@ -53,8 +53,7 @@ _PREFIX_PATTERNS = [
|
||||
# ENV assignment patterns: KEY=value where KEY contains a secret-like name
|
||||
_SECRET_ENV_NAMES = r"(?:API_?KEY|TOKEN|SECRET|PASSWORD|PASSWD|CREDENTIAL|AUTH)"
|
||||
_ENV_ASSIGN_RE = re.compile(
|
||||
rf"([A-Z_]{{0,50}}{_SECRET_ENV_NAMES}[A-Z_]{{0,50}})\s*=\s*(['\"]?)(\S+)\2",
|
||||
re.IGNORECASE,
|
||||
rf"([A-Z0-9_]{{0,50}}{_SECRET_ENV_NAMES}[A-Z0-9_]{{0,50}})\s*=\s*(['\"]?)(\S+)\2",
|
||||
)
|
||||
|
||||
# JSON field patterns: "apiKey": "value", "token": "value", etc.
|
||||
|
||||
@@ -375,6 +375,7 @@ def create_job(
|
||||
model: Optional[str] = None,
|
||||
provider: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
script: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a new cron job.
|
||||
@@ -391,6 +392,9 @@ def create_job(
|
||||
model: Optional per-job model override
|
||||
provider: Optional per-job provider override
|
||||
base_url: Optional per-job base URL override
|
||||
script: Optional path to a Python script whose stdout is injected into the
|
||||
prompt each run. The script runs before the agent turn, and its output
|
||||
is prepended as context. Useful for data collection / change detection.
|
||||
|
||||
Returns:
|
||||
The created job dict
|
||||
@@ -419,6 +423,8 @@ def create_job(
|
||||
normalized_model = normalized_model or None
|
||||
normalized_provider = normalized_provider or None
|
||||
normalized_base_url = normalized_base_url or None
|
||||
normalized_script = str(script).strip() if isinstance(script, str) else None
|
||||
normalized_script = normalized_script or None
|
||||
|
||||
label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
|
||||
job = {
|
||||
@@ -430,6 +436,7 @@ def create_job(
|
||||
"model": normalized_model,
|
||||
"provider": normalized_provider,
|
||||
"base_url": normalized_base_url,
|
||||
"script": normalized_script,
|
||||
"schedule": parsed_schedule,
|
||||
"schedule_display": parsed_schedule.get("display", schedule),
|
||||
"repeat": {
|
||||
|
||||
@@ -13,6 +13,7 @@ import concurrent.futures
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
@@ -229,11 +230,89 @@ def _deliver_result(job: dict, content: str) -> None:
|
||||
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
|
||||
|
||||
|
||||
_SCRIPT_TIMEOUT = 120 # seconds
|
||||
|
||||
|
||||
def _run_job_script(script_path: str) -> tuple[bool, str]:
|
||||
"""Execute a cron job's data-collection script and capture its output.
|
||||
|
||||
Args:
|
||||
script_path: Path to a Python script (resolved via HERMES_HOME/scripts/ or absolute).
|
||||
|
||||
Returns:
|
||||
(success, output) — on failure *output* contains the error message so the
|
||||
LLM can report the problem to the user.
|
||||
"""
|
||||
from hermes_constants import get_hermes_home
|
||||
|
||||
path = Path(script_path).expanduser()
|
||||
if not path.is_absolute():
|
||||
# Resolve relative paths against HERMES_HOME/scripts/
|
||||
path = get_hermes_home() / "scripts" / path
|
||||
|
||||
if not path.exists():
|
||||
return False, f"Script not found: {path}"
|
||||
if not path.is_file():
|
||||
return False, f"Script path is not a file: {path}"
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[sys.executable, str(path)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=_SCRIPT_TIMEOUT,
|
||||
cwd=str(path.parent),
|
||||
)
|
||||
stdout = (result.stdout or "").strip()
|
||||
stderr = (result.stderr or "").strip()
|
||||
|
||||
if result.returncode != 0:
|
||||
parts = [f"Script exited with code {result.returncode}"]
|
||||
if stderr:
|
||||
parts.append(f"stderr:\n{stderr}")
|
||||
if stdout:
|
||||
parts.append(f"stdout:\n{stdout}")
|
||||
return False, "\n".join(parts)
|
||||
|
||||
return True, stdout
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
return False, f"Script timed out after {_SCRIPT_TIMEOUT}s: {path}"
|
||||
except Exception as exc:
|
||||
return False, f"Script execution failed: {exc}"
|
||||
|
||||
|
||||
def _build_job_prompt(job: dict) -> str:
|
||||
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
|
||||
prompt = job.get("prompt", "")
|
||||
skills = job.get("skills")
|
||||
|
||||
# Run data-collection script if configured, inject output as context.
|
||||
script_path = job.get("script")
|
||||
if script_path:
|
||||
success, script_output = _run_job_script(script_path)
|
||||
if success:
|
||||
if script_output:
|
||||
prompt = (
|
||||
"## Script Output\n"
|
||||
"The following data was collected by a pre-run script. "
|
||||
"Use it as context for your analysis.\n\n"
|
||||
f"```\n{script_output}\n```\n\n"
|
||||
f"{prompt}"
|
||||
)
|
||||
else:
|
||||
prompt = (
|
||||
"[Script ran successfully but produced no output.]\n\n"
|
||||
f"{prompt}"
|
||||
)
|
||||
else:
|
||||
prompt = (
|
||||
"## Script Error\n"
|
||||
"The data-collection script failed. Report this to the user.\n\n"
|
||||
f"```\n{script_output}\n```\n\n"
|
||||
f"{prompt}"
|
||||
)
|
||||
|
||||
# Always prepend [SILENT] guidance so the cron agent can suppress
|
||||
# delivery when it has nothing new or noteworthy to report.
|
||||
silent_hint = (
|
||||
|
||||
@@ -575,6 +575,20 @@ def load_gateway_config() -> GatewayConfig:
|
||||
if isinstance(frc, list):
|
||||
frc = ",".join(str(v) for v in frc)
|
||||
os.environ["WHATSAPP_FREE_RESPONSE_CHATS"] = str(frc)
|
||||
|
||||
# Matrix settings → env vars (env vars take precedence)
|
||||
matrix_cfg = yaml_cfg.get("matrix", {})
|
||||
if isinstance(matrix_cfg, dict):
|
||||
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
|
||||
os.environ["MATRIX_REQUIRE_MENTION"] = str(matrix_cfg["require_mention"]).lower()
|
||||
frc = matrix_cfg.get("free_response_rooms")
|
||||
if frc is not None and not os.getenv("MATRIX_FREE_RESPONSE_ROOMS"):
|
||||
if isinstance(frc, list):
|
||||
frc = ",".join(str(v) for v in frc)
|
||||
os.environ["MATRIX_FREE_RESPONSE_ROOMS"] = str(frc)
|
||||
if "auto_thread" in matrix_cfg and not os.getenv("MATRIX_AUTO_THREAD"):
|
||||
os.environ["MATRIX_AUTO_THREAD"] = str(matrix_cfg["auto_thread"]).lower()
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to process config.yaml — falling back to .env / gateway.json values. "
|
||||
|
||||
+144
-7
@@ -5,13 +5,16 @@ matrix-nio Python SDK. Supports optional end-to-end encryption (E2EE)
|
||||
when installed with ``pip install "matrix-nio[e2e]"``.
|
||||
|
||||
Environment variables:
|
||||
MATRIX_HOMESERVER Homeserver URL (e.g. https://matrix.example.org)
|
||||
MATRIX_ACCESS_TOKEN Access token (preferred auth method)
|
||||
MATRIX_USER_ID Full user ID (@bot:server) — required for password login
|
||||
MATRIX_PASSWORD Password (alternative to access token)
|
||||
MATRIX_ENCRYPTION Set "true" to enable E2EE
|
||||
MATRIX_ALLOWED_USERS Comma-separated Matrix user IDs (@user:server)
|
||||
MATRIX_HOME_ROOM Room ID for cron/notification delivery
|
||||
MATRIX_HOMESERVER Homeserver URL (e.g. https://matrix.example.org)
|
||||
MATRIX_ACCESS_TOKEN Access token (preferred auth method)
|
||||
MATRIX_USER_ID Full user ID (@bot:server) — required for password login
|
||||
MATRIX_PASSWORD Password (alternative to access token)
|
||||
MATRIX_ENCRYPTION Set "true" to enable E2EE
|
||||
MATRIX_ALLOWED_USERS Comma-separated Matrix user IDs (@user:server)
|
||||
MATRIX_HOME_ROOM Room ID for cron/notification delivery
|
||||
MATRIX_REQUIRE_MENTION Require @mention in rooms (default: true)
|
||||
MATRIX_FREE_RESPONSE_ROOMS Comma-separated room IDs exempt from mention requirement
|
||||
MATRIX_AUTO_THREAD Auto-create threads for room messages (default: true)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -123,6 +126,10 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
# Each entry: (room, event, timestamp)
|
||||
self._pending_megolm: list = []
|
||||
|
||||
# Thread participation tracking (for require_mention bypass)
|
||||
self._bot_participated_threads: set = self._load_participated_threads()
|
||||
self._MAX_TRACKED_THREADS = 500
|
||||
|
||||
def _is_duplicate_event(self, event_id) -> bool:
|
||||
"""Return True if this event was already processed. Tracks the ID otherwise."""
|
||||
if not event_id:
|
||||
@@ -902,6 +909,30 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
if relates_to.get("rel_type") == "m.thread":
|
||||
thread_id = relates_to.get("event_id")
|
||||
|
||||
# Require-mention gating.
|
||||
if not is_dm:
|
||||
free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "")
|
||||
free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()}
|
||||
require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no")
|
||||
is_free_room = room.room_id in free_rooms
|
||||
in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads)
|
||||
|
||||
formatted_body = source_content.get("formatted_body")
|
||||
if require_mention and not is_free_room and not in_bot_thread:
|
||||
if not self._is_bot_mentioned(body, formatted_body):
|
||||
return
|
||||
|
||||
# Strip mention from body when present (including in DMs).
|
||||
if self._is_bot_mentioned(body, source_content.get("formatted_body")):
|
||||
body = self._strip_mention(body)
|
||||
|
||||
# Auto-thread: create a thread for non-DM, non-threaded messages.
|
||||
if not is_dm and not thread_id:
|
||||
auto_thread = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in ("true", "1", "yes")
|
||||
if auto_thread:
|
||||
thread_id = event.event_id
|
||||
self._track_thread(thread_id)
|
||||
|
||||
# Reply-to detection.
|
||||
reply_to = None
|
||||
in_reply_to = relates_to.get("m.in_reply_to", {})
|
||||
@@ -946,6 +977,9 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
reply_to_message_id=reply_to,
|
||||
)
|
||||
|
||||
if thread_id:
|
||||
self._track_thread(thread_id)
|
||||
|
||||
await self.handle_message(msg_event)
|
||||
|
||||
async def _on_room_message_media(self, room: Any, event: Any) -> None:
|
||||
@@ -1031,6 +1065,30 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
if relates_to.get("rel_type") == "m.thread":
|
||||
thread_id = relates_to.get("event_id")
|
||||
|
||||
# Require-mention gating (media messages).
|
||||
if not is_dm:
|
||||
free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "")
|
||||
free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()}
|
||||
require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no")
|
||||
is_free_room = room.room_id in free_rooms
|
||||
in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads)
|
||||
|
||||
if require_mention and not is_free_room and not in_bot_thread:
|
||||
formatted_body = source_content.get("formatted_body")
|
||||
if not self._is_bot_mentioned(body, formatted_body):
|
||||
return
|
||||
|
||||
# Strip mention from body when present (including in DMs).
|
||||
if self._is_bot_mentioned(body, source_content.get("formatted_body")):
|
||||
body = self._strip_mention(body)
|
||||
|
||||
# Auto-thread: create a thread for non-DM, non-threaded messages.
|
||||
if not is_dm and not thread_id:
|
||||
auto_thread = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in ("true", "1", "yes")
|
||||
if auto_thread:
|
||||
thread_id = event.event_id
|
||||
self._track_thread(thread_id)
|
||||
|
||||
# For voice messages, cache audio locally for transcription tools.
|
||||
# Use the authenticated nio client to download (Matrix requires auth for media).
|
||||
media_urls = [http_url] if http_url else None
|
||||
@@ -1079,6 +1137,9 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
media_types=media_types,
|
||||
)
|
||||
|
||||
if thread_id:
|
||||
self._track_thread(thread_id)
|
||||
|
||||
await self.handle_message(msg_event)
|
||||
|
||||
async def _on_invite(self, room: Any, event: Any) -> None:
|
||||
@@ -1166,6 +1227,82 @@ class MatrixAdapter(BasePlatformAdapter):
|
||||
for rid in self._joined_rooms
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Thread participation tracking
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
@staticmethod
|
||||
def _thread_state_path() -> Path:
|
||||
"""Path to the persisted thread participation set."""
|
||||
from hermes_cli.config import get_hermes_home
|
||||
return get_hermes_home() / "matrix_threads.json"
|
||||
|
||||
@classmethod
|
||||
def _load_participated_threads(cls) -> set:
|
||||
"""Load persisted thread IDs from disk."""
|
||||
path = cls._thread_state_path()
|
||||
try:
|
||||
if path.exists():
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
if isinstance(data, list):
|
||||
return set(data)
|
||||
except Exception as e:
|
||||
logger.debug("Could not load matrix thread state: %s", e)
|
||||
return set()
|
||||
|
||||
def _save_participated_threads(self) -> None:
|
||||
"""Persist the current thread set to disk (best-effort)."""
|
||||
path = self._thread_state_path()
|
||||
try:
|
||||
thread_list = list(self._bot_participated_threads)
|
||||
if len(thread_list) > self._MAX_TRACKED_THREADS:
|
||||
thread_list = thread_list[-self._MAX_TRACKED_THREADS:]
|
||||
self._bot_participated_threads = set(thread_list)
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(json.dumps(thread_list), encoding="utf-8")
|
||||
except Exception as e:
|
||||
logger.debug("Could not save matrix thread state: %s", e)
|
||||
|
||||
def _track_thread(self, thread_id: str) -> None:
|
||||
"""Add a thread to the participation set and persist."""
|
||||
if thread_id not in self._bot_participated_threads:
|
||||
self._bot_participated_threads.add(thread_id)
|
||||
self._save_participated_threads()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Mention detection helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _is_bot_mentioned(self, body: str, formatted_body: Optional[str] = None) -> bool:
|
||||
"""Return True if the bot is mentioned in the message."""
|
||||
if not body and not formatted_body:
|
||||
return False
|
||||
# Check for full @user:server in body
|
||||
if self._user_id and self._user_id in body:
|
||||
return True
|
||||
# Check for localpart with word boundaries (case-insensitive)
|
||||
if self._user_id and ":" in self._user_id:
|
||||
localpart = self._user_id.split(":")[0].lstrip("@")
|
||||
if localpart and re.search(r'\b' + re.escape(localpart) + r'\b', body, re.IGNORECASE):
|
||||
return True
|
||||
# Check formatted_body for Matrix pill
|
||||
if formatted_body and self._user_id:
|
||||
if f"matrix.to/#/{self._user_id}" in formatted_body:
|
||||
return True
|
||||
return False
|
||||
|
||||
def _strip_mention(self, body: str) -> str:
|
||||
"""Remove bot mention from message body."""
|
||||
# Remove full @user:server
|
||||
if self._user_id:
|
||||
body = body.replace(self._user_id, "")
|
||||
# If still contains localpart mention, remove it
|
||||
if self._user_id and ":" in self._user_id:
|
||||
localpart = self._user_id.split(":")[0].lstrip("@")
|
||||
if localpart:
|
||||
body = re.sub(r'\b' + re.escape(localpart) + r'\b', '', body, flags=re.IGNORECASE)
|
||||
return body.strip()
|
||||
|
||||
def _get_display_name(self, room: Any, user_id: str) -> str:
|
||||
"""Get a user's display name in a room, falling back to user_id."""
|
||||
if room and hasattr(room, "users"):
|
||||
|
||||
@@ -42,6 +42,7 @@ _EXTRA_ENV_KEYS = frozenset({
|
||||
"WHATSAPP_MODE", "WHATSAPP_ENABLED",
|
||||
"MATTERMOST_HOME_CHANNEL", "MATTERMOST_REPLY_MODE",
|
||||
"MATRIX_PASSWORD", "MATRIX_ENCRYPTION", "MATRIX_HOME_ROOM",
|
||||
"MATRIX_REQUIRE_MENTION", "MATRIX_FREE_RESPONSE_ROOMS", "MATRIX_AUTO_THREAD",
|
||||
})
|
||||
import yaml
|
||||
|
||||
@@ -1008,6 +1009,30 @@ OPTIONAL_ENV_VARS = {
|
||||
"password": False,
|
||||
"category": "messaging",
|
||||
},
|
||||
"MATRIX_REQUIRE_MENTION": {
|
||||
"description": "Require @mention in Matrix rooms (default: true). Set to false to respond to all messages.",
|
||||
"prompt": "Require @mention in rooms (true/false)",
|
||||
"url": None,
|
||||
"password": False,
|
||||
"category": "messaging",
|
||||
"advanced": True,
|
||||
},
|
||||
"MATRIX_FREE_RESPONSE_ROOMS": {
|
||||
"description": "Comma-separated Matrix room IDs where bot responds without @mention",
|
||||
"prompt": "Free-response room IDs (comma-separated)",
|
||||
"url": None,
|
||||
"password": False,
|
||||
"category": "messaging",
|
||||
"advanced": True,
|
||||
},
|
||||
"MATRIX_AUTO_THREAD": {
|
||||
"description": "Auto-create threads for messages in Matrix rooms (default: true)",
|
||||
"prompt": "Auto-create threads in rooms (true/false)",
|
||||
"url": None,
|
||||
"password": False,
|
||||
"category": "messaging",
|
||||
"advanced": True,
|
||||
},
|
||||
"GATEWAY_ALLOW_ALL_USERS": {
|
||||
"description": "Allow all users to interact with messaging bots (true/false). Default: false.",
|
||||
"prompt": "Allow all users (true/false)",
|
||||
|
||||
@@ -90,6 +90,9 @@ def cron_list(show_all: bool = False):
|
||||
print(f" Deliver: {deliver_str}")
|
||||
if skills:
|
||||
print(f" Skills: {', '.join(skills)}")
|
||||
script = job.get("script")
|
||||
if script:
|
||||
print(f" Script: {script}")
|
||||
print()
|
||||
|
||||
from hermes_cli.gateway import find_gateway_pids
|
||||
@@ -149,6 +152,7 @@ def cron_create(args):
|
||||
repeat=getattr(args, "repeat", None),
|
||||
skill=getattr(args, "skill", None),
|
||||
skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
|
||||
script=getattr(args, "script", None),
|
||||
)
|
||||
if not result.get("success"):
|
||||
print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED))
|
||||
@@ -158,6 +162,9 @@ def cron_create(args):
|
||||
print(f" Schedule: {result['schedule']}")
|
||||
if result.get("skills"):
|
||||
print(f" Skills: {', '.join(result['skills'])}")
|
||||
job_data = result.get("job", {})
|
||||
if job_data.get("script"):
|
||||
print(f" Script: {job_data['script']}")
|
||||
print(f" Next run: {result['next_run_at']}")
|
||||
return 0
|
||||
|
||||
@@ -195,6 +202,7 @@ def cron_edit(args):
|
||||
deliver=getattr(args, "deliver", None),
|
||||
repeat=getattr(args, "repeat", None),
|
||||
skills=final_skills,
|
||||
script=getattr(args, "script", None),
|
||||
)
|
||||
if not result.get("success"):
|
||||
print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))
|
||||
@@ -208,6 +216,8 @@ def cron_edit(args):
|
||||
print(f" Skills: {', '.join(updated['skills'])}")
|
||||
else:
|
||||
print(" Skills: none")
|
||||
if updated.get("script"):
|
||||
print(f" Script: {updated['script']}")
|
||||
return 0
|
||||
|
||||
|
||||
|
||||
@@ -4381,6 +4381,7 @@ For more help on a command:
|
||||
cron_create.add_argument("--deliver", help="Delivery target: origin, local, telegram, discord, signal, or platform:chat_id")
|
||||
cron_create.add_argument("--repeat", type=int, help="Optional repeat count")
|
||||
cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.")
|
||||
cron_create.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run")
|
||||
|
||||
# cron edit
|
||||
cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job")
|
||||
@@ -4394,6 +4395,7 @@ For more help on a command:
|
||||
cron_edit.add_argument("--add-skill", dest="add_skills", action="append", help="Append a skill without replacing the existing list. Repeatable.")
|
||||
cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.")
|
||||
cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job")
|
||||
cron_edit.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run. Pass empty string to clear.")
|
||||
|
||||
# lifecycle actions
|
||||
cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job")
|
||||
|
||||
@@ -151,6 +151,7 @@ def _install_dependencies(provider_name: str) -> None:
|
||||
"honcho-ai": "honcho",
|
||||
"mem0ai": "mem0",
|
||||
"hindsight-client": "hindsight_client",
|
||||
"hindsight-all": "hindsight",
|
||||
}
|
||||
|
||||
# Check which packages are missing
|
||||
@@ -166,9 +167,18 @@ def _install_dependencies(provider_name: str) -> None:
|
||||
return
|
||||
|
||||
print(f"\n Installing dependencies: {', '.join(missing)}")
|
||||
|
||||
import shutil
|
||||
uv_path = shutil.which("uv")
|
||||
if not uv_path:
|
||||
print(f" ⚠ uv not found — cannot install dependencies")
|
||||
print(f" Install uv: curl -LsSf https://astral.sh/uv/install.sh | sh")
|
||||
print(f" Then re-run: hermes memory setup")
|
||||
return
|
||||
|
||||
try:
|
||||
subprocess.run(
|
||||
[sys.executable, "-m", "pip", "install", "--quiet"] + missing,
|
||||
[uv_path, "pip", "install", "--python", sys.executable, "--quiet"] + missing,
|
||||
check=True, timeout=120,
|
||||
capture_output=True,
|
||||
)
|
||||
@@ -178,10 +188,10 @@ def _install_dependencies(provider_name: str) -> None:
|
||||
stderr = (e.stderr or b"").decode()[:200]
|
||||
if stderr:
|
||||
print(f" {stderr}")
|
||||
print(f" Run manually: pip install {' '.join(missing)}")
|
||||
print(f" Run manually: uv pip install --python {sys.executable} {' '.join(missing)}")
|
||||
except Exception as e:
|
||||
print(f" ⚠ Install failed: {e}")
|
||||
print(f" Run manually: pip install {' '.join(missing)}")
|
||||
print(f" Run manually: uv pip install --python {sys.executable} {' '.join(missing)}")
|
||||
|
||||
# Also show external dependencies (non-pip) if any
|
||||
ext_deps = meta.get("external_dependencies", [])
|
||||
@@ -275,7 +285,6 @@ def cmd_setup(args) -> None:
|
||||
|
||||
schema = provider.get_config_schema() if hasattr(provider, "get_config_schema") else []
|
||||
|
||||
# Provider config section
|
||||
provider_config = config["memory"].get(name, {})
|
||||
if not isinstance(provider_config, dict):
|
||||
provider_config = {}
|
||||
@@ -290,11 +299,25 @@ def cmd_setup(args) -> None:
|
||||
key = field["key"]
|
||||
desc = field.get("description", key)
|
||||
default = field.get("default")
|
||||
# Dynamic default: look up default from another field's value
|
||||
default_from = field.get("default_from")
|
||||
if default_from and isinstance(default_from, dict):
|
||||
ref_field = default_from.get("field", "")
|
||||
ref_map = default_from.get("map", {})
|
||||
ref_value = provider_config.get(ref_field, "")
|
||||
if ref_value and ref_value in ref_map:
|
||||
default = ref_map[ref_value]
|
||||
is_secret = field.get("secret", False)
|
||||
choices = field.get("choices")
|
||||
env_var = field.get("env_var")
|
||||
url = field.get("url")
|
||||
|
||||
# Skip fields whose "when" condition doesn't match
|
||||
when = field.get("when")
|
||||
if when and isinstance(when, dict):
|
||||
if not all(provider_config.get(k) == v for k, v in when.items()):
|
||||
continue
|
||||
|
||||
if choices and not is_secret:
|
||||
# Use curses picker for choice fields
|
||||
choice_items = [(c, "") for c in choices]
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
# Hindsight Memory Provider
|
||||
|
||||
Long-term memory with knowledge graph, entity resolution, and multi-strategy retrieval. Supports cloud and local modes.
|
||||
Long-term memory with knowledge graph, entity resolution, and multi-strategy retrieval. Supports cloud and local (embedded) modes.
|
||||
|
||||
## Requirements
|
||||
|
||||
- Cloud: `pip install hindsight-client` + API key from [app.hindsight.vectorize.io](https://app.hindsight.vectorize.io)
|
||||
- Local: `pip install hindsight` + LLM API key for embeddings
|
||||
- **Cloud:** API key from [ui.hindsight.vectorize.io](https://ui.hindsight.vectorize.io)
|
||||
- **Local:** API key for a supported LLM provider (OpenAI, Anthropic, Gemini, Groq, MiniMax, or Ollama). Embeddings and reranking run locally — no additional API keys needed.
|
||||
|
||||
## Setup
|
||||
|
||||
@@ -13,26 +13,86 @@ Long-term memory with knowledge graph, entity resolution, and multi-strategy ret
|
||||
hermes memory setup # select "hindsight"
|
||||
```
|
||||
|
||||
Or manually:
|
||||
The setup wizard will install dependencies automatically via `uv` and walk you through configuration.
|
||||
|
||||
Or manually (cloud mode with defaults):
|
||||
```bash
|
||||
hermes config set memory.provider hindsight
|
||||
echo "HINDSIGHT_API_KEY=your-key" >> ~/.hermes/.env
|
||||
```
|
||||
|
||||
### Cloud Mode
|
||||
|
||||
Connects to the Hindsight Cloud API. Requires an API key from [ui.hindsight.vectorize.io](https://ui.hindsight.vectorize.io).
|
||||
|
||||
### Local Mode
|
||||
|
||||
Runs an embedded Hindsight server with built-in PostgreSQL. Requires an LLM API key (e.g. Groq, OpenAI, Anthropic) for memory extraction and synthesis. The daemon starts automatically in the background on first use and stops after 5 minutes of inactivity.
|
||||
|
||||
Daemon startup logs: `~/.hermes/logs/hindsight-embed.log`
|
||||
Daemon runtime logs: `~/.hindsight/profiles/<profile>.log`
|
||||
|
||||
## Config
|
||||
|
||||
Config file: `$HERMES_HOME/hindsight/config.json` (or `~/.hindsight/config.json` legacy)
|
||||
Config file: `~/.hermes/hindsight/config.json`
|
||||
|
||||
### Connection
|
||||
|
||||
| Key | Default | Description |
|
||||
|-----|---------|-------------|
|
||||
| `mode` | `cloud` | `cloud` or `local` |
|
||||
| `bank_id` | `hermes` | Memory bank identifier |
|
||||
| `budget` | `mid` | Recall thoroughness: `low`/`mid`/`high` |
|
||||
| `api_url` | `https://api.hindsight.vectorize.io` | API URL (cloud mode) |
|
||||
| `api_url` | `http://localhost:8888` | API URL (local mode, unused — daemon manages its own port) |
|
||||
|
||||
### Memory
|
||||
|
||||
| Key | Default | Description |
|
||||
|-----|---------|-------------|
|
||||
| `bank_id` | `hermes` | Memory bank name |
|
||||
| `budget` | `mid` | Recall thoroughness: `low` / `mid` / `high` |
|
||||
|
||||
### Integration
|
||||
|
||||
| Key | Default | Description |
|
||||
|-----|---------|-------------|
|
||||
| `memory_mode` | `hybrid` | How memories are integrated into the agent |
|
||||
| `prefetch_method` | `recall` | Method for automatic context injection |
|
||||
|
||||
**memory_mode:**
|
||||
- `hybrid` — automatic context injection + tools available to the LLM
|
||||
- `context` — automatic injection only, no tools exposed
|
||||
- `tools` — tools only, no automatic injection
|
||||
|
||||
**prefetch_method:**
|
||||
- `recall` — injects raw memory facts (fast)
|
||||
- `reflect` — injects LLM-synthesized summary (slower, more coherent)
|
||||
|
||||
### Local Mode LLM
|
||||
|
||||
| Key | Default | Description |
|
||||
|-----|---------|-------------|
|
||||
| `llm_provider` | `openai` | LLM provider: `openai`, `anthropic`, `gemini`, `groq`, `minimax`, `ollama` |
|
||||
| `llm_model` | per-provider | Model name (e.g. `gpt-4o-mini`, `openai/gpt-oss-120b`) |
|
||||
|
||||
The LLM API key is stored in `~/.hermes/.env` as `HINDSIGHT_LLM_API_KEY`.
|
||||
|
||||
## Tools
|
||||
|
||||
Available in `hybrid` and `tools` memory modes:
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `hindsight_retain` | Store information with auto entity extraction |
|
||||
| `hindsight_recall` | Multi-strategy search (semantic + entity graph) |
|
||||
| `hindsight_reflect` | Cross-memory synthesis (LLM-powered) |
|
||||
|
||||
## Environment Variables
|
||||
|
||||
| Variable | Description |
|
||||
|----------|-------------|
|
||||
| `HINDSIGHT_API_KEY` | API key for Hindsight Cloud |
|
||||
| `HINDSIGHT_LLM_API_KEY` | LLM API key for local mode |
|
||||
| `HINDSIGHT_API_URL` | Override API endpoint |
|
||||
| `HINDSIGHT_BANK_ID` | Override bank name |
|
||||
| `HINDSIGHT_BUDGET` | Override recall budget |
|
||||
| `HINDSIGHT_MODE` | Override mode (`cloud` / `local`) |
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Hindsight memory plugin — MemoryProvider interface.
|
||||
|
||||
Long-term memory with knowledge graph, entity resolution, and multi-strategy
|
||||
retrieval. Supports cloud (API key) and local (embedded PostgreSQL) modes.
|
||||
retrieval. Supports cloud (API key) and local modes.
|
||||
|
||||
Original PR #1811 by benfrank241, adapted to MemoryProvider ABC.
|
||||
|
||||
@@ -18,10 +18,10 @@ Or via $HERMES_HOME/hindsight/config.json (profile-scoped), falling back to
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import queue
|
||||
import threading
|
||||
from typing import Any, Dict, List
|
||||
|
||||
@@ -30,30 +30,51 @@ from agent.memory_provider import MemoryProvider
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_DEFAULT_API_URL = "https://api.hindsight.vectorize.io"
|
||||
_DEFAULT_LOCAL_URL = "http://localhost:8888"
|
||||
_VALID_BUDGETS = {"low", "mid", "high"}
|
||||
_PROVIDER_DEFAULT_MODELS = {
|
||||
"openai": "gpt-4o-mini",
|
||||
"anthropic": "claude-haiku-4-5",
|
||||
"gemini": "gemini-2.5-flash",
|
||||
"groq": "openai/gpt-oss-120b",
|
||||
"minimax": "MiniMax-M2.7",
|
||||
"ollama": "gemma3:12b",
|
||||
"lmstudio": "local-model",
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Thread helper (from original PR — avoids aiohttp event loop conflicts)
|
||||
# Dedicated event loop for Hindsight async calls (one per process, reused).
|
||||
# Avoids creating ephemeral loops that leak aiohttp sessions.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _run_in_thread(fn, timeout: float = 30.0):
|
||||
result_q: queue.Queue = queue.Queue(maxsize=1)
|
||||
_loop: asyncio.AbstractEventLoop | None = None
|
||||
_loop_thread: threading.Thread | None = None
|
||||
_loop_lock = threading.Lock()
|
||||
|
||||
def _run():
|
||||
import asyncio
|
||||
asyncio.set_event_loop(None)
|
||||
try:
|
||||
result_q.put(("ok", fn()))
|
||||
except Exception as exc:
|
||||
result_q.put(("err", exc))
|
||||
|
||||
t = threading.Thread(target=_run, daemon=True, name="hindsight-call")
|
||||
t.start()
|
||||
kind, value = result_q.get(timeout=timeout)
|
||||
if kind == "err":
|
||||
raise value
|
||||
return value
|
||||
def _get_loop() -> asyncio.AbstractEventLoop:
|
||||
"""Return a long-lived event loop running on a background thread."""
|
||||
global _loop, _loop_thread
|
||||
with _loop_lock:
|
||||
if _loop is not None and _loop.is_running():
|
||||
return _loop
|
||||
_loop = asyncio.new_event_loop()
|
||||
|
||||
def _run():
|
||||
asyncio.set_event_loop(_loop)
|
||||
_loop.run_forever()
|
||||
|
||||
_loop_thread = threading.Thread(target=_run, daemon=True, name="hindsight-loop")
|
||||
_loop_thread.start()
|
||||
return _loop
|
||||
|
||||
|
||||
def _run_sync(coro, timeout: float = 120.0):
|
||||
"""Schedule *coro* on the shared loop and block until done."""
|
||||
loop = _get_loop()
|
||||
future = asyncio.run_coroutine_threadsafe(coro, loop)
|
||||
return future.result(timeout=timeout)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -161,9 +182,13 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||
def __init__(self):
|
||||
self._config = None
|
||||
self._api_key = None
|
||||
self._api_url = _DEFAULT_API_URL
|
||||
self._bank_id = "hermes"
|
||||
self._budget = "mid"
|
||||
self._mode = "cloud"
|
||||
self._memory_mode = "hybrid" # "context", "tools", or "hybrid"
|
||||
self._prefetch_method = "recall" # "recall" or "reflect"
|
||||
self._client = None
|
||||
self._prefetch_result = ""
|
||||
self._prefetch_lock = threading.Lock()
|
||||
self._prefetch_thread = None
|
||||
@@ -178,10 +203,10 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||
cfg = _load_config()
|
||||
mode = cfg.get("mode", "cloud")
|
||||
if mode == "local":
|
||||
embed = cfg.get("embed", {})
|
||||
return bool(embed.get("llmApiKey") or os.environ.get("HINDSIGHT_LLM_API_KEY"))
|
||||
api_key = cfg.get("apiKey") or os.environ.get("HINDSIGHT_API_KEY", "")
|
||||
return bool(api_key)
|
||||
return True
|
||||
has_key = bool(cfg.get("apiKey") or os.environ.get("HINDSIGHT_API_KEY", ""))
|
||||
has_url = bool(cfg.get("api_url") or os.environ.get("HINDSIGHT_API_URL", ""))
|
||||
return has_key or has_url
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@@ -204,49 +229,148 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||
def get_config_schema(self):
|
||||
return [
|
||||
{"key": "mode", "description": "Cloud API or local embedded mode", "default": "cloud", "choices": ["cloud", "local"]},
|
||||
{"key": "api_key", "description": "Hindsight Cloud API key", "secret": True, "env_var": "HINDSIGHT_API_KEY", "url": "https://app.hindsight.vectorize.io"},
|
||||
{"key": "bank_id", "description": "Memory bank identifier", "default": "hermes"},
|
||||
{"key": "api_url", "description": "Hindsight API URL", "default": _DEFAULT_API_URL, "when": {"mode": "cloud"}},
|
||||
{"key": "api_key", "description": "Hindsight Cloud API key", "secret": True, "env_var": "HINDSIGHT_API_KEY", "url": "https://ui.hindsight.vectorize.io", "when": {"mode": "cloud"}},
|
||||
{"key": "llm_provider", "description": "LLM provider for local mode", "default": "openai", "choices": ["openai", "anthropic", "gemini", "groq", "minimax", "ollama"], "when": {"mode": "local"}},
|
||||
{"key": "llm_api_key", "description": "LLM API key for local Hindsight", "secret": True, "env_var": "HINDSIGHT_LLM_API_KEY", "when": {"mode": "local"}},
|
||||
{"key": "llm_model", "description": "LLM model for local mode", "default": "gpt-4o-mini", "default_from": {"field": "llm_provider", "map": _PROVIDER_DEFAULT_MODELS}, "when": {"mode": "local"}},
|
||||
{"key": "bank_id", "description": "Memory bank name", "default": "hermes"},
|
||||
{"key": "budget", "description": "Recall thoroughness", "default": "mid", "choices": ["low", "mid", "high"]},
|
||||
{"key": "llm_provider", "description": "LLM provider for local mode", "default": "anthropic", "choices": ["anthropic", "openai", "groq", "ollama"]},
|
||||
{"key": "llm_api_key", "description": "LLM API key for local mode", "secret": True, "env_var": "HINDSIGHT_LLM_API_KEY"},
|
||||
{"key": "llm_model", "description": "LLM model for local mode", "default": "claude-haiku-4-5-20251001"},
|
||||
{"key": "memory_mode", "description": "Memory integration mode", "default": "hybrid", "choices": ["hybrid", "context", "tools"]},
|
||||
{"key": "prefetch_method", "description": "Auto-recall method", "default": "recall", "choices": ["recall", "reflect"]},
|
||||
]
|
||||
|
||||
def _make_client(self):
|
||||
"""Create a fresh Hindsight client (thread-safe)."""
|
||||
if self._mode == "local":
|
||||
from hindsight import HindsightEmbedded
|
||||
embed = self._config.get("embed", {})
|
||||
return HindsightEmbedded(
|
||||
profile=embed.get("profile", "hermes"),
|
||||
llm_provider=embed.get("llmProvider", ""),
|
||||
llm_api_key=embed.get("llmApiKey", ""),
|
||||
llm_model=embed.get("llmModel", ""),
|
||||
)
|
||||
from hindsight_client import Hindsight
|
||||
return Hindsight(api_key=self._api_key, timeout=30.0)
|
||||
def _get_client(self):
|
||||
"""Return the cached Hindsight client (created once, reused)."""
|
||||
if self._client is None:
|
||||
if self._mode == "local":
|
||||
from hindsight import HindsightEmbedded
|
||||
# Disable __del__ on the class to prevent "attached to a
|
||||
# different loop" errors during GC — we handle cleanup in
|
||||
# shutdown() instead.
|
||||
HindsightEmbedded.__del__ = lambda self: None
|
||||
self._client = HindsightEmbedded(
|
||||
profile=self._config.get("profile", "hermes"),
|
||||
llm_provider=self._config.get("llm_provider", ""),
|
||||
llm_api_key=self._config.get("llmApiKey") or os.environ.get("HINDSIGHT_LLM_API_KEY", ""),
|
||||
llm_model=self._config.get("llm_model", ""),
|
||||
)
|
||||
else:
|
||||
from hindsight_client import Hindsight
|
||||
kwargs = {"base_url": self._api_url, "timeout": 30.0}
|
||||
if self._api_key:
|
||||
kwargs["api_key"] = self._api_key
|
||||
self._client = Hindsight(**kwargs)
|
||||
return self._client
|
||||
|
||||
def initialize(self, session_id: str, **kwargs) -> None:
|
||||
self._config = _load_config()
|
||||
self._mode = self._config.get("mode", "cloud")
|
||||
self._api_key = self._config.get("apiKey") or os.environ.get("HINDSIGHT_API_KEY", "")
|
||||
default_url = _DEFAULT_LOCAL_URL if self._mode == "local" else _DEFAULT_API_URL
|
||||
self._api_url = self._config.get("api_url") or os.environ.get("HINDSIGHT_API_URL", default_url)
|
||||
|
||||
banks = self._config.get("banks", {}).get("hermes", {})
|
||||
self._bank_id = banks.get("bankId", "hermes")
|
||||
budget = banks.get("budget", "mid")
|
||||
self._bank_id = self._config.get("bank_id") or banks.get("bankId", "hermes")
|
||||
budget = self._config.get("budget") or banks.get("budget", "mid")
|
||||
self._budget = budget if budget in _VALID_BUDGETS else "mid"
|
||||
|
||||
# Ensure bank exists
|
||||
try:
|
||||
client = _run_in_thread(self._make_client)
|
||||
_run_in_thread(lambda: client.create_bank(bank_id=self._bank_id, name=self._bank_id))
|
||||
except Exception:
|
||||
pass # Already exists
|
||||
memory_mode = self._config.get("memory_mode", "hybrid")
|
||||
self._memory_mode = memory_mode if memory_mode in ("context", "tools", "hybrid") else "hybrid"
|
||||
|
||||
prefetch_method = self._config.get("prefetch_method", "recall")
|
||||
self._prefetch_method = prefetch_method if prefetch_method in ("recall", "reflect") else "recall"
|
||||
|
||||
logger.info("Hindsight initialized: mode=%s, api_url=%s, bank=%s, budget=%s, memory_mode=%s, prefetch_method=%s",
|
||||
self._mode, self._api_url, self._bank_id, self._budget, self._memory_mode, self._prefetch_method)
|
||||
|
||||
# For local mode, start the embedded daemon in the background so it
|
||||
# doesn't block the chat. Redirect stdout/stderr to a log file to
|
||||
# prevent rich startup output from spamming the terminal.
|
||||
if self._mode == "local":
|
||||
def _start_daemon():
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
log_dir = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))) / "logs"
|
||||
log_dir.mkdir(parents=True, exist_ok=True)
|
||||
log_path = log_dir / "hindsight-embed.log"
|
||||
try:
|
||||
# Redirect the daemon manager's Rich console to our log file
|
||||
# instead of stderr. This avoids global fd redirects that
|
||||
# would capture output from other threads.
|
||||
import hindsight_embed.daemon_embed_manager as dem
|
||||
from rich.console import Console
|
||||
dem.console = Console(file=open(log_path, "a"), force_terminal=False)
|
||||
|
||||
client = self._get_client()
|
||||
profile = self._config.get("profile", "hermes")
|
||||
|
||||
# Update the profile .env to match our current config so
|
||||
# the daemon always starts with the right settings.
|
||||
# If the config changed and the daemon is running, stop it.
|
||||
from pathlib import Path as _Path
|
||||
profile_env = _Path.home() / ".hindsight" / "profiles" / f"{profile}.env"
|
||||
current_key = self._config.get("llmApiKey") or os.environ.get("HINDSIGHT_LLM_API_KEY", "")
|
||||
current_provider = self._config.get("llm_provider", "")
|
||||
current_model = self._config.get("llm_model", "")
|
||||
|
||||
# Read saved profile config
|
||||
saved = {}
|
||||
if profile_env.exists():
|
||||
for line in profile_env.read_text().splitlines():
|
||||
if "=" in line and not line.startswith("#"):
|
||||
k, v = line.split("=", 1)
|
||||
saved[k.strip()] = v.strip()
|
||||
|
||||
config_changed = (
|
||||
saved.get("HINDSIGHT_API_LLM_PROVIDER") != current_provider or
|
||||
saved.get("HINDSIGHT_API_LLM_MODEL") != current_model or
|
||||
saved.get("HINDSIGHT_API_LLM_API_KEY") != current_key
|
||||
)
|
||||
|
||||
if config_changed:
|
||||
# Write updated profile .env
|
||||
profile_env.parent.mkdir(parents=True, exist_ok=True)
|
||||
profile_env.write_text(
|
||||
f"HINDSIGHT_API_LLM_PROVIDER={current_provider}\n"
|
||||
f"HINDSIGHT_API_LLM_API_KEY={current_key}\n"
|
||||
f"HINDSIGHT_API_LLM_MODEL={current_model}\n"
|
||||
f"HINDSIGHT_API_LOG_LEVEL=info\n"
|
||||
)
|
||||
if client._manager.is_running(profile):
|
||||
with open(log_path, "a") as f:
|
||||
f.write("\n=== Config changed, restarting daemon ===\n")
|
||||
client._manager.stop(profile)
|
||||
|
||||
client._ensure_started()
|
||||
with open(log_path, "a") as f:
|
||||
f.write("\n=== Daemon started successfully ===\n")
|
||||
except Exception as e:
|
||||
with open(log_path, "a") as f:
|
||||
f.write(f"\n=== Daemon startup failed: {e} ===\n")
|
||||
traceback.print_exc(file=f)
|
||||
|
||||
t = threading.Thread(target=_start_daemon, daemon=True, name="hindsight-daemon-start")
|
||||
t.start()
|
||||
|
||||
def system_prompt_block(self) -> str:
|
||||
if self._memory_mode == "context":
|
||||
return (
|
||||
f"# Hindsight Memory\n"
|
||||
f"Active (context mode). Bank: {self._bank_id}, budget: {self._budget}.\n"
|
||||
f"Relevant memories are automatically injected into context."
|
||||
)
|
||||
if self._memory_mode == "tools":
|
||||
return (
|
||||
f"# Hindsight Memory\n"
|
||||
f"Active (tools mode). Bank: {self._bank_id}, budget: {self._budget}.\n"
|
||||
f"Use hindsight_recall to search, hindsight_reflect for synthesis, "
|
||||
f"hindsight_retain to store facts."
|
||||
)
|
||||
return (
|
||||
f"# Hindsight Memory\n"
|
||||
f"Active. Bank: {self._bank_id}, budget: {self._budget}.\n"
|
||||
f"Relevant memories are automatically injected into context. "
|
||||
f"Use hindsight_recall to search, hindsight_reflect for synthesis, "
|
||||
f"hindsight_retain to store facts."
|
||||
)
|
||||
@@ -262,12 +386,18 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||
return f"## Hindsight Memory\n{result}"
|
||||
|
||||
def queue_prefetch(self, query: str, *, session_id: str = "") -> None:
|
||||
if self._memory_mode == "tools":
|
||||
return
|
||||
def _run():
|
||||
try:
|
||||
client = self._make_client()
|
||||
resp = client.recall(bank_id=self._bank_id, query=query, budget=self._budget)
|
||||
if resp.results:
|
||||
text = "\n".join(r.text for r in resp.results if r.text)
|
||||
client = self._get_client()
|
||||
if self._prefetch_method == "reflect":
|
||||
resp = _run_sync(client.areflect(bank_id=self._bank_id, query=query, budget=self._budget))
|
||||
text = resp.text or ""
|
||||
else:
|
||||
resp = _run_sync(client.arecall(bank_id=self._bank_id, query=query, budget=self._budget))
|
||||
text = "\n".join(r.text for r in resp.results if r.text) if resp.results else ""
|
||||
if text:
|
||||
with self._prefetch_lock:
|
||||
self._prefetch_result = text
|
||||
except Exception as e:
|
||||
@@ -282,11 +412,10 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||
|
||||
def _sync():
|
||||
try:
|
||||
_run_in_thread(
|
||||
lambda: self._make_client().retain(
|
||||
bank_id=self._bank_id, content=combined, context="conversation"
|
||||
)
|
||||
)
|
||||
client = self._get_client()
|
||||
_run_sync(client.aretain(
|
||||
bank_id=self._bank_id, content=combined, context="conversation"
|
||||
))
|
||||
except Exception as e:
|
||||
logger.warning("Hindsight sync failed: %s", e)
|
||||
|
||||
@@ -296,22 +425,29 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||
self._sync_thread.start()
|
||||
|
||||
def get_tool_schemas(self) -> List[Dict[str, Any]]:
|
||||
if self._memory_mode == "context":
|
||||
return []
|
||||
return [RETAIN_SCHEMA, RECALL_SCHEMA, REFLECT_SCHEMA]
|
||||
|
||||
def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
|
||||
try:
|
||||
client = self._get_client()
|
||||
except Exception as e:
|
||||
logger.warning("Hindsight client init failed: %s", e)
|
||||
return json.dumps({"error": f"Hindsight client unavailable: {e}"})
|
||||
|
||||
if tool_name == "hindsight_retain":
|
||||
content = args.get("content", "")
|
||||
if not content:
|
||||
return json.dumps({"error": "Missing required parameter: content"})
|
||||
context = args.get("context")
|
||||
try:
|
||||
_run_in_thread(
|
||||
lambda: self._make_client().retain(
|
||||
bank_id=self._bank_id, content=content, context=context
|
||||
)
|
||||
)
|
||||
_run_sync(client.aretain(
|
||||
bank_id=self._bank_id, content=content, context=context
|
||||
))
|
||||
return json.dumps({"result": "Memory stored successfully."})
|
||||
except Exception as e:
|
||||
logger.warning("hindsight_retain failed: %s", e)
|
||||
return json.dumps({"error": f"Failed to store memory: {e}"})
|
||||
|
||||
elif tool_name == "hindsight_recall":
|
||||
@@ -319,16 +455,15 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||
if not query:
|
||||
return json.dumps({"error": "Missing required parameter: query"})
|
||||
try:
|
||||
resp = _run_in_thread(
|
||||
lambda: self._make_client().recall(
|
||||
bank_id=self._bank_id, query=query, budget=self._budget
|
||||
)
|
||||
)
|
||||
resp = _run_sync(client.arecall(
|
||||
bank_id=self._bank_id, query=query, budget=self._budget
|
||||
))
|
||||
if not resp.results:
|
||||
return json.dumps({"result": "No relevant memories found."})
|
||||
lines = [f"{i}. {r.text}" for i, r in enumerate(resp.results, 1)]
|
||||
return json.dumps({"result": "\n".join(lines)})
|
||||
except Exception as e:
|
||||
logger.warning("hindsight_recall failed: %s", e)
|
||||
return json.dumps({"error": f"Failed to search memory: {e}"})
|
||||
|
||||
elif tool_name == "hindsight_reflect":
|
||||
@@ -336,21 +471,43 @@ class HindsightMemoryProvider(MemoryProvider):
|
||||
if not query:
|
||||
return json.dumps({"error": "Missing required parameter: query"})
|
||||
try:
|
||||
resp = _run_in_thread(
|
||||
lambda: self._make_client().reflect(
|
||||
bank_id=self._bank_id, query=query, budget=self._budget
|
||||
)
|
||||
)
|
||||
resp = _run_sync(client.areflect(
|
||||
bank_id=self._bank_id, query=query, budget=self._budget
|
||||
))
|
||||
return json.dumps({"result": resp.text or "No relevant memories found."})
|
||||
except Exception as e:
|
||||
logger.warning("hindsight_reflect failed: %s", e)
|
||||
return json.dumps({"error": f"Failed to reflect: {e}"})
|
||||
|
||||
return json.dumps({"error": f"Unknown tool: {tool_name}"})
|
||||
|
||||
def shutdown(self) -> None:
|
||||
global _loop, _loop_thread
|
||||
for t in (self._prefetch_thread, self._sync_thread):
|
||||
if t and t.is_alive():
|
||||
t.join(timeout=5.0)
|
||||
if self._client is not None:
|
||||
try:
|
||||
if self._mode == "local":
|
||||
# Use the public close() API. The RuntimeError from
|
||||
# aiohttp's "attached to a different loop" is expected
|
||||
# and harmless — the daemon keeps running independently.
|
||||
try:
|
||||
self._client.close()
|
||||
except RuntimeError:
|
||||
pass
|
||||
else:
|
||||
_run_sync(self._client.aclose())
|
||||
except Exception:
|
||||
pass
|
||||
self._client = None
|
||||
# Stop the background event loop so no tasks are pending at exit
|
||||
if _loop is not None and _loop.is_running():
|
||||
_loop.call_soon_threadsafe(_loop.stop)
|
||||
if _loop_thread is not None:
|
||||
_loop_thread.join(timeout=5.0)
|
||||
_loop = None
|
||||
_loop_thread = None
|
||||
|
||||
|
||||
def register(ctx) -> None:
|
||||
|
||||
@@ -3,6 +3,7 @@ version: 1.0.0
|
||||
description: "Hindsight — long-term memory with knowledge graph, entity resolution, and multi-strategy retrieval."
|
||||
pip_dependencies:
|
||||
- hindsight-client
|
||||
- hindsight-all
|
||||
requires_env:
|
||||
- HINDSIGHT_API_KEY
|
||||
hooks:
|
||||
|
||||
@@ -283,9 +283,9 @@ class OpenVikingMemoryProvider(MemoryProvider):
|
||||
# Provide brief info about the knowledge base
|
||||
try:
|
||||
# Check what's in the knowledge base via a root listing
|
||||
resp = self._client.post("/api/v1/browse", {"action": "stat", "path": "viking://"})
|
||||
result = resp.get("result", {})
|
||||
children = result.get("children", 0)
|
||||
resp = self._client.get("/api/v1/fs/ls", params={"uri": "viking://"})
|
||||
result = resp.get("result", [])
|
||||
children = len(result) if isinstance(result, list) else 0
|
||||
if children == 0:
|
||||
return ""
|
||||
return (
|
||||
@@ -495,16 +495,17 @@ class OpenVikingMemoryProvider(MemoryProvider):
|
||||
return json.dumps({"error": "uri is required"})
|
||||
|
||||
level = args.get("level", "overview")
|
||||
# Map our level names to OpenViking endpoints
|
||||
# Map our level names to OpenViking GET endpoints
|
||||
if level == "abstract":
|
||||
resp = self._client.post("/api/v1/read/abstract", {"uri": uri})
|
||||
resp = self._client.get("/api/v1/content/abstract", params={"uri": uri})
|
||||
elif level == "full":
|
||||
resp = self._client.post("/api/v1/read", {"uri": uri, "level": "read"})
|
||||
resp = self._client.get("/api/v1/content/read", params={"uri": uri})
|
||||
else: # overview
|
||||
resp = self._client.post("/api/v1/read", {"uri": uri, "level": "overview"})
|
||||
resp = self._client.get("/api/v1/content/overview", params={"uri": uri})
|
||||
|
||||
result = resp.get("result", {})
|
||||
content = result.get("content", "")
|
||||
result = resp.get("result", "")
|
||||
# result is a plain string from the content endpoints
|
||||
content = result if isinstance(result, str) else result.get("content", "")
|
||||
|
||||
# Truncate very long content to avoid flooding the context
|
||||
if len(content) > 8000:
|
||||
@@ -520,20 +521,21 @@ class OpenVikingMemoryProvider(MemoryProvider):
|
||||
action = args.get("action", "list")
|
||||
path = args.get("path", "viking://")
|
||||
|
||||
resp = self._client.post("/api/v1/browse", {
|
||||
"action": action,
|
||||
"path": path,
|
||||
})
|
||||
# Map action to the correct fs endpoint (all GET with uri= param)
|
||||
endpoint_map = {"tree": "/api/v1/fs/tree", "list": "/api/v1/fs/ls", "stat": "/api/v1/fs/stat"}
|
||||
endpoint = endpoint_map.get(action, "/api/v1/fs/ls")
|
||||
resp = self._client.get(endpoint, params={"uri": path})
|
||||
result = resp.get("result", {})
|
||||
|
||||
# Format for readability
|
||||
if action == "list" and "entries" in result:
|
||||
# Format list/tree results for readability
|
||||
if action in ("list", "tree") and isinstance(result, list):
|
||||
entries = []
|
||||
for e in result["entries"][:50]: # cap at 50 entries
|
||||
for e in result[:50]: # cap at 50 entries
|
||||
entries.append({
|
||||
"name": e.get("name", ""),
|
||||
"name": e.get("rel_path", e.get("name", "")),
|
||||
"uri": e.get("uri", ""),
|
||||
"type": "dir" if e.get("is_dir") else "file",
|
||||
"type": "dir" if e.get("isDir") else "file",
|
||||
"abstract": e.get("abstract", ""),
|
||||
})
|
||||
return json.dumps({"path": path, "entries": entries}, ensure_ascii=False)
|
||||
|
||||
|
||||
+17
-4
@@ -6649,8 +6649,8 @@ class AIAgent:
|
||||
# Plugin hook: pre_llm_call
|
||||
# Fired once per turn before the tool-calling loop. Plugins can
|
||||
# return a dict with a ``context`` key whose value is a string
|
||||
# that will be appended to the ephemeral system prompt for every
|
||||
# API call in this turn (not persisted to session DB or cache).
|
||||
# that will be injected at request time for every API call in
|
||||
# this turn (not persisted to session DB or cached prefix).
|
||||
_plugin_turn_context = ""
|
||||
try:
|
||||
from hermes_cli.plugins import invoke_hook as _invoke_hook
|
||||
@@ -6796,8 +6796,11 @@ class AIAgent:
|
||||
effective_system = active_system_prompt or ""
|
||||
if self.ephemeral_system_prompt:
|
||||
effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip()
|
||||
# Plugin context from pre_llm_call hooks — ephemeral, not cached.
|
||||
if _plugin_turn_context:
|
||||
# Plugin context from pre_llm_call hooks.
|
||||
# For non-cached providers/requests we can append directly.
|
||||
# For Anthropic prompt-cached requests we inject it later as an
|
||||
# uncached system suffix block so the cache key stays stable.
|
||||
if _plugin_turn_context and not self._use_prompt_caching:
|
||||
effective_system = (effective_system + "\n\n" + _plugin_turn_context).strip()
|
||||
if effective_system:
|
||||
api_messages = [{"role": "system", "content": effective_system}] + api_messages
|
||||
@@ -6816,6 +6819,16 @@ class AIAgent:
|
||||
if self._use_prompt_caching:
|
||||
api_messages = apply_anthropic_cache_control(api_messages, cache_ttl=self._cache_ttl, native_anthropic=(self.api_mode == 'anthropic_messages'))
|
||||
|
||||
# Append plugin context AFTER cache markers so the system-level
|
||||
# cache key stays stable even when plugin output varies per turn.
|
||||
if _plugin_turn_context and api_messages and api_messages[0].get("role") == "system":
|
||||
_sys = api_messages[0].get("content", "")
|
||||
_blocks = list(_sys) if isinstance(_sys, list) else [{"type": "text", "text": _sys}] if isinstance(_sys, str) else []
|
||||
_blocks.append({"type": "text", "text": _plugin_turn_context})
|
||||
api_messages[0]["content"] = _blocks
|
||||
elif _plugin_turn_context:
|
||||
api_messages.insert(0, {"role": "system", "content": _plugin_turn_context})
|
||||
|
||||
# Safety net: strip orphaned tool results / add stubs for missing
|
||||
# results before sending to the API. Runs unconditionally — not
|
||||
# gated on context_compressor — so orphans from session loading or
|
||||
|
||||
@@ -547,3 +547,253 @@ class TestPluginMemoryDiscovery:
|
||||
"""load_memory_provider returns None for unknown names."""
|
||||
from plugins.memory import load_memory_provider
|
||||
assert load_memory_provider("nonexistent_provider") is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Sequential dispatch routing tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSequentialDispatchRouting:
|
||||
"""Verify that memory provider tools are correctly routed through
|
||||
memory_manager.has_tool() and handle_tool_call().
|
||||
|
||||
This is a regression test for a bug where _execute_tool_calls_sequential
|
||||
in run_agent.py had its own inline dispatch chain that skipped
|
||||
memory_manager.has_tool(), causing all memory provider tools to fall
|
||||
through to the registry and return "Unknown tool". The fix added
|
||||
has_tool() + handle_tool_call() to the sequential path.
|
||||
|
||||
These tests verify the memory_manager contract that both dispatch
|
||||
paths rely on: has_tool() returns True for registered provider tools,
|
||||
and handle_tool_call() routes to the correct provider.
|
||||
"""
|
||||
|
||||
def test_has_tool_returns_true_for_provider_tools(self):
|
||||
"""has_tool returns True for tools registered by memory providers."""
|
||||
mgr = MemoryManager()
|
||||
provider = FakeMemoryProvider("ext", tools=[
|
||||
{"name": "ext_recall", "description": "Ext recall", "parameters": {}},
|
||||
{"name": "ext_retain", "description": "Ext retain", "parameters": {}},
|
||||
])
|
||||
mgr.add_provider(provider)
|
||||
|
||||
assert mgr.has_tool("ext_recall")
|
||||
assert mgr.has_tool("ext_retain")
|
||||
|
||||
def test_has_tool_returns_false_for_builtin_tools(self):
|
||||
"""has_tool returns False for agent-level tools (terminal, memory, etc.)."""
|
||||
mgr = MemoryManager()
|
||||
provider = FakeMemoryProvider("ext", tools=[
|
||||
{"name": "ext_recall", "description": "Ext", "parameters": {}},
|
||||
])
|
||||
mgr.add_provider(provider)
|
||||
|
||||
assert not mgr.has_tool("terminal")
|
||||
assert not mgr.has_tool("memory")
|
||||
assert not mgr.has_tool("todo")
|
||||
assert not mgr.has_tool("session_search")
|
||||
assert not mgr.has_tool("nonexistent")
|
||||
|
||||
def test_handle_tool_call_routes_to_provider(self):
|
||||
"""handle_tool_call dispatches to the correct provider's handler."""
|
||||
mgr = MemoryManager()
|
||||
provider = FakeMemoryProvider("hindsight", tools=[
|
||||
{"name": "hindsight_recall", "description": "Recall", "parameters": {}},
|
||||
{"name": "hindsight_retain", "description": "Retain", "parameters": {}},
|
||||
])
|
||||
mgr.add_provider(provider)
|
||||
|
||||
result = json.loads(mgr.handle_tool_call("hindsight_recall", {"query": "alice"}))
|
||||
assert result["handled"] == "hindsight_recall"
|
||||
assert result["args"] == {"query": "alice"}
|
||||
|
||||
def test_handle_tool_call_unknown_returns_error(self):
|
||||
"""handle_tool_call returns error for tools not in any provider."""
|
||||
mgr = MemoryManager()
|
||||
provider = FakeMemoryProvider("ext", tools=[
|
||||
{"name": "ext_recall", "description": "Ext", "parameters": {}},
|
||||
])
|
||||
mgr.add_provider(provider)
|
||||
|
||||
result = json.loads(mgr.handle_tool_call("terminal", {"command": "ls"}))
|
||||
assert "error" in result
|
||||
|
||||
def test_multiple_providers_route_to_correct_one(self):
|
||||
"""Tools from different providers route to the right handler."""
|
||||
mgr = MemoryManager()
|
||||
builtin = FakeMemoryProvider("builtin", tools=[
|
||||
{"name": "builtin_tool", "description": "Builtin", "parameters": {}},
|
||||
])
|
||||
external = FakeMemoryProvider("hindsight", tools=[
|
||||
{"name": "hindsight_recall", "description": "Recall", "parameters": {}},
|
||||
])
|
||||
mgr.add_provider(builtin)
|
||||
mgr.add_provider(external)
|
||||
|
||||
r1 = json.loads(mgr.handle_tool_call("builtin_tool", {}))
|
||||
assert r1["handled"] == "builtin_tool"
|
||||
|
||||
r2 = json.loads(mgr.handle_tool_call("hindsight_recall", {"query": "test"}))
|
||||
assert r2["handled"] == "hindsight_recall"
|
||||
|
||||
def test_tool_names_include_all_providers(self):
|
||||
"""get_all_tool_names returns tools from all registered providers."""
|
||||
mgr = MemoryManager()
|
||||
builtin = FakeMemoryProvider("builtin", tools=[
|
||||
{"name": "builtin_tool", "description": "B", "parameters": {}},
|
||||
])
|
||||
external = FakeMemoryProvider("ext", tools=[
|
||||
{"name": "ext_recall", "description": "E1", "parameters": {}},
|
||||
{"name": "ext_retain", "description": "E2", "parameters": {}},
|
||||
])
|
||||
mgr.add_provider(builtin)
|
||||
mgr.add_provider(external)
|
||||
|
||||
names = mgr.get_all_tool_names()
|
||||
assert names == {"builtin_tool", "ext_recall", "ext_retain"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Setup wizard field filtering tests (when clause and default_from)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestSetupFieldFiltering:
|
||||
"""Test the 'when' clause and 'default_from' logic used by the
|
||||
memory setup wizard in hermes_cli/memory_setup.py.
|
||||
|
||||
These features are generic — any memory plugin can use them in
|
||||
get_config_schema(). Currently used by the hindsight plugin.
|
||||
"""
|
||||
|
||||
def _filter_fields(self, schema, provider_config):
|
||||
"""Simulate the setup wizard's field filtering logic.
|
||||
|
||||
Returns list of (key, effective_default) for fields that pass
|
||||
the 'when' filter.
|
||||
"""
|
||||
results = []
|
||||
for field in schema:
|
||||
key = field["key"]
|
||||
default = field.get("default")
|
||||
|
||||
# Dynamic default
|
||||
default_from = field.get("default_from")
|
||||
if default_from and isinstance(default_from, dict):
|
||||
ref_field = default_from.get("field", "")
|
||||
ref_map = default_from.get("map", {})
|
||||
ref_value = provider_config.get(ref_field, "")
|
||||
if ref_value and ref_value in ref_map:
|
||||
default = ref_map[ref_value]
|
||||
|
||||
# When clause
|
||||
when = field.get("when")
|
||||
if when and isinstance(when, dict):
|
||||
if not all(provider_config.get(k) == v for k, v in when.items()):
|
||||
continue
|
||||
|
||||
results.append((key, default))
|
||||
return results
|
||||
|
||||
def test_when_clause_filters_fields(self):
|
||||
"""Fields with 'when' are skipped if the condition doesn't match."""
|
||||
schema = [
|
||||
{"key": "mode", "default": "cloud"},
|
||||
{"key": "api_url", "default": "https://api.example.com", "when": {"mode": "cloud"}},
|
||||
{"key": "api_key", "default": None, "when": {"mode": "cloud"}},
|
||||
{"key": "llm_provider", "default": "openai", "when": {"mode": "local"}},
|
||||
{"key": "llm_model", "default": "gpt-4o-mini", "when": {"mode": "local"}},
|
||||
{"key": "budget", "default": "mid"},
|
||||
]
|
||||
|
||||
# Cloud mode: should see mode, api_url, api_key, budget
|
||||
cloud_fields = self._filter_fields(schema, {"mode": "cloud"})
|
||||
cloud_keys = [k for k, _ in cloud_fields]
|
||||
assert cloud_keys == ["mode", "api_url", "api_key", "budget"]
|
||||
|
||||
# Local mode: should see mode, llm_provider, llm_model, budget
|
||||
local_fields = self._filter_fields(schema, {"mode": "local"})
|
||||
local_keys = [k for k, _ in local_fields]
|
||||
assert local_keys == ["mode", "llm_provider", "llm_model", "budget"]
|
||||
|
||||
def test_when_clause_no_condition_always_shown(self):
|
||||
"""Fields without 'when' are always included."""
|
||||
schema = [
|
||||
{"key": "bank_id", "default": "hermes"},
|
||||
{"key": "budget", "default": "mid"},
|
||||
]
|
||||
fields = self._filter_fields(schema, {"mode": "cloud"})
|
||||
assert [k for k, _ in fields] == ["bank_id", "budget"]
|
||||
|
||||
def test_default_from_resolves_dynamic_default(self):
|
||||
"""default_from looks up the default from another field's value."""
|
||||
provider_models = {
|
||||
"openai": "gpt-4o-mini",
|
||||
"groq": "openai/gpt-oss-120b",
|
||||
"anthropic": "claude-haiku-4-5",
|
||||
}
|
||||
schema = [
|
||||
{"key": "llm_provider", "default": "openai"},
|
||||
{"key": "llm_model", "default": "gpt-4o-mini",
|
||||
"default_from": {"field": "llm_provider", "map": provider_models}},
|
||||
]
|
||||
|
||||
# Groq selected: model should default to groq's default
|
||||
fields = self._filter_fields(schema, {"llm_provider": "groq"})
|
||||
model_default = dict(fields)["llm_model"]
|
||||
assert model_default == "openai/gpt-oss-120b"
|
||||
|
||||
# Anthropic selected
|
||||
fields = self._filter_fields(schema, {"llm_provider": "anthropic"})
|
||||
model_default = dict(fields)["llm_model"]
|
||||
assert model_default == "claude-haiku-4-5"
|
||||
|
||||
def test_default_from_falls_back_to_static_default(self):
|
||||
"""default_from falls back to static default if provider not in map."""
|
||||
schema = [
|
||||
{"key": "llm_model", "default": "gpt-4o-mini",
|
||||
"default_from": {"field": "llm_provider", "map": {"groq": "openai/gpt-oss-120b"}}},
|
||||
]
|
||||
|
||||
# Unknown provider: should fall back to static default
|
||||
fields = self._filter_fields(schema, {"llm_provider": "unknown_provider"})
|
||||
model_default = dict(fields)["llm_model"]
|
||||
assert model_default == "gpt-4o-mini"
|
||||
|
||||
def test_default_from_with_no_ref_value(self):
|
||||
"""default_from keeps static default if referenced field is not set."""
|
||||
schema = [
|
||||
{"key": "llm_model", "default": "gpt-4o-mini",
|
||||
"default_from": {"field": "llm_provider", "map": {"groq": "openai/gpt-oss-120b"}}},
|
||||
]
|
||||
|
||||
# No provider set at all
|
||||
fields = self._filter_fields(schema, {})
|
||||
model_default = dict(fields)["llm_model"]
|
||||
assert model_default == "gpt-4o-mini"
|
||||
|
||||
def test_when_and_default_from_combined(self):
|
||||
"""when clause and default_from work together correctly."""
|
||||
provider_models = {"groq": "openai/gpt-oss-120b", "openai": "gpt-4o-mini"}
|
||||
schema = [
|
||||
{"key": "mode", "default": "local"},
|
||||
{"key": "llm_provider", "default": "openai", "when": {"mode": "local"}},
|
||||
{"key": "llm_model", "default": "gpt-4o-mini",
|
||||
"default_from": {"field": "llm_provider", "map": provider_models},
|
||||
"when": {"mode": "local"}},
|
||||
{"key": "api_url", "default": "https://api.example.com", "when": {"mode": "cloud"}},
|
||||
]
|
||||
|
||||
# Local + groq: should see llm_model with groq default, no api_url
|
||||
fields = self._filter_fields(schema, {"mode": "local", "llm_provider": "groq"})
|
||||
keys = [k for k, _ in fields]
|
||||
assert "llm_model" in keys
|
||||
assert "api_url" not in keys
|
||||
assert dict(fields)["llm_model"] == "openai/gpt-oss-120b"
|
||||
|
||||
# Cloud: should see api_url, no llm_model
|
||||
fields = self._filter_fields(schema, {"mode": "cloud"})
|
||||
keys = [k for k, _ in fields]
|
||||
assert "api_url" in keys
|
||||
assert "llm_model" not in keys
|
||||
|
||||
@@ -0,0 +1,300 @@
|
||||
"""Tests for cron job script injection feature.
|
||||
|
||||
Tests cover:
|
||||
- Script field in job creation / storage / update
|
||||
- Script execution and output injection into prompts
|
||||
- Error handling (missing script, timeout, non-zero exit)
|
||||
- Path resolution (absolute, relative to HERMES_HOME/scripts/)
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import stat
|
||||
import sys
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure project root is importable
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def cron_env(tmp_path, monkeypatch):
|
||||
"""Isolated cron environment with temp HERMES_HOME."""
|
||||
hermes_home = tmp_path / ".hermes"
|
||||
hermes_home.mkdir()
|
||||
(hermes_home / "cron").mkdir()
|
||||
(hermes_home / "cron" / "output").mkdir()
|
||||
(hermes_home / "scripts").mkdir()
|
||||
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
||||
|
||||
# Clear cached module-level paths
|
||||
import cron.jobs as jobs_mod
|
||||
monkeypatch.setattr(jobs_mod, "HERMES_DIR", hermes_home)
|
||||
monkeypatch.setattr(jobs_mod, "CRON_DIR", hermes_home / "cron")
|
||||
monkeypatch.setattr(jobs_mod, "JOBS_FILE", hermes_home / "cron" / "jobs.json")
|
||||
monkeypatch.setattr(jobs_mod, "OUTPUT_DIR", hermes_home / "cron" / "output")
|
||||
|
||||
return hermes_home
|
||||
|
||||
|
||||
class TestJobScriptField:
|
||||
"""Test that the script field is stored and retrieved correctly."""
|
||||
|
||||
def test_create_job_with_script(self, cron_env):
|
||||
from cron.jobs import create_job, get_job
|
||||
|
||||
job = create_job(
|
||||
prompt="Analyze the data",
|
||||
schedule="every 30m",
|
||||
script="/path/to/monitor.py",
|
||||
)
|
||||
assert job["script"] == "/path/to/monitor.py"
|
||||
|
||||
loaded = get_job(job["id"])
|
||||
assert loaded["script"] == "/path/to/monitor.py"
|
||||
|
||||
def test_create_job_without_script(self, cron_env):
|
||||
from cron.jobs import create_job
|
||||
|
||||
job = create_job(prompt="Hello", schedule="every 1h")
|
||||
assert job.get("script") is None
|
||||
|
||||
def test_create_job_empty_script_normalized_to_none(self, cron_env):
|
||||
from cron.jobs import create_job
|
||||
|
||||
job = create_job(prompt="Hello", schedule="every 1h", script=" ")
|
||||
assert job.get("script") is None
|
||||
|
||||
def test_update_job_add_script(self, cron_env):
|
||||
from cron.jobs import create_job, update_job
|
||||
|
||||
job = create_job(prompt="Hello", schedule="every 1h")
|
||||
assert job.get("script") is None
|
||||
|
||||
updated = update_job(job["id"], {"script": "/new/script.py"})
|
||||
assert updated["script"] == "/new/script.py"
|
||||
|
||||
def test_update_job_clear_script(self, cron_env):
|
||||
from cron.jobs import create_job, update_job
|
||||
|
||||
job = create_job(prompt="Hello", schedule="every 1h", script="/some/script.py")
|
||||
assert job["script"] == "/some/script.py"
|
||||
|
||||
updated = update_job(job["id"], {"script": None})
|
||||
assert updated.get("script") is None
|
||||
|
||||
|
||||
class TestRunJobScript:
|
||||
"""Test the _run_job_script() function."""
|
||||
|
||||
def test_successful_script(self, cron_env):
|
||||
from cron.scheduler import _run_job_script
|
||||
|
||||
script = cron_env / "scripts" / "test.py"
|
||||
script.write_text('print("hello from script")\n')
|
||||
|
||||
success, output = _run_job_script(str(script))
|
||||
assert success is True
|
||||
assert output == "hello from script"
|
||||
|
||||
def test_script_relative_path(self, cron_env):
|
||||
from cron.scheduler import _run_job_script
|
||||
|
||||
script = cron_env / "scripts" / "relative.py"
|
||||
script.write_text('print("relative works")\n')
|
||||
|
||||
success, output = _run_job_script("relative.py")
|
||||
assert success is True
|
||||
assert output == "relative works"
|
||||
|
||||
def test_script_not_found(self, cron_env):
|
||||
from cron.scheduler import _run_job_script
|
||||
|
||||
success, output = _run_job_script("/nonexistent/script.py")
|
||||
assert success is False
|
||||
assert "not found" in output.lower()
|
||||
|
||||
def test_script_nonzero_exit(self, cron_env):
|
||||
from cron.scheduler import _run_job_script
|
||||
|
||||
script = cron_env / "scripts" / "fail.py"
|
||||
script.write_text(textwrap.dedent("""\
|
||||
import sys
|
||||
print("partial output")
|
||||
print("error info", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
"""))
|
||||
|
||||
success, output = _run_job_script(str(script))
|
||||
assert success is False
|
||||
assert "exited with code 1" in output
|
||||
assert "error info" in output
|
||||
|
||||
def test_script_empty_output(self, cron_env):
|
||||
from cron.scheduler import _run_job_script
|
||||
|
||||
script = cron_env / "scripts" / "empty.py"
|
||||
script.write_text("# no output\n")
|
||||
|
||||
success, output = _run_job_script(str(script))
|
||||
assert success is True
|
||||
assert output == ""
|
||||
|
||||
def test_script_timeout(self, cron_env, monkeypatch):
|
||||
from cron import scheduler as sched_mod
|
||||
from cron.scheduler import _run_job_script
|
||||
|
||||
# Use a very short timeout
|
||||
monkeypatch.setattr(sched_mod, "_SCRIPT_TIMEOUT", 1)
|
||||
|
||||
script = cron_env / "scripts" / "slow.py"
|
||||
script.write_text("import time; time.sleep(30)\n")
|
||||
|
||||
success, output = _run_job_script(str(script))
|
||||
assert success is False
|
||||
assert "timed out" in output.lower()
|
||||
|
||||
def test_script_json_output(self, cron_env):
|
||||
"""Scripts can output structured JSON for the LLM to parse."""
|
||||
from cron.scheduler import _run_job_script
|
||||
|
||||
script = cron_env / "scripts" / "json_out.py"
|
||||
script.write_text(textwrap.dedent("""\
|
||||
import json
|
||||
data = {"new_prs": [{"number": 42, "title": "Fix bug"}]}
|
||||
print(json.dumps(data, indent=2))
|
||||
"""))
|
||||
|
||||
success, output = _run_job_script(str(script))
|
||||
assert success is True
|
||||
parsed = json.loads(output)
|
||||
assert parsed["new_prs"][0]["number"] == 42
|
||||
|
||||
|
||||
class TestBuildJobPromptWithScript:
|
||||
"""Test that script output is injected into the prompt."""
|
||||
|
||||
def test_script_output_injected(self, cron_env):
|
||||
from cron.scheduler import _build_job_prompt
|
||||
|
||||
script = cron_env / "scripts" / "data.py"
|
||||
script.write_text('print("new PR: #123 fix typo")\n')
|
||||
|
||||
job = {
|
||||
"prompt": "Report any notable changes.",
|
||||
"script": str(script),
|
||||
}
|
||||
prompt = _build_job_prompt(job)
|
||||
assert "## Script Output" in prompt
|
||||
assert "new PR: #123 fix typo" in prompt
|
||||
assert "Report any notable changes." in prompt
|
||||
|
||||
def test_script_error_injected(self, cron_env):
|
||||
from cron.scheduler import _build_job_prompt
|
||||
|
||||
job = {
|
||||
"prompt": "Report status.",
|
||||
"script": "/nonexistent/script.py",
|
||||
}
|
||||
prompt = _build_job_prompt(job)
|
||||
assert "## Script Error" in prompt
|
||||
assert "not found" in prompt.lower()
|
||||
assert "Report status." in prompt
|
||||
|
||||
def test_no_script_unchanged(self, cron_env):
|
||||
from cron.scheduler import _build_job_prompt
|
||||
|
||||
job = {"prompt": "Simple job."}
|
||||
prompt = _build_job_prompt(job)
|
||||
assert "## Script Output" not in prompt
|
||||
assert "Simple job." in prompt
|
||||
|
||||
def test_script_empty_output_noted(self, cron_env):
|
||||
from cron.scheduler import _build_job_prompt
|
||||
|
||||
script = cron_env / "scripts" / "noop.py"
|
||||
script.write_text("# nothing\n")
|
||||
|
||||
job = {
|
||||
"prompt": "Check status.",
|
||||
"script": str(script),
|
||||
}
|
||||
prompt = _build_job_prompt(job)
|
||||
assert "no output" in prompt.lower()
|
||||
assert "Check status." in prompt
|
||||
|
||||
|
||||
class TestCronjobToolScript:
|
||||
"""Test the cronjob tool's script parameter."""
|
||||
|
||||
def test_create_with_script(self, cron_env, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
||||
from tools.cronjob_tools import cronjob
|
||||
|
||||
result = json.loads(cronjob(
|
||||
action="create",
|
||||
schedule="every 1h",
|
||||
prompt="Monitor things",
|
||||
script="/home/user/monitor.py",
|
||||
))
|
||||
assert result["success"] is True
|
||||
assert result["job"]["script"] == "/home/user/monitor.py"
|
||||
|
||||
def test_update_script(self, cron_env, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
||||
from tools.cronjob_tools import cronjob
|
||||
|
||||
create_result = json.loads(cronjob(
|
||||
action="create",
|
||||
schedule="every 1h",
|
||||
prompt="Monitor things",
|
||||
))
|
||||
job_id = create_result["job_id"]
|
||||
|
||||
update_result = json.loads(cronjob(
|
||||
action="update",
|
||||
job_id=job_id,
|
||||
script="/new/script.py",
|
||||
))
|
||||
assert update_result["success"] is True
|
||||
assert update_result["job"]["script"] == "/new/script.py"
|
||||
|
||||
def test_clear_script(self, cron_env, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
||||
from tools.cronjob_tools import cronjob
|
||||
|
||||
create_result = json.loads(cronjob(
|
||||
action="create",
|
||||
schedule="every 1h",
|
||||
prompt="Monitor things",
|
||||
script="/some/script.py",
|
||||
))
|
||||
job_id = create_result["job_id"]
|
||||
|
||||
update_result = json.loads(cronjob(
|
||||
action="update",
|
||||
job_id=job_id,
|
||||
script="",
|
||||
))
|
||||
assert update_result["success"] is True
|
||||
assert "script" not in update_result["job"]
|
||||
|
||||
def test_list_shows_script(self, cron_env, monkeypatch):
|
||||
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
|
||||
from tools.cronjob_tools import cronjob
|
||||
|
||||
cronjob(
|
||||
action="create",
|
||||
schedule="every 1h",
|
||||
prompt="Monitor things",
|
||||
script="/path/to/script.py",
|
||||
)
|
||||
|
||||
list_result = json.loads(cronjob(action="list"))
|
||||
assert list_result["success"] is True
|
||||
assert len(list_result["jobs"]) == 1
|
||||
assert list_result["jobs"][0]["script"] == "/path/to/script.py"
|
||||
@@ -591,7 +591,16 @@ class TestBlockingApprovalE2E:
|
||||
]
|
||||
for t in threads:
|
||||
t.start()
|
||||
time.sleep(0.3)
|
||||
|
||||
# Wait for both threads to register pending approvals instead of
|
||||
# relying on a fixed sleep. The approval module stores entries in
|
||||
# _gateway_queues[session_key] — poll until we see 2 entries.
|
||||
from tools.approval import _gateway_queues
|
||||
deadline = time.monotonic() + 5
|
||||
while time.monotonic() < deadline:
|
||||
if len(_gateway_queues.get(session_key, [])) >= 2:
|
||||
break
|
||||
time.sleep(0.05)
|
||||
|
||||
# Approve first, deny second
|
||||
resolve_gateway_approval(session_key, "once") # oldest
|
||||
|
||||
@@ -0,0 +1,492 @@
|
||||
"""Tests for Matrix require-mention gating and auto-thread features."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import PlatformConfig
|
||||
|
||||
|
||||
def _ensure_nio_mock():
|
||||
"""Install a mock nio module when matrix-nio isn't available."""
|
||||
if "nio" in sys.modules and hasattr(sys.modules["nio"], "__file__"):
|
||||
return
|
||||
nio_mod = MagicMock()
|
||||
nio_mod.MegolmEvent = type("MegolmEvent", (), {})
|
||||
nio_mod.RoomMessageText = type("RoomMessageText", (), {})
|
||||
nio_mod.RoomMessageImage = type("RoomMessageImage", (), {})
|
||||
nio_mod.RoomMessageAudio = type("RoomMessageAudio", (), {})
|
||||
nio_mod.RoomMessageVideo = type("RoomMessageVideo", (), {})
|
||||
nio_mod.RoomMessageFile = type("RoomMessageFile", (), {})
|
||||
nio_mod.DownloadResponse = type("DownloadResponse", (), {})
|
||||
nio_mod.MemoryDownloadResponse = type("MemoryDownloadResponse", (), {})
|
||||
nio_mod.InviteMemberEvent = type("InviteMemberEvent", (), {})
|
||||
sys.modules.setdefault("nio", nio_mod)
|
||||
|
||||
|
||||
_ensure_nio_mock()
|
||||
|
||||
|
||||
def _make_adapter(tmp_path=None):
|
||||
"""Create a MatrixAdapter with mocked config."""
|
||||
from gateway.platforms.matrix import MatrixAdapter
|
||||
|
||||
config = PlatformConfig(
|
||||
enabled=True,
|
||||
token="syt_test_token",
|
||||
extra={
|
||||
"homeserver": "https://matrix.example.org",
|
||||
"user_id": "@hermes:example.org",
|
||||
},
|
||||
)
|
||||
adapter = MatrixAdapter(config)
|
||||
adapter.handle_message = AsyncMock()
|
||||
adapter._startup_ts = time.time() - 10 # avoid startup grace filter
|
||||
return adapter
|
||||
|
||||
|
||||
def _make_room(room_id="!room1:example.org", member_count=5, is_dm=False):
|
||||
"""Create a fake Matrix room."""
|
||||
room = SimpleNamespace(
|
||||
room_id=room_id,
|
||||
member_count=member_count,
|
||||
users={},
|
||||
)
|
||||
return room
|
||||
|
||||
|
||||
def _make_event(
|
||||
body,
|
||||
sender="@alice:example.org",
|
||||
event_id="$evt1",
|
||||
formatted_body=None,
|
||||
thread_id=None,
|
||||
):
|
||||
"""Create a fake RoomMessageText event."""
|
||||
content = {"body": body, "msgtype": "m.text"}
|
||||
if formatted_body:
|
||||
content["formatted_body"] = formatted_body
|
||||
content["format"] = "org.matrix.custom.html"
|
||||
|
||||
relates_to = {}
|
||||
if thread_id:
|
||||
relates_to["rel_type"] = "m.thread"
|
||||
relates_to["event_id"] = thread_id
|
||||
if relates_to:
|
||||
content["m.relates_to"] = relates_to
|
||||
|
||||
return SimpleNamespace(
|
||||
sender=sender,
|
||||
event_id=event_id,
|
||||
server_timestamp=int(time.time() * 1000),
|
||||
body=body,
|
||||
source={"content": content},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mention detection helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestIsBotMentioned:
|
||||
def setup_method(self):
|
||||
self.adapter = _make_adapter()
|
||||
|
||||
def test_full_user_id_in_body(self):
|
||||
assert self.adapter._is_bot_mentioned("hey @hermes:example.org help")
|
||||
|
||||
def test_localpart_in_body(self):
|
||||
assert self.adapter._is_bot_mentioned("hermes can you help?")
|
||||
|
||||
def test_localpart_case_insensitive(self):
|
||||
assert self.adapter._is_bot_mentioned("HERMES can you help?")
|
||||
|
||||
def test_matrix_pill_in_formatted_body(self):
|
||||
html = '<a href="https://matrix.to/#/@hermes:example.org">Hermes</a> help'
|
||||
assert self.adapter._is_bot_mentioned("Hermes help", html)
|
||||
|
||||
def test_no_mention(self):
|
||||
assert not self.adapter._is_bot_mentioned("hello everyone")
|
||||
|
||||
def test_empty_body(self):
|
||||
assert not self.adapter._is_bot_mentioned("")
|
||||
|
||||
def test_partial_localpart_no_match(self):
|
||||
# "hermesbot" should not match word-boundary check for "hermes"
|
||||
assert not self.adapter._is_bot_mentioned("hermesbot is here")
|
||||
|
||||
|
||||
class TestStripMention:
|
||||
def setup_method(self):
|
||||
self.adapter = _make_adapter()
|
||||
|
||||
def test_strip_full_user_id(self):
|
||||
result = self.adapter._strip_mention("@hermes:example.org help me")
|
||||
assert result == "help me"
|
||||
|
||||
def test_strip_localpart(self):
|
||||
result = self.adapter._strip_mention("hermes help me")
|
||||
assert result == "help me"
|
||||
|
||||
def test_strip_returns_empty_for_mention_only(self):
|
||||
result = self.adapter._strip_mention("@hermes:example.org")
|
||||
assert result == ""
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Require-mention gating in _on_room_message
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_mention_default_ignores_unmentioned(monkeypatch):
|
||||
"""Default (require_mention=true): messages without mention are ignored."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room()
|
||||
event = _make_event("hello everyone")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_not_awaited()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_mention_default_processes_mentioned(monkeypatch):
|
||||
"""Default: messages with mention are processed, mention stripped."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room()
|
||||
event = _make_event("@hermes:example.org help me")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
msg = adapter.handle_message.await_args.args[0]
|
||||
assert msg.text == "help me"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_mention_html_pill(monkeypatch):
|
||||
"""Bot mentioned via HTML pill should be processed."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room()
|
||||
formatted = '<a href="https://matrix.to/#/@hermes:example.org">Hermes</a> help'
|
||||
event = _make_event("Hermes help", formatted_body=formatted)
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_mention_dm_always_responds(monkeypatch):
|
||||
"""DMs always respond regardless of mention setting."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
# member_count=2 triggers DM detection
|
||||
room = _make_room(member_count=2)
|
||||
event = _make_event("hello without mention")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dm_strips_mention(monkeypatch):
|
||||
"""DMs strip mention from body, matching Discord behavior."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room(member_count=2)
|
||||
event = _make_event("@hermes:example.org help me")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
msg = adapter.handle_message.await_args.args[0]
|
||||
assert msg.text == "help me"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bare_mention_passes_empty_string(monkeypatch):
|
||||
"""A message that is only a mention should pass through as empty, not be dropped."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room()
|
||||
event = _make_event("@hermes:example.org")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
msg = adapter.handle_message.await_args.args[0]
|
||||
assert msg.text == ""
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_mention_free_response_room(monkeypatch):
|
||||
"""Free-response rooms bypass mention requirement."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.setenv("MATRIX_FREE_RESPONSE_ROOMS", "!room1:example.org,!room2:example.org")
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room(room_id="!room1:example.org")
|
||||
event = _make_event("hello without mention")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_mention_bot_participated_thread(monkeypatch):
|
||||
"""Threads with prior bot participation bypass mention requirement."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
adapter._bot_participated_threads.add("$thread1")
|
||||
|
||||
room = _make_room()
|
||||
event = _make_event("hello without mention", thread_id="$thread1")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_require_mention_disabled(monkeypatch):
|
||||
"""MATRIX_REQUIRE_MENTION=false: all messages processed."""
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room()
|
||||
event = _make_event("hello without mention")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
msg = adapter.handle_message.await_args.args[0]
|
||||
assert msg.text == "hello without mention"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auto-thread in _on_room_message
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_thread_default_creates_thread(monkeypatch):
|
||||
"""Default (auto_thread=true): sets thread_id to event.event_id."""
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
|
||||
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room()
|
||||
event = _make_event("hello", event_id="$msg1")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
msg = adapter.handle_message.await_args.args[0]
|
||||
assert msg.source.thread_id == "$msg1"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_thread_preserves_existing_thread(monkeypatch):
|
||||
"""If message is already in a thread, thread_id is not overridden."""
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
|
||||
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
|
||||
|
||||
adapter = _make_adapter()
|
||||
adapter._bot_participated_threads.add("$thread_root")
|
||||
room = _make_room()
|
||||
event = _make_event("reply in thread", thread_id="$thread_root")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
msg = adapter.handle_message.await_args.args[0]
|
||||
assert msg.source.thread_id == "$thread_root"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_thread_skips_dm(monkeypatch):
|
||||
"""DMs should not get auto-threaded."""
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
|
||||
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room(member_count=2)
|
||||
event = _make_event("hello dm", event_id="$dm1")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
msg = adapter.handle_message.await_args.args[0]
|
||||
assert msg.source.thread_id is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_thread_disabled(monkeypatch):
|
||||
"""MATRIX_AUTO_THREAD=false: thread_id stays None."""
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room()
|
||||
event = _make_event("hello", event_id="$msg1")
|
||||
|
||||
await adapter._on_room_message(room, event)
|
||||
adapter.handle_message.assert_awaited_once()
|
||||
msg = adapter.handle_message.await_args.args[0]
|
||||
assert msg.source.thread_id is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_thread_tracks_participation(monkeypatch):
|
||||
"""Auto-created threads are tracked in _bot_participated_threads."""
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
|
||||
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
|
||||
|
||||
adapter = _make_adapter()
|
||||
room = _make_room()
|
||||
event = _make_event("hello", event_id="$msg1")
|
||||
|
||||
with patch.object(adapter, "_save_participated_threads"):
|
||||
await adapter._on_room_message(room, event)
|
||||
|
||||
assert "$msg1" in adapter._bot_participated_threads
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Thread persistence
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestThreadPersistence:
|
||||
def test_empty_state_file(self, tmp_path, monkeypatch):
|
||||
"""No state file → empty set."""
|
||||
monkeypatch.setattr(
|
||||
"gateway.platforms.matrix.MatrixAdapter._thread_state_path",
|
||||
staticmethod(lambda: tmp_path / "matrix_threads.json"),
|
||||
)
|
||||
adapter = _make_adapter()
|
||||
loaded = adapter._load_participated_threads()
|
||||
assert loaded == set()
|
||||
|
||||
def test_track_thread_persists(self, tmp_path, monkeypatch):
|
||||
"""_track_thread writes to disk."""
|
||||
state_path = tmp_path / "matrix_threads.json"
|
||||
monkeypatch.setattr(
|
||||
"gateway.platforms.matrix.MatrixAdapter._thread_state_path",
|
||||
staticmethod(lambda: state_path),
|
||||
)
|
||||
adapter = _make_adapter()
|
||||
adapter._track_thread("$thread_abc")
|
||||
|
||||
data = json.loads(state_path.read_text())
|
||||
assert "$thread_abc" in data
|
||||
|
||||
def test_threads_survive_reload(self, tmp_path, monkeypatch):
|
||||
"""Persisted threads are loaded by a new adapter instance."""
|
||||
state_path = tmp_path / "matrix_threads.json"
|
||||
state_path.write_text(json.dumps(["$t1", "$t2"]))
|
||||
monkeypatch.setattr(
|
||||
"gateway.platforms.matrix.MatrixAdapter._thread_state_path",
|
||||
staticmethod(lambda: state_path),
|
||||
)
|
||||
adapter = _make_adapter()
|
||||
assert "$t1" in adapter._bot_participated_threads
|
||||
assert "$t2" in adapter._bot_participated_threads
|
||||
|
||||
def test_cap_max_tracked_threads(self, tmp_path, monkeypatch):
|
||||
"""Thread set is trimmed to _MAX_TRACKED_THREADS."""
|
||||
state_path = tmp_path / "matrix_threads.json"
|
||||
monkeypatch.setattr(
|
||||
"gateway.platforms.matrix.MatrixAdapter._thread_state_path",
|
||||
staticmethod(lambda: state_path),
|
||||
)
|
||||
adapter = _make_adapter()
|
||||
adapter._MAX_TRACKED_THREADS = 5
|
||||
|
||||
for i in range(10):
|
||||
adapter._bot_participated_threads.add(f"$t{i}")
|
||||
adapter._save_participated_threads()
|
||||
|
||||
data = json.loads(state_path.read_text())
|
||||
assert len(data) == 5
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# YAML config bridge
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestMatrixConfigBridge:
|
||||
def test_yaml_bridge_sets_env_vars(self, monkeypatch, tmp_path):
|
||||
"""Matrix YAML config should bridge to env vars."""
|
||||
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
|
||||
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
|
||||
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
|
||||
|
||||
yaml_content = {
|
||||
"matrix": {
|
||||
"require_mention": False,
|
||||
"free_response_rooms": ["!room1:example.org", "!room2:example.org"],
|
||||
"auto_thread": False,
|
||||
}
|
||||
}
|
||||
|
||||
import os
|
||||
import yaml
|
||||
|
||||
config_file = tmp_path / "config.yaml"
|
||||
config_file.write_text(yaml.dump(yaml_content))
|
||||
|
||||
# Simulate the bridge logic from gateway/config.py
|
||||
yaml_cfg = yaml.safe_load(config_file.read_text())
|
||||
matrix_cfg = yaml_cfg.get("matrix", {})
|
||||
if isinstance(matrix_cfg, dict):
|
||||
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower())
|
||||
frc = matrix_cfg.get("free_response_rooms")
|
||||
if frc is not None and not os.getenv("MATRIX_FREE_RESPONSE_ROOMS"):
|
||||
if isinstance(frc, list):
|
||||
frc = ",".join(str(v) for v in frc)
|
||||
monkeypatch.setenv("MATRIX_FREE_RESPONSE_ROOMS", str(frc))
|
||||
if "auto_thread" in matrix_cfg and not os.getenv("MATRIX_AUTO_THREAD"):
|
||||
monkeypatch.setenv("MATRIX_AUTO_THREAD", str(matrix_cfg["auto_thread"]).lower())
|
||||
|
||||
assert os.getenv("MATRIX_REQUIRE_MENTION") == "false"
|
||||
assert os.getenv("MATRIX_FREE_RESPONSE_ROOMS") == "!room1:example.org,!room2:example.org"
|
||||
assert os.getenv("MATRIX_AUTO_THREAD") == "false"
|
||||
|
||||
def test_env_vars_take_precedence_over_yaml(self, monkeypatch):
|
||||
"""Env vars should not be overwritten by YAML values."""
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "true")
|
||||
|
||||
import os
|
||||
yaml_cfg = {"matrix": {"require_mention": False}}
|
||||
matrix_cfg = yaml_cfg.get("matrix", {})
|
||||
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
|
||||
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower())
|
||||
|
||||
assert os.getenv("MATRIX_REQUIRE_MENTION") == "true"
|
||||
@@ -7,7 +7,7 @@ Verifies that:
|
||||
"""
|
||||
|
||||
import pytest
|
||||
pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
|
||||
#pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
|
||||
|
||||
|
||||
|
||||
@@ -318,12 +318,13 @@ class TestPreflightCompression:
|
||||
def test_preflight_compresses_oversized_history(self, agent):
|
||||
"""When loaded history exceeds the model's context threshold, compress before API call."""
|
||||
agent.compression_enabled = True
|
||||
# Set a very small context so the history is "oversized"
|
||||
agent.context_compressor.context_length = 100
|
||||
agent.context_compressor.threshold_tokens = 85 # 85% of 100
|
||||
# Set a small context so the history is "oversized", but large enough
|
||||
# that the compressed result (2 short messages) fits in a single pass.
|
||||
agent.context_compressor.context_length = 2000
|
||||
agent.context_compressor.threshold_tokens = 200
|
||||
|
||||
# Build a history that will be large enough to trigger preflight
|
||||
# (each message ~20 chars = ~5 tokens, 20 messages = ~100 tokens > 85 threshold)
|
||||
# (each message ~50 chars ≈ 13 tokens, 40 messages ≈ 520 tokens > 200 threshold)
|
||||
big_history = []
|
||||
for i in range(20):
|
||||
big_history.append({"role": "user", "content": f"Message number {i} with some extra text padding"})
|
||||
@@ -338,7 +339,7 @@ class TestPreflightCompression:
|
||||
patch.object(agent, "_save_trajectory"),
|
||||
patch.object(agent, "_cleanup_task_resources"),
|
||||
):
|
||||
# Simulate compression reducing messages
|
||||
# Simulate compression reducing messages to a small set that fits
|
||||
mock_compress.return_value = (
|
||||
[
|
||||
{"role": "user", "content": f"{SUMMARY_PREFIX}\nPrevious conversation"},
|
||||
@@ -411,7 +412,7 @@ class TestToolResultPreflightCompression:
|
||||
"""When tool results push estimated tokens past threshold, compress before next call."""
|
||||
agent.compression_enabled = True
|
||||
agent.context_compressor.context_length = 200_000
|
||||
agent.context_compressor.threshold_tokens = 140_000
|
||||
agent.context_compressor.threshold_tokens = 130_000 # below the 135k reported usage
|
||||
agent.context_compressor.last_prompt_tokens = 130_000
|
||||
agent.context_compressor.last_completion_tokens = 5_000
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
pytestmark = pytest.mark.skip(reason="Live API integration test — hangs in batch runs")
|
||||
# pytestmark removed — tests skip gracefully via OPENROUTER_API_KEY check on line 59
|
||||
|
||||
# Ensure repo root is importable
|
||||
_repo_root = Path(__file__).resolve().parent.parent
|
||||
|
||||
@@ -1573,6 +1573,40 @@ class TestRunConversation:
|
||||
assert "Local/custom backend returned reasoning-only output" in result["error"]
|
||||
assert "wrong /v1 endpoint" in result["error"]
|
||||
|
||||
def test_plugin_context_is_uncached_system_suffix_when_prompt_caching_enabled(self, agent):
|
||||
self._setup_agent(agent)
|
||||
agent._use_prompt_caching = True
|
||||
|
||||
captured = {}
|
||||
|
||||
def _fake_api_call(api_kwargs):
|
||||
captured["kwargs"] = api_kwargs
|
||||
return _mock_response(content="ok", finish_reason="stop")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"hermes_cli.plugins.invoke_hook",
|
||||
return_value=[{"context": "plugin-turn-context"}],
|
||||
),
|
||||
patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call),
|
||||
patch.object(agent, "_persist_session"),
|
||||
patch.object(agent, "_save_trajectory"),
|
||||
patch.object(agent, "_cleanup_task_resources"),
|
||||
):
|
||||
result = agent.run_conversation("hello")
|
||||
|
||||
assert result["completed"] is True
|
||||
assert result["final_response"] == "ok"
|
||||
messages = captured["kwargs"]["messages"]
|
||||
assert messages[0]["role"] == "system"
|
||||
|
||||
system_blocks = messages[0]["content"]
|
||||
assert isinstance(system_blocks, list)
|
||||
assert system_blocks[0]["text"] == "You are helpful."
|
||||
assert system_blocks[0]["cache_control"]["type"] == "ephemeral"
|
||||
assert system_blocks[-1]["text"] == "plugin-turn-context"
|
||||
assert "cache_control" not in system_blocks[-1]
|
||||
|
||||
def test_nous_401_refreshes_after_remint_and_retries(self, agent):
|
||||
self._setup_agent(agent)
|
||||
agent.provider = "nous"
|
||||
|
||||
@@ -13,7 +13,7 @@ Run with: python -m pytest tests/test_code_execution.py -v
|
||||
"""
|
||||
|
||||
import pytest
|
||||
pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
|
||||
# pytestmark removed — tests run fine (61 pass, ~99s)
|
||||
|
||||
|
||||
import json
|
||||
|
||||
@@ -9,7 +9,7 @@ asserts zero contamination from shell noise via _assert_clean().
|
||||
"""
|
||||
|
||||
import pytest
|
||||
pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -61,7 +61,8 @@ class TestProbeMcpServerTools:
|
||||
async def fake_connect(name, cfg):
|
||||
return mock_server
|
||||
|
||||
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
|
||||
patch("tools.mcp_tool._ensure_mcp_loop"), \
|
||||
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
|
||||
@@ -102,7 +103,8 @@ class TestProbeMcpServerTools:
|
||||
raise ConnectionError("Server not found")
|
||||
return mock_server
|
||||
|
||||
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
|
||||
patch("tools.mcp_tool._ensure_mcp_loop"), \
|
||||
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
|
||||
@@ -135,7 +137,8 @@ class TestProbeMcpServerTools:
|
||||
async def fake_connect(name, cfg):
|
||||
return mock_server
|
||||
|
||||
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
|
||||
patch("tools.mcp_tool._ensure_mcp_loop"), \
|
||||
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
|
||||
@@ -159,7 +162,8 @@ class TestProbeMcpServerTools:
|
||||
"""_stop_mcp_loop is called even when probe fails."""
|
||||
config = {"github": {"command": "npx", "connect_timeout": 5}}
|
||||
|
||||
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
patch("tools.mcp_tool._ensure_mcp_loop"), \
|
||||
patch("tools.mcp_tool._run_on_mcp_loop", side_effect=RuntimeError("boom")), \
|
||||
patch("tools.mcp_tool._stop_mcp_loop") as mock_stop:
|
||||
@@ -187,7 +191,8 @@ class TestProbeMcpServerTools:
|
||||
connect_calls.append(name)
|
||||
return mock_server
|
||||
|
||||
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
|
||||
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
|
||||
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
|
||||
patch("tools.mcp_tool._ensure_mcp_loop"), \
|
||||
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
|
||||
|
||||
@@ -1,11 +1,22 @@
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from tools.mcp_tool import MCPServerTask, _format_connect_error, _resolve_stdio_command
|
||||
from tools.mcp_tool import MCPServerTask, _format_connect_error, _resolve_stdio_command, _MCP_AVAILABLE
|
||||
|
||||
# Ensure the mcp module symbols exist for patching even when the SDK isn't installed
|
||||
if not _MCP_AVAILABLE:
|
||||
import tools.mcp_tool as _mcp_mod
|
||||
if not hasattr(_mcp_mod, "StdioServerParameters"):
|
||||
_mcp_mod.StdioServerParameters = MagicMock
|
||||
if not hasattr(_mcp_mod, "stdio_client"):
|
||||
_mcp_mod.stdio_client = MagicMock
|
||||
if not hasattr(_mcp_mod, "ClientSession"):
|
||||
_mcp_mod.ClientSession = MagicMock
|
||||
|
||||
|
||||
def test_resolve_stdio_command_falls_back_to_hermes_node_bin(tmp_path):
|
||||
|
||||
+545
-13
@@ -5,18 +5,30 @@ Code Execution Tool -- Programmatic Tool Calling (PTC)
|
||||
Lets the LLM write a Python script that calls Hermes tools via RPC,
|
||||
collapsing multi-step tool chains into a single inference turn.
|
||||
|
||||
Architecture:
|
||||
1. Parent generates a `hermes_tools.py` stub module with RPC functions
|
||||
Architecture (two transports):
|
||||
|
||||
**Local backend (UDS):**
|
||||
1. Parent generates a `hermes_tools.py` stub module with UDS RPC functions
|
||||
2. Parent opens a Unix domain socket and starts an RPC listener thread
|
||||
3. Parent spawns a child process that runs the LLM's script
|
||||
4. When the script calls a tool function, the call travels over the UDS
|
||||
back to the parent, which dispatches through handle_function_call
|
||||
5. Only the script's stdout is returned to the LLM; intermediate tool
|
||||
results never enter the context window
|
||||
4. Tool calls travel over the UDS back to the parent for dispatch
|
||||
|
||||
Platform: Linux / macOS only (Unix domain sockets). Disabled on Windows.
|
||||
**Remote backends (file-based RPC):**
|
||||
1. Parent generates `hermes_tools.py` with file-based RPC stubs
|
||||
2. Parent ships both files to the remote environment
|
||||
3. Script runs inside the terminal backend (Docker/SSH/Modal/Daytona/etc.)
|
||||
4. Tool calls are written as request files; a polling thread on the parent
|
||||
reads them via execute_oneshot(), dispatches, and writes response files
|
||||
5. The script polls for response files and continues
|
||||
|
||||
In both cases, only the script's stdout is returned to the LLM; intermediate
|
||||
tool results never enter the context window.
|
||||
|
||||
Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Windows.
|
||||
Remote execution additionally requires Python 3 in the terminal backend.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -114,11 +126,17 @@ _TOOL_STUBS = {
|
||||
}
|
||||
|
||||
|
||||
def generate_hermes_tools_module(enabled_tools: List[str]) -> str:
|
||||
def generate_hermes_tools_module(enabled_tools: List[str],
|
||||
transport: str = "uds") -> str:
|
||||
"""
|
||||
Build the source code for the hermes_tools.py stub module.
|
||||
|
||||
Only tools in both SANDBOX_ALLOWED_TOOLS and enabled_tools get stubs.
|
||||
|
||||
Args:
|
||||
enabled_tools: Tool names enabled in the current session.
|
||||
transport: ``"uds"`` for Unix domain socket (local backend) or
|
||||
``"file"`` for file-based RPC (remote backends).
|
||||
"""
|
||||
tools_to_generate = sorted(SANDBOX_ALLOWED_TOOLS & set(enabled_tools))
|
||||
|
||||
@@ -135,13 +153,18 @@ def generate_hermes_tools_module(enabled_tools: List[str]) -> str:
|
||||
)
|
||||
export_names.append(func_name)
|
||||
|
||||
header = '''\
|
||||
"""Auto-generated Hermes tools RPC stubs."""
|
||||
import json, os, socket, shlex, time
|
||||
if transport == "file":
|
||||
header = _FILE_TRANSPORT_HEADER
|
||||
else:
|
||||
header = _UDS_TRANSPORT_HEADER
|
||||
|
||||
_sock = None
|
||||
return header + "\n".join(stub_functions)
|
||||
|
||||
|
||||
# ---- Shared helpers section (embedded in both transport headers) ----------
|
||||
|
||||
_COMMON_HELPERS = '''\
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Convenience helpers (avoid common scripting pitfalls)
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -176,6 +199,17 @@ def retry(fn, max_attempts=3, delay=2):
|
||||
time.sleep(delay * (2 ** attempt))
|
||||
raise last_err
|
||||
|
||||
'''
|
||||
|
||||
# ---- UDS transport (local backend) ---------------------------------------
|
||||
|
||||
_UDS_TRANSPORT_HEADER = '''\
|
||||
"""Auto-generated Hermes tools RPC stubs."""
|
||||
import json, os, socket, shlex, time
|
||||
|
||||
_sock = None
|
||||
''' + _COMMON_HELPERS + '''\
|
||||
|
||||
def _connect():
|
||||
global _sock
|
||||
if _sock is None:
|
||||
@@ -208,7 +242,57 @@ def _call(tool_name, args):
|
||||
|
||||
'''
|
||||
|
||||
return header + "\n".join(stub_functions)
|
||||
# ---- File-based transport (remote backends) -------------------------------
|
||||
|
||||
_FILE_TRANSPORT_HEADER = '''\
|
||||
"""Auto-generated Hermes tools RPC stubs (file-based transport)."""
|
||||
import json, os, shlex, time
|
||||
|
||||
_RPC_DIR = os.environ.get("HERMES_RPC_DIR", "/tmp/hermes_rpc")
|
||||
_seq = 0
|
||||
''' + _COMMON_HELPERS + '''\
|
||||
|
||||
def _call(tool_name, args):
|
||||
"""Send a tool call request via file-based RPC and wait for response."""
|
||||
global _seq
|
||||
_seq += 1
|
||||
seq_str = f"{_seq:06d}"
|
||||
req_file = os.path.join(_RPC_DIR, f"req_{seq_str}")
|
||||
res_file = os.path.join(_RPC_DIR, f"res_{seq_str}")
|
||||
|
||||
# Write request atomically (write to .tmp, then rename)
|
||||
tmp = req_file + ".tmp"
|
||||
with open(tmp, "w") as f:
|
||||
json.dump({"tool": tool_name, "args": args, "seq": _seq}, f)
|
||||
os.rename(tmp, req_file)
|
||||
|
||||
# Wait for response with adaptive polling
|
||||
deadline = time.monotonic() + 300 # 5-minute timeout per tool call
|
||||
poll_interval = 0.05 # Start at 50ms
|
||||
while not os.path.exists(res_file):
|
||||
if time.monotonic() > deadline:
|
||||
raise RuntimeError(f"RPC timeout: no response for {tool_name} after 300s")
|
||||
time.sleep(poll_interval)
|
||||
poll_interval = min(poll_interval * 1.2, 0.25) # Back off to 250ms
|
||||
|
||||
with open(res_file) as f:
|
||||
raw = f.read()
|
||||
|
||||
# Clean up response file
|
||||
try:
|
||||
os.unlink(res_file)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
result = json.loads(raw)
|
||||
if isinstance(result, str):
|
||||
try:
|
||||
return json.loads(result)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return result
|
||||
return result
|
||||
|
||||
'''
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -339,6 +423,443 @@ def _rpc_server_loop(
|
||||
logger.debug("RPC conn close error: %s", e)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Remote execution support (file-based RPC via terminal backend)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _get_or_create_env(task_id: str):
|
||||
"""Get or create the terminal environment for *task_id*.
|
||||
|
||||
Reuses the same environment (container/sandbox/SSH session) that the
|
||||
terminal and file tools use, creating one if it doesn't exist yet.
|
||||
Returns ``(env, env_type)`` tuple.
|
||||
"""
|
||||
from tools.terminal_tool import (
|
||||
_active_environments, _env_lock, _create_environment,
|
||||
_get_env_config, _last_activity, _start_cleanup_thread,
|
||||
_creation_locks, _creation_locks_lock, _task_env_overrides,
|
||||
)
|
||||
|
||||
effective_task_id = task_id or "default"
|
||||
|
||||
# Fast path: environment already exists
|
||||
with _env_lock:
|
||||
if effective_task_id in _active_environments:
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
return _active_environments[effective_task_id], _get_env_config()["env_type"]
|
||||
|
||||
# Slow path: create environment (same pattern as file_tools._get_file_ops)
|
||||
with _creation_locks_lock:
|
||||
if effective_task_id not in _creation_locks:
|
||||
_creation_locks[effective_task_id] = threading.Lock()
|
||||
task_lock = _creation_locks[effective_task_id]
|
||||
|
||||
with task_lock:
|
||||
with _env_lock:
|
||||
if effective_task_id in _active_environments:
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
return _active_environments[effective_task_id], _get_env_config()["env_type"]
|
||||
|
||||
config = _get_env_config()
|
||||
env_type = config["env_type"]
|
||||
overrides = _task_env_overrides.get(effective_task_id, {})
|
||||
|
||||
if env_type == "docker":
|
||||
image = overrides.get("docker_image") or config["docker_image"]
|
||||
elif env_type == "singularity":
|
||||
image = overrides.get("singularity_image") or config["singularity_image"]
|
||||
elif env_type == "modal":
|
||||
image = overrides.get("modal_image") or config["modal_image"]
|
||||
elif env_type == "daytona":
|
||||
image = overrides.get("daytona_image") or config["daytona_image"]
|
||||
else:
|
||||
image = ""
|
||||
|
||||
cwd = overrides.get("cwd") or config["cwd"]
|
||||
|
||||
container_config = None
|
||||
if env_type in ("docker", "singularity", "modal", "daytona"):
|
||||
container_config = {
|
||||
"container_cpu": config.get("container_cpu", 1),
|
||||
"container_memory": config.get("container_memory", 5120),
|
||||
"container_disk": config.get("container_disk", 51200),
|
||||
"container_persistent": config.get("container_persistent", True),
|
||||
"docker_volumes": config.get("docker_volumes", []),
|
||||
}
|
||||
|
||||
ssh_config = None
|
||||
if env_type == "ssh":
|
||||
ssh_config = {
|
||||
"host": config.get("ssh_host", ""),
|
||||
"user": config.get("ssh_user", ""),
|
||||
"port": config.get("ssh_port", 22),
|
||||
"key": config.get("ssh_key", ""),
|
||||
"persistent": config.get("ssh_persistent", False),
|
||||
}
|
||||
|
||||
local_config = None
|
||||
if env_type == "local":
|
||||
local_config = {
|
||||
"persistent": config.get("local_persistent", False),
|
||||
}
|
||||
|
||||
logger.info("Creating new %s environment for execute_code task %s...",
|
||||
env_type, effective_task_id[:8])
|
||||
env = _create_environment(
|
||||
env_type=env_type,
|
||||
image=image,
|
||||
cwd=cwd,
|
||||
timeout=config["timeout"],
|
||||
ssh_config=ssh_config,
|
||||
container_config=container_config,
|
||||
local_config=local_config,
|
||||
task_id=effective_task_id,
|
||||
host_cwd=config.get("host_cwd"),
|
||||
)
|
||||
|
||||
with _env_lock:
|
||||
_active_environments[effective_task_id] = env
|
||||
_last_activity[effective_task_id] = time.time()
|
||||
|
||||
_start_cleanup_thread()
|
||||
logger.info("%s environment ready for execute_code task %s",
|
||||
env_type, effective_task_id[:8])
|
||||
return env, env_type
|
||||
|
||||
|
||||
def _ship_file_to_remote(env, remote_path: str, content: str) -> None:
|
||||
"""Write *content* to *remote_path* on the remote environment.
|
||||
|
||||
Uses ``echo … | base64 -d`` rather than stdin piping because some
|
||||
backends (Modal) don't reliably deliver stdin_data to chained
|
||||
commands. Base64 output is shell-safe ([A-Za-z0-9+/=]) so single
|
||||
quotes are fine.
|
||||
"""
|
||||
encoded = base64.b64encode(content.encode("utf-8")).decode("ascii")
|
||||
env.execute_oneshot(
|
||||
f"echo '{encoded}' | base64 -d > {remote_path}",
|
||||
cwd="/",
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
|
||||
def _rpc_poll_loop(
|
||||
env,
|
||||
rpc_dir: str,
|
||||
task_id: str,
|
||||
tool_call_log: list,
|
||||
tool_call_counter: list,
|
||||
max_tool_calls: int,
|
||||
allowed_tools: frozenset,
|
||||
stop_event: threading.Event,
|
||||
):
|
||||
"""Poll the remote filesystem for tool call requests and dispatch them.
|
||||
|
||||
Runs in a background thread. Uses ``env.execute_oneshot()`` so it can
|
||||
operate concurrently with the script-execution thread that holds
|
||||
``env.execute()`` (important for persistent-shell backends like SSH).
|
||||
"""
|
||||
from model_tools import handle_function_call
|
||||
|
||||
poll_interval = 0.1 # 100 ms
|
||||
|
||||
while not stop_event.is_set():
|
||||
try:
|
||||
# List pending request files (skip .tmp partials)
|
||||
ls_result = env.execute_oneshot(
|
||||
f"ls -1 {rpc_dir}/req_* 2>/dev/null || true",
|
||||
cwd="/",
|
||||
timeout=10,
|
||||
)
|
||||
output = ls_result.get("output", "").strip()
|
||||
if not output:
|
||||
stop_event.wait(poll_interval)
|
||||
continue
|
||||
|
||||
req_files = sorted([
|
||||
f.strip() for f in output.split("\n")
|
||||
if f.strip()
|
||||
and not f.strip().endswith(".tmp")
|
||||
and "/req_" in f.strip()
|
||||
])
|
||||
|
||||
for req_file in req_files:
|
||||
if stop_event.is_set():
|
||||
break
|
||||
|
||||
call_start = time.monotonic()
|
||||
|
||||
# Read request
|
||||
read_result = env.execute_oneshot(
|
||||
f"cat {req_file}",
|
||||
cwd="/",
|
||||
timeout=10,
|
||||
)
|
||||
try:
|
||||
request = json.loads(read_result.get("output", ""))
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
logger.debug("Malformed RPC request in %s", req_file)
|
||||
# Remove bad request to avoid infinite retry
|
||||
env.execute_oneshot(f"rm -f {req_file}", cwd="/", timeout=5)
|
||||
continue
|
||||
|
||||
tool_name = request.get("tool", "")
|
||||
tool_args = request.get("args", {})
|
||||
seq = request.get("seq", 0)
|
||||
seq_str = f"{seq:06d}"
|
||||
res_file = f"{rpc_dir}/res_{seq_str}"
|
||||
|
||||
# Enforce allow-list
|
||||
if tool_name not in allowed_tools:
|
||||
available = ", ".join(sorted(allowed_tools))
|
||||
tool_result = json.dumps({
|
||||
"error": (
|
||||
f"Tool '{tool_name}' is not available in execute_code. "
|
||||
f"Available: {available}"
|
||||
)
|
||||
})
|
||||
# Enforce tool call limit
|
||||
elif tool_call_counter[0] >= max_tool_calls:
|
||||
tool_result = json.dumps({
|
||||
"error": (
|
||||
f"Tool call limit reached ({max_tool_calls}). "
|
||||
"No more tool calls allowed in this execution."
|
||||
)
|
||||
})
|
||||
else:
|
||||
# Strip forbidden terminal parameters
|
||||
if tool_name == "terminal" and isinstance(tool_args, dict):
|
||||
for param in _TERMINAL_BLOCKED_PARAMS:
|
||||
tool_args.pop(param, None)
|
||||
|
||||
# Dispatch through the standard tool handler
|
||||
try:
|
||||
_real_stdout, _real_stderr = sys.stdout, sys.stderr
|
||||
devnull = open(os.devnull, "w")
|
||||
try:
|
||||
sys.stdout = devnull
|
||||
sys.stderr = devnull
|
||||
tool_result = handle_function_call(
|
||||
tool_name, tool_args, task_id=task_id
|
||||
)
|
||||
finally:
|
||||
sys.stdout, sys.stderr = _real_stdout, _real_stderr
|
||||
devnull.close()
|
||||
except Exception as exc:
|
||||
logger.error("Tool call failed in remote sandbox: %s",
|
||||
exc, exc_info=True)
|
||||
tool_result = json.dumps({"error": str(exc)})
|
||||
|
||||
tool_call_counter[0] += 1
|
||||
call_duration = time.monotonic() - call_start
|
||||
tool_call_log.append({
|
||||
"tool": tool_name,
|
||||
"args_preview": str(tool_args)[:80],
|
||||
"duration": round(call_duration, 2),
|
||||
})
|
||||
|
||||
# Write response atomically (tmp + rename).
|
||||
# Use echo piping (not stdin_data) because Modal doesn't
|
||||
# reliably deliver stdin to chained commands.
|
||||
encoded_result = base64.b64encode(
|
||||
tool_result.encode("utf-8")
|
||||
).decode("ascii")
|
||||
env.execute_oneshot(
|
||||
f"echo '{encoded_result}' | base64 -d > {res_file}.tmp"
|
||||
f" && mv {res_file}.tmp {res_file}",
|
||||
cwd="/",
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
# Remove the request file
|
||||
env.execute_oneshot(f"rm -f {req_file}", cwd="/", timeout=5)
|
||||
|
||||
except Exception as e:
|
||||
if not stop_event.is_set():
|
||||
logger.debug("RPC poll error: %s", e, exc_info=True)
|
||||
|
||||
if not stop_event.is_set():
|
||||
stop_event.wait(poll_interval)
|
||||
|
||||
|
||||
def _execute_remote(
|
||||
code: str,
|
||||
task_id: Optional[str],
|
||||
enabled_tools: Optional[List[str]],
|
||||
) -> str:
|
||||
"""Run a script on the remote terminal backend via file-based RPC.
|
||||
|
||||
The script and the generated hermes_tools.py module are shipped to
|
||||
the remote environment, and tool calls are proxied through a polling
|
||||
thread that communicates via request/response files.
|
||||
"""
|
||||
from tools.terminal_tool import _interrupt_event
|
||||
|
||||
_cfg = _load_config()
|
||||
timeout = _cfg.get("timeout", DEFAULT_TIMEOUT)
|
||||
max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS)
|
||||
|
||||
session_tools = set(enabled_tools) if enabled_tools else set()
|
||||
sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools)
|
||||
if not sandbox_tools:
|
||||
sandbox_tools = SANDBOX_ALLOWED_TOOLS
|
||||
|
||||
effective_task_id = task_id or "default"
|
||||
env, env_type = _get_or_create_env(effective_task_id)
|
||||
|
||||
sandbox_id = uuid.uuid4().hex[:12]
|
||||
sandbox_dir = f"/tmp/hermes_exec_{sandbox_id}"
|
||||
|
||||
tool_call_log: list = []
|
||||
tool_call_counter = [0]
|
||||
exec_start = time.monotonic()
|
||||
stop_event = threading.Event()
|
||||
rpc_thread = None
|
||||
|
||||
try:
|
||||
# Verify Python is available on the remote
|
||||
py_check = env.execute_oneshot(
|
||||
"command -v python3 >/dev/null 2>&1 && echo OK",
|
||||
cwd="/", timeout=15,
|
||||
)
|
||||
if "OK" not in py_check.get("output", ""):
|
||||
return json.dumps({
|
||||
"status": "error",
|
||||
"error": (
|
||||
f"Python 3 is not available in the {env_type} terminal "
|
||||
"environment. Install Python to use execute_code with "
|
||||
"remote backends."
|
||||
),
|
||||
"tool_calls_made": 0,
|
||||
"duration_seconds": 0,
|
||||
})
|
||||
|
||||
# Create sandbox directory on remote
|
||||
env.execute_oneshot(
|
||||
f"mkdir -p {sandbox_dir}/rpc", cwd="/", timeout=10,
|
||||
)
|
||||
|
||||
# Generate and ship files
|
||||
tools_src = generate_hermes_tools_module(
|
||||
list(sandbox_tools), transport="file",
|
||||
)
|
||||
_ship_file_to_remote(env, f"{sandbox_dir}/hermes_tools.py", tools_src)
|
||||
_ship_file_to_remote(env, f"{sandbox_dir}/script.py", code)
|
||||
|
||||
# Start RPC polling thread
|
||||
rpc_thread = threading.Thread(
|
||||
target=_rpc_poll_loop,
|
||||
args=(
|
||||
env, f"{sandbox_dir}/rpc", effective_task_id,
|
||||
tool_call_log, tool_call_counter, max_tool_calls,
|
||||
sandbox_tools, stop_event,
|
||||
),
|
||||
daemon=True,
|
||||
)
|
||||
rpc_thread.start()
|
||||
|
||||
# Build environment variable prefix for the script
|
||||
env_prefix = (
|
||||
f"HERMES_RPC_DIR={sandbox_dir}/rpc "
|
||||
f"PYTHONDONTWRITEBYTECODE=1"
|
||||
)
|
||||
tz = os.getenv("HERMES_TIMEZONE", "").strip()
|
||||
if tz:
|
||||
env_prefix += f" TZ={tz}"
|
||||
|
||||
# Execute the script on the remote backend
|
||||
logger.info("Executing code on %s backend (task %s)...",
|
||||
env_type, effective_task_id[:8])
|
||||
script_result = env.execute(
|
||||
f"cd {sandbox_dir} && {env_prefix} python3 script.py",
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
stdout_text = script_result.get("output", "")
|
||||
exit_code = script_result.get("returncode", -1)
|
||||
status = "success"
|
||||
|
||||
# Check for timeout/interrupt from the backend
|
||||
if exit_code == 124:
|
||||
status = "timeout"
|
||||
elif exit_code == 130:
|
||||
status = "interrupted"
|
||||
|
||||
except Exception as exc:
|
||||
duration = round(time.monotonic() - exec_start, 2)
|
||||
logger.error(
|
||||
"execute_code remote failed after %ss with %d tool calls: %s: %s",
|
||||
duration, tool_call_counter[0], type(exc).__name__, exc,
|
||||
exc_info=True,
|
||||
)
|
||||
return json.dumps({
|
||||
"status": "error",
|
||||
"error": str(exc),
|
||||
"tool_calls_made": tool_call_counter[0],
|
||||
"duration_seconds": duration,
|
||||
}, ensure_ascii=False)
|
||||
|
||||
finally:
|
||||
# Stop the polling thread
|
||||
stop_event.set()
|
||||
if rpc_thread is not None:
|
||||
rpc_thread.join(timeout=5)
|
||||
|
||||
# Clean up remote sandbox dir
|
||||
try:
|
||||
env.execute_oneshot(
|
||||
f"rm -rf {sandbox_dir}", cwd="/", timeout=15,
|
||||
)
|
||||
except Exception:
|
||||
logger.debug("Failed to clean up remote sandbox %s", sandbox_dir)
|
||||
|
||||
duration = round(time.monotonic() - exec_start, 2)
|
||||
|
||||
# --- Post-process output (same as local path) ---
|
||||
|
||||
# Truncate stdout to cap
|
||||
if len(stdout_text) > MAX_STDOUT_BYTES:
|
||||
head_bytes = int(MAX_STDOUT_BYTES * 0.4)
|
||||
tail_bytes = MAX_STDOUT_BYTES - head_bytes
|
||||
head = stdout_text[:head_bytes]
|
||||
tail = stdout_text[-tail_bytes:]
|
||||
omitted = len(stdout_text) - len(head) - len(tail)
|
||||
stdout_text = (
|
||||
head
|
||||
+ f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted "
|
||||
f"out of {len(stdout_text):,} total] ...\n\n"
|
||||
+ tail
|
||||
)
|
||||
|
||||
# Strip ANSI escape sequences
|
||||
from tools.ansi_strip import strip_ansi
|
||||
stdout_text = strip_ansi(stdout_text)
|
||||
|
||||
# Redact secrets
|
||||
from agent.redact import redact_sensitive_text
|
||||
stdout_text = redact_sensitive_text(stdout_text)
|
||||
|
||||
# Build response
|
||||
result: Dict[str, Any] = {
|
||||
"status": status,
|
||||
"output": stdout_text,
|
||||
"tool_calls_made": tool_call_counter[0],
|
||||
"duration_seconds": duration,
|
||||
}
|
||||
|
||||
if status == "timeout":
|
||||
result["error"] = f"Script timed out after {timeout}s and was killed."
|
||||
elif status == "interrupted":
|
||||
result["output"] = (
|
||||
stdout_text + "\n[execution interrupted — user sent a new message]"
|
||||
)
|
||||
elif exit_code != 0:
|
||||
result["status"] = "error"
|
||||
result["error"] = f"Script exited with code {exit_code}"
|
||||
|
||||
return json.dumps(result, ensure_ascii=False)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -352,6 +873,9 @@ def execute_code(
|
||||
Run a Python script in a sandboxed child process with RPC access
|
||||
to a subset of Hermes tools.
|
||||
|
||||
Dispatches to the local (UDS) or remote (file-based RPC) path
|
||||
depending on the configured terminal backend.
|
||||
|
||||
Args:
|
||||
code: Python source code to execute.
|
||||
task_id: Session task ID for tool isolation (terminal env, etc.).
|
||||
@@ -369,6 +893,14 @@ def execute_code(
|
||||
if not code or not code.strip():
|
||||
return json.dumps({"error": "No code provided."})
|
||||
|
||||
# Dispatch: remote backends use file-based RPC, local uses UDS
|
||||
from tools.terminal_tool import _get_env_config
|
||||
env_type = _get_env_config()["env_type"]
|
||||
if env_type != "local":
|
||||
return _execute_remote(code, task_id, enabled_tools)
|
||||
|
||||
# --- Local execution path (UDS) --- below this line is unchanged ---
|
||||
|
||||
# Import interrupt event from terminal_tool (cooperative cancellation)
|
||||
from tools.terminal_tool import _interrupt_event
|
||||
|
||||
|
||||
+19
-1
@@ -116,7 +116,7 @@ def _normalize_optional_job_value(value: Optional[Any], *, strip_trailing_slash:
|
||||
def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
|
||||
prompt = job.get("prompt", "")
|
||||
skills = _canonical_skills(job.get("skill"), job.get("skills"))
|
||||
return {
|
||||
result = {
|
||||
"job_id": job["id"],
|
||||
"name": job["name"],
|
||||
"skill": skills[0] if skills else None,
|
||||
@@ -136,6 +136,9 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"paused_at": job.get("paused_at"),
|
||||
"paused_reason": job.get("paused_reason"),
|
||||
}
|
||||
if job.get("script"):
|
||||
result["script"] = job["script"]
|
||||
return result
|
||||
|
||||
|
||||
def cronjob(
|
||||
@@ -153,6 +156,7 @@ def cronjob(
|
||||
provider: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
reason: Optional[str] = None,
|
||||
script: Optional[str] = None,
|
||||
task_id: str = None,
|
||||
) -> str:
|
||||
"""Unified cron job management tool."""
|
||||
@@ -183,6 +187,7 @@ def cronjob(
|
||||
model=_normalize_optional_job_value(model),
|
||||
provider=_normalize_optional_job_value(provider),
|
||||
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
|
||||
script=_normalize_optional_job_value(script),
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
@@ -265,6 +270,9 @@ def cronjob(
|
||||
updates["provider"] = _normalize_optional_job_value(provider)
|
||||
if base_url is not None:
|
||||
updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True)
|
||||
if script is not None:
|
||||
# Pass empty string to clear an existing script
|
||||
updates["script"] = _normalize_optional_job_value(script) if script else None
|
||||
if repeat is not None:
|
||||
# Normalize: treat 0 or negative as None (infinite)
|
||||
normalized_repeat = None if repeat <= 0 else repeat
|
||||
@@ -338,6 +346,11 @@ Jobs run in a fresh session with no current-chat context, so prompts must be sel
|
||||
If skill or skills are provided on create, the future cron run loads those skills in order, then follows the prompt as the task instruction.
|
||||
On update, passing skills=[] clears attached skills.
|
||||
|
||||
If script is provided on create, the referenced Python script runs before each agent turn.
|
||||
Its stdout is injected into the prompt as context. Use this for data collection and change
|
||||
detection — the script handles gathering data, the agent analyzes and reports.
|
||||
On update, pass script="" to clear an attached script.
|
||||
|
||||
NOTE: The agent's final response is auto-delivered to the target. Put the primary
|
||||
user-facing content in the final response. Cron jobs run autonomously with no user
|
||||
present — they cannot ask questions or request clarification.
|
||||
@@ -402,6 +415,10 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
|
||||
"reason": {
|
||||
"type": "string",
|
||||
"description": "Optional pause reason"
|
||||
},
|
||||
"script": {
|
||||
"type": "string",
|
||||
"description": "Optional path to a Python script that runs before each cron job execution. Its stdout is injected into the prompt as context. Use for data collection and change detection. Relative paths resolve under ~/.hermes/scripts/. On update, pass empty string to clear."
|
||||
}
|
||||
},
|
||||
"required": ["action"]
|
||||
@@ -451,6 +468,7 @@ registry.register(
|
||||
provider=args.get("provider"),
|
||||
base_url=args.get("base_url"),
|
||||
reason=args.get("reason"),
|
||||
script=args.get("script"),
|
||||
task_id=kw.get("task_id"),
|
||||
),
|
||||
check_fn=check_cronjob_requirements,
|
||||
|
||||
@@ -91,6 +91,19 @@ class BaseEnvironment(ABC):
|
||||
kw["stdin"] = subprocess.DEVNULL
|
||||
return kw
|
||||
|
||||
def execute_oneshot(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
"""Execute a command bypassing any persistent shell.
|
||||
|
||||
Safe for concurrent use alongside a long-running execute() call.
|
||||
Backends that maintain a persistent shell (SSH, Local) override this
|
||||
to route through their oneshot path, avoiding the shell lock.
|
||||
Non-persistent backends delegate to execute().
|
||||
"""
|
||||
return self.execute(command, cwd=cwd, timeout=timeout,
|
||||
stdin_data=stdin_data)
|
||||
|
||||
def _timeout_result(self, timeout: int | None) -> dict:
|
||||
"""Standard return dict when a command times out."""
|
||||
return {
|
||||
|
||||
@@ -141,6 +141,19 @@ class PersistentShellMixin:
|
||||
command, cwd, timeout=timeout, stdin_data=stdin_data,
|
||||
)
|
||||
|
||||
def execute_oneshot(self, command: str, cwd: str = "", *,
|
||||
timeout: int | None = None,
|
||||
stdin_data: str | None = None) -> dict:
|
||||
"""Always use the oneshot (non-persistent) execution path.
|
||||
|
||||
This bypasses _shell_lock so it can run concurrently with a
|
||||
long-running command in the persistent shell — used by
|
||||
execute_code's file-based RPC polling thread.
|
||||
"""
|
||||
return self._execute_oneshot(
|
||||
command, cwd, timeout=timeout, stdin_data=stdin_data,
|
||||
)
|
||||
|
||||
def cleanup(self):
|
||||
if self.persistent:
|
||||
self._cleanup_persistent_shell()
|
||||
|
||||
@@ -898,7 +898,7 @@ class ShellFileOperations(FileOperations):
|
||||
hidden_exclude = "-not -path '*/.*'"
|
||||
|
||||
cmd = f"find {self._escape_shell_arg(path)} {hidden_exclude} -type f -name {self._escape_shell_arg(search_pattern)} " \
|
||||
f"-printf '%T@ %p\\\\n' 2>/dev/null | sort -rn | tail -n +{offset + 1} | head -n {limit}"
|
||||
f"-printf '%T@ %p\\n' 2>/dev/null | sort -rn | tail -n +{offset + 1} | head -n {limit}"
|
||||
|
||||
result = self._exec(cmd, timeout=60)
|
||||
|
||||
|
||||
@@ -232,6 +232,9 @@ For cloud sandbox backends, persistence is filesystem-oriented. `TERMINAL_LIFETI
|
||||
| `MATRIX_ALLOWED_USERS` | Comma-separated Matrix user IDs allowed to message the bot (e.g. `@alice:matrix.org`) |
|
||||
| `MATRIX_HOME_ROOM` | Room ID for proactive message delivery (e.g. `!abc123:matrix.org`) |
|
||||
| `MATRIX_ENCRYPTION` | Enable end-to-end encryption (`true`/`false`, default: `false`) |
|
||||
| `MATRIX_REQUIRE_MENTION` | Require `@mention` in rooms (default: `true`). Set to `false` to respond to all messages. |
|
||||
| `MATRIX_FREE_RESPONSE_ROOMS` | Comma-separated room IDs where bot responds without `@mention` |
|
||||
| `MATRIX_AUTO_THREAD` | Auto-create threads for room messages (default: `true`) |
|
||||
| `HASS_TOKEN` | Home Assistant Long-Lived Access Token (enables HA platform + tools) |
|
||||
| `HASS_URL` | Home Assistant URL (default: `http://homeassistant.local:8123`) |
|
||||
| `WEBHOOK_ENABLED` | Enable the webhook platform adapter (`true`/`false`) |
|
||||
|
||||
@@ -17,8 +17,9 @@ Before setup, here's the part most people want to know: how Hermes behaves once
|
||||
| Context | Behavior |
|
||||
|---------|----------|
|
||||
| **DMs** | Hermes responds to every message. No `@mention` needed. Each DM has its own session. |
|
||||
| **Rooms** | Hermes responds to all messages in rooms it has joined. Room invites are auto-accepted. |
|
||||
| **Threads** | Hermes supports Matrix threads (MSC3440). If you reply in a thread, Hermes keeps the thread context isolated from the main room timeline. |
|
||||
| **Rooms** | By default, Hermes requires an `@mention` to respond. Set `MATRIX_REQUIRE_MENTION=false` or add room IDs to `MATRIX_FREE_RESPONSE_ROOMS` for free-response rooms. Room invites are auto-accepted. |
|
||||
| **Threads** | Hermes supports Matrix threads (MSC3440). If you reply in a thread, Hermes keeps the thread context isolated from the main room timeline. Threads where the bot has already participated do not require a mention. |
|
||||
| **Auto-threading** | By default, Hermes auto-creates a thread for each message it responds to in a room. This keeps conversations isolated. Set `MATRIX_AUTO_THREAD=false` to disable. |
|
||||
| **Shared rooms with multiple users** | By default, Hermes isolates session history per user inside the room. Two people talking in the same room do not share one transcript unless you explicitly disable that. |
|
||||
|
||||
:::tip
|
||||
@@ -51,6 +52,30 @@ Shared sessions can be useful for a collaborative room, but they also mean:
|
||||
- one person's long tool-heavy task can bloat everyone else's context
|
||||
- one person's in-flight run can interrupt another person's follow-up in the same room
|
||||
|
||||
### Mention and Threading Configuration
|
||||
|
||||
You can configure mention and auto-threading behavior via environment variables or `config.yaml`:
|
||||
|
||||
```yaml
|
||||
matrix:
|
||||
require_mention: true # Require @mention in rooms (default: true)
|
||||
free_response_rooms: # Rooms exempt from mention requirement
|
||||
- "!abc123:matrix.org"
|
||||
auto_thread: true # Auto-create threads for responses (default: true)
|
||||
```
|
||||
|
||||
Or via environment variables:
|
||||
|
||||
```bash
|
||||
MATRIX_REQUIRE_MENTION=true
|
||||
MATRIX_FREE_RESPONSE_ROOMS=!abc123:matrix.org,!def456:matrix.org
|
||||
MATRIX_AUTO_THREAD=true
|
||||
```
|
||||
|
||||
:::note
|
||||
If you are upgrading from a version that did not have `MATRIX_REQUIRE_MENTION`, the bot previously responded to all messages in rooms. To preserve that behavior, set `MATRIX_REQUIRE_MENTION=false`.
|
||||
:::
|
||||
|
||||
This guide walks you through the full setup process — from creating your bot account to sending your first message.
|
||||
|
||||
## Step 1: Create a Bot Account
|
||||
|
||||
Reference in New Issue
Block a user