Compare commits

10 Commits

Author SHA1 Message Date
Brooklyn Nicholson cc66b666e5 fix: inject plugin context after cache markers to preserve Anthropic prompt cache prefix stability 2026-04-04 18:04:55 -05:00
Fran Fitzpatrick 2556cfdab1 fix(gateway): match Discord mention-stripping behavior in Matrix adapter
Move mention stripping outside the `if not is_dm` guard so mentions
are stripped in DMs too. Remove the bare-mention early return so a
message containing only a mention passes through as empty string,
matching Discord's behavior.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 13:09:27 -07:00
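A minimal sketch of the stripping behavior this commit describes, modeled on the `_strip_mention` helper that appears later in this diff. The standalone function name and the explicit `user_id` parameter are assumptions made for illustration; the adapter reads the ID from `self._user_id`.

```python
import re


def strip_mention(body: str, user_id: str) -> str:
    """Remove the bot's full Matrix ID and bare localpart from a message body."""
    # Remove the full @user:server form first.
    if user_id:
        body = body.replace(user_id, "")
    # Then remove any remaining bare localpart (case-insensitive, whole word).
    if user_id and ":" in user_id:
        localpart = user_id.split(":")[0].lstrip("@")
        if localpart:
            body = re.sub(
                r"\b" + re.escape(localpart) + r"\b", "", body, flags=re.IGNORECASE
            )
    return body.strip()
```

With the early return removed, a message that contains only the mention reduces to an empty string and is still passed on, for example `strip_mention("@hermes:example.org", "@hermes:example.org")` yields `""`.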
Fran Fitzpatrick d86be33161 feat(gateway): add MATRIX_REQUIRE_MENTION and MATRIX_AUTO_THREAD support
Bring Matrix feature parity with Discord by adding mention gating and
auto-threading. Both default to true, matching Discord behavior.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 13:09:27 -07:00
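The gating rules can be sketched as a pure function, assuming the same precedence as the adapter code later in this diff: DMs always pass, then the global flag, free-response rooms, and threads the bot already participates in. All function and parameter names below are hypothetical. The two flag parsers mirror the diff, where `MATRIX_REQUIRE_MENTION` is opt-out (anything except `false`/`0`/`no` enables it) while `MATRIX_AUTO_THREAD` is enabled only by `true`/`1`/`yes`.

```python
from typing import Optional, Set


def flag_opt_out(raw: Optional[str]) -> bool:
    """Enabled unless explicitly set to a false value (MATRIX_REQUIRE_MENTION style)."""
    return (raw if raw is not None else "true").lower() not in ("false", "0", "no")


def flag_opt_in(raw: Optional[str]) -> bool:
    """Enabled only for an explicit true value (MATRIX_AUTO_THREAD style)."""
    return (raw if raw is not None else "true").lower() in ("true", "1", "yes")


def parse_room_list(raw: Optional[str]) -> Set[str]:
    """Split a comma-separated room list, dropping blanks."""
    return {r.strip() for r in (raw or "").split(",") if r.strip()}


def should_handle(
    *,
    is_dm: bool,
    room_id: str,
    mentioned: bool,
    thread_id: Optional[str],
    require_mention: bool,
    free_rooms: Set[str],
    bot_threads: Set[str],
) -> bool:
    """Return True when the message should reach the agent."""
    if is_dm or not require_mention:
        return True
    if room_id in free_rooms:
        return True
    if thread_id and thread_id in bot_threads:
        return True
    return mentioned
```

Note the asymmetry: an unrecognized value such as `off` leaves mention gating enabled but disables auto-threading.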
Teknium 569e9f9670 feat: execute_code runs on remote terminal backends (#5088)
* feat: execute_code runs on remote terminal backends (Docker/SSH/Modal/Daytona/Singularity)

When TERMINAL_ENV is not 'local', execute_code now ships the script to
the remote environment and runs it there via the terminal backend --
the same container/sandbox/SSH session used by terminal() and file tools.

Architecture:
- Local backend: unchanged (UDS RPC, subprocess.Popen)
- Remote backends: file-based RPC via execute_oneshot() polling
  - Script writes request files, parent polls and dispatches tool calls
  - Responses written atomically (tmp + rename) via base64/stdin
  - execute_oneshot() bypasses persistent shell lock for concurrency

Changes:
- tools/environments/base.py: add execute_oneshot() (delegates to execute())
- tools/environments/persistent_shell.py: override execute_oneshot() to
  bypass _shell_lock via _execute_oneshot(), enabling concurrent polling
- tools/code_execution_tool.py: add file-based transport to
  generate_hermes_tools_module(), _execute_remote() with full env
  get-or-create, file shipping, RPC poll loop, output post-processing

* fix: use _get_env_config() instead of raw TERMINAL_ENV env var

Read terminal backend type through the canonical config resolution
path (terminal_tool._get_env_config) instead of os.getenv directly.

* fix: use echo piping instead of stdin_data for base64 writes

Modal doesn't reliably deliver stdin_data to chained commands
(base64 -d > file && mv), producing 0-byte files. Switch to
echo 'base64' | base64 -d which works on all backends.

Verified E2E on both Docker and Modal.
2026-04-04 12:57:49 -07:00
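The atomic response write (tmp + rename, fed by `echo ... | base64 -d`) can be illustrated by building the shell command as a string. This is a sketch under stated assumptions: the helper name, the `.tmp` suffix, and the example path are hypothetical and are not taken from `code_execution_tool.py`.

```python
import base64
import shlex


def build_atomic_write_command(dest_path: str, payload: bytes) -> str:
    """Build a shell command that writes payload to dest_path atomically.

    The payload travels as base64 on the command line (not stdin), is decoded
    into a temporary file, and is then renamed over the destination so a
    reader polling dest_path never sees a partially written file.
    """
    encoded = base64.b64encode(payload).decode("ascii")
    tmp_path = dest_path + ".tmp"  # hypothetical naming scheme
    return (
        f"echo {shlex.quote(encoded)} | base64 -d > {shlex.quote(tmp_path)}"
        f" && mv {shlex.quote(tmp_path)} {shlex.quote(dest_path)}"
    )
```

Passing the data through `echo` avoids depending on stdin delivery to a chained command, which is the failure the commit message reports on Modal. Very large payloads could exceed command-line length limits, which this sketch does not handle.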
Chris Bartholomew 28e1e210ee fix(hindsight): overhaul hindsight memory plugin and memory setup wizard
- Dedicated asyncio event loop for Hindsight async calls (fixes aiohttp session leaks)
- Client caching (reuse instead of creating per-call)
- Local mode daemon management with config change detection and auto-restart
- Memory mode support (hybrid/context/tools) and prefetch method (recall/reflect)
- Proper shutdown with event loop and client cleanup
- Disable HindsightEmbedded.__del__ to avoid GC loop errors
- Update API URLs (app -> ui.hindsight.vectorize.io, api_url -> base_url)
- Setup wizard: conditional fields (when clause), dynamic defaults (default_from)
- Switch dependency install from pip to uv (correct for uv-based venvs)
- Add hindsight-all to plugin.yaml and import mapping
- 12 new tests for dispatch routing and setup field filtering

Original PR #5044 by cdbartholomew.
2026-04-04 12:18:46 -07:00
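The two setup-wizard features (`when` conditions and `default_from` lookups) can be sketched as one filtering pass, following the logic in the `cmd_setup` hunk later in this diff. The function name, the example schema, and the placeholder model names are hypothetical. The real wizard also updates the config as the user answers each prompt, which this static sketch leaves out.

```python
from typing import Any, Dict, List


def resolve_fields(
    schema: List[Dict[str, Any]], config: Dict[str, Any]
) -> List[Dict[str, Any]]:
    """Return the fields to prompt for, each with its effective default."""
    resolved = []
    for field in schema:
        # Skip fields whose "when" condition does not match the current config.
        when = field.get("when")
        if when and isinstance(when, dict):
            if not all(config.get(k) == v for k, v in when.items()):
                continue
        # Dynamic default: look it up from another field's current value.
        default = field.get("default")
        default_from = field.get("default_from")
        if default_from and isinstance(default_from, dict):
            ref_value = config.get(default_from.get("field", ""), "")
            ref_map = default_from.get("map", {})
            if ref_value and ref_value in ref_map:
                default = ref_map[ref_value]
        resolved.append({"key": field["key"], "default": default})
    return resolved


# Hypothetical schema for illustration only.
EXAMPLE_SCHEMA = [
    {"key": "mode", "default": "cloud"},
    {"key": "api_key", "when": {"mode": "cloud"}},
    {"key": "llm_provider", "default": "openai", "when": {"mode": "local"}},
    {
        "key": "llm_model",
        "when": {"mode": "local"},
        "default_from": {
            "field": "llm_provider",
            "map": {"openai": "model-a", "groq": "model-b"},
        },
    },
]
```

In cloud mode only `mode` and `api_key` are prompted; in local mode the `llm_model` default follows whichever provider was chosen.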
Teknium 93aa01c71c fix: use main provider model for auxiliary tasks on non-aggregator providers (#5091)
Users on direct API-key providers (Alibaba, DeepSeek, ZAI, etc.) without
an OpenRouter or Nous key would get broken auxiliary tasks (compression,
vision, etc.) because _resolve_auto() only tried aggregator providers
first, then fell back to iterating PROVIDER_REGISTRY with wrong default
model names.

Now _resolve_auto() checks the user's main provider first. If it's not
an aggregator (OpenRouter/Nous), it uses their main model directly for
all auxiliary tasks. Aggregator users still get the cheap gemini-flash
model as before.

Adds _read_main_provider() to read model.provider from config.yaml,
mirroring the existing _read_main_model().

Reported by SkyLinx — Alibaba Coding Plan user getting 400 errors from
google/gemini-3-flash-preview being sent to DashScope.
2026-04-04 12:07:43 -07:00
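The new first step of `_resolve_auto()` reduces to a small decision, sketched here as a pure function. The function name and return shape are hypothetical; the aggregator set and the excluded `auto`/`custom` values come from the hunk later in this diff. The real code additionally calls `resolve_provider_client()` and falls through to the original chain if no client is returned.

```python
from typing import Optional, Tuple

AGGREGATOR_PROVIDERS = frozenset({"openrouter", "nous"})


def main_provider_route(
    main_provider: str, main_model: str
) -> Optional[Tuple[str, str]]:
    """Return (provider, model) when auxiliary tasks should reuse the main
    provider, or None to fall through to the aggregator/fallback chain."""
    provider = (main_provider or "").strip().lower()
    if not provider or not main_model:
        return None
    if provider in AGGREGATOR_PROVIDERS or provider in ("auto", "custom"):
        return None
    return provider, main_model
```

A direct API-key user therefore keeps auxiliary requests on the provider they already have credentials for, while aggregator users still reach the original chain.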
Teknium 5d0f55cac4 feat(cron): add script field for pre-run data collection (#5082)
Add an optional 'script' parameter to cron jobs that references a Python
script. The script runs before each agent turn, and its stdout is injected
into the prompt as context. This enables stateful monitoring — the script
handles data collection and change detection, the LLM analyzes and reports.

- cron/jobs.py: add script field to create_job(), stored in job dict
- cron/scheduler.py: add _run_job_script() executor with timeout handling,
  inject script output/errors into _build_job_prompt()
- tools/cronjob_tools.py: add script to tool schema, create/update handlers,
  _format_job display
- hermes_cli/cron.py: add --script to create/edit, display in list/edit output
- hermes_cli/main.py: add --script argparse for cron create/edit subcommands
- tests/cron/test_cron_script.py: 20 tests covering job CRUD, script
  execution, path resolution, error handling, prompt injection, tool API

Script paths can be absolute or relative (resolved against ~/.hermes/scripts/).
Scripts run with a 120s timeout. Failures are injected as error context so
the LLM can report the problem. Empty string clears an attached script.
2026-04-04 10:43:39 -07:00
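A data-collection script for this feature might look like the sketch below, which keeps state between runs and emits text only when something changed. Everything here is hypothetical: the function name, the JSON state format, and the idea of comparing a single string value. The only contract taken from the commit is that the script's stdout becomes prompt context and a non-zero exit is reported as an error.

```python
import json
from pathlib import Path


def report_change(state_file: Path, current_value: str) -> str:
    """Compare current_value with the value saved by the previous run.

    Returns the text to print for the agent, or an empty string when
    nothing changed. The new value is saved for the next run.
    """
    previous = None
    if state_file.exists():
        previous = json.loads(state_file.read_text(encoding="utf-8")).get("value")
    state_file.write_text(json.dumps({"value": current_value}), encoding="utf-8")
    if previous is None:
        return f"First run. Current value: {current_value}"
    if previous == current_value:
        return ""
    return f"Changed from {previous} to {current_value}"
```

A real script would collect `current_value` from its data source and finish with `print(report_change(...))`. When nothing is printed, the scheduler code in this diff injects a note that the script ran successfully but produced no output.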
catbusconductor e09e48567e fix(openviking): correct API endpoint paths and response parsing
- Browse: POST /api/v1/browse → GET /api/v1/fs/{ls,tree,stat}
- Read: POST /api/v1/read[/abstract] → GET /api/v1/content/{read,abstract,overview}
- System prompt: result.get('children') → len(result) (API returns list)
- Content: result.get('content') → result is a plain string
- Browse: result['entries'] → result is the list; is_dir → isDir (camelCase)
- Browse: add rel_path and abstract fields to entry output

Based on PR #4742 by catbusconductor. Auth header changes dropped
(already on main via #4825).
2026-04-04 10:40:38 -07:00
Teknium 2aa3f199cb fix(doctor): sync provider checks, add config migration, WAL and mem0 diagnostics (#5077)
Provider coverage:
- Add 6 missing providers to _PROVIDER_ENV_HINTS (Nous, DeepSeek,
  DashScope, HF, OpenCode Zen/Go)
- Add 5 missing providers to API connectivity checks (DeepSeek,
  Hugging Face, Alibaba/DashScope, OpenCode Zen, OpenCode Go)

New diagnostics:
- Config version check — detects outdated config, --fix runs
  non-interactive migration automatically
- Stale root-level config keys — detects provider/base_url at root
  level (known bug source, PR #4329), --fix migrates them into
  the model section
- WAL file size check — warns on >50MB WAL files (indicates missed
  checkpoints from the duplicate close() bug), --fix runs PASSIVE
  checkpoint
- Mem0 memory plugin status — checks API key resolution including
  the env+json merge we just fixed
2026-04-04 10:21:33 -07:00
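The WAL diagnostic can be approximated with the standard `sqlite3` module. This is a sketch, not the doctor command's code: the function names are hypothetical, and locating the WAL file by appending `-wal` to the database path is SQLite's usual convention rather than something shown in this diff. The 50 MB threshold comes from the commit message.

```python
import os
import sqlite3
from typing import Tuple

WAL_WARN_BYTES = 50 * 1024 * 1024  # threshold named in the commit message


def wal_size_bytes(db_path: str) -> int:
    """Size of the write-ahead log next to db_path, or 0 if there is none."""
    wal_path = db_path + "-wal"
    return os.path.getsize(wal_path) if os.path.exists(wal_path) else 0


def passive_checkpoint(db_path: str) -> Tuple[int, int, int]:
    """Run a PASSIVE checkpoint and return SQLite's (busy, log, checkpointed) row."""
    conn = sqlite3.connect(db_path)
    try:
        return tuple(conn.execute("PRAGMA wal_checkpoint(PASSIVE)").fetchone())
    finally:
        conn.close()


def needs_checkpoint(db_path: str) -> bool:
    """True when the WAL file exceeds the warning threshold."""
    return wal_size_bytes(db_path) > WAL_WARN_BYTES
```

A PASSIVE checkpoint copies as many frames as it can without waiting on readers or writers, so it can run while other processes are using the database, although it may leave part of the log uncheckpointed.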
LucidPaths 6367e1c4c0 fix: remove stale test skips, fix regex backtracking, file search bug, and test flakiness
Bug fixes:
- agent/redact.py: catastrophic regex backtracking in _ENV_ASSIGN_RE — removed
  re.IGNORECASE and changed [A-Z_]* to [A-Z0-9_]* to restrict matching to actual
  env var name chars. Without this, the pattern backtracks exponentially on large
  strings (e.g. 100K tool output), causing test_file_read_guards to time out.
- tools/file_operations.py: over-escaped newline in find -printf format string
  produced literal backslash-n instead of a real newline, breaking file search
  result parsing (total_count always 1, paths concatenated).

Test fixes:
- Remove stale pytestmark.skip from 4 test modules that were blanket-skipped as
  'Hangs in non-interactive environments' but actually run fine:
  - test_413_compression.py (12 tests, 25s)
  - test_file_tools_live.py (71 tests, 24s)
  - test_code_execution.py (61 tests, 99s)
  - test_agent_loop_tool_calling.py (has proper OPENROUTER_API_KEY skip already)
- test_413_compression.py: fix threshold values in 2 preflight compression tests
  where context_length was too small for the compressed output to fit in one pass.
- test_mcp_probe.py: add missing _MCP_AVAILABLE mock so tests work without MCP SDK.
- test_mcp_tool_issue_948.py: inject MCP symbols (StdioServerParameters etc.) when
  SDK is not installed so patch() targets exist.
- test_approve_deny_commands.py: replace time.sleep(0.3) with deterministic polling
  of _gateway_queues — fixes race condition where resolve fires before threads
  register their approval entries, causing the test to hang indefinitely.

Net effect: +256 tests recovered from skip, 8 real failures fixed.
2026-04-04 10:18:57 -07:00
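The tightened pattern from `agent/redact.py` can be exercised on its own. The two regex definitions below are copied from the hunk later in this diff; the `redact_env_assignments` wrapper and its `***` replacement are hypothetical and only serve to show what the pattern matches.

```python
import re

_SECRET_ENV_NAMES = r"(?:API_?KEY|TOKEN|SECRET|PASSWORD|PASSWD|CREDENTIAL|AUTH)"
_ENV_ASSIGN_RE = re.compile(
    rf"([A-Z0-9_]{{0,50}}{_SECRET_ENV_NAMES}[A-Z0-9_]{{0,50}})\s*=\s*(['\"]?)(\S+)\2",
)


def redact_env_assignments(text: str) -> str:
    """Replace the value of KEY=value pairs whose KEY looks secret-like."""
    return _ENV_ASSIGN_RE.sub(
        lambda m: f"{m.group(1)}={m.group(2)}***{m.group(2)}", text
    )
```

One consequence of dropping `re.IGNORECASE` is that lowercase assignments such as `api_key=...` are no longer matched by this pattern. Only uppercase names are redacted: `OPENAI_API_KEY=sk-123` becomes `OPENAI_API_KEY=***`.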
33 changed files with 2428 additions and 149 deletions
+45 -1
@@ -697,6 +697,25 @@ def _read_main_model() -> str:
return ""
def _read_main_provider() -> str:
"""Read the user's configured main provider from config.yaml.
Returns the lowercase provider id (e.g. "alibaba", "openrouter") or ""
if not configured.
"""
try:
from hermes_cli.config import load_config
cfg = load_config()
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, dict):
provider = model_cfg.get("provider", "")
if isinstance(provider, str) and provider.strip():
return provider.strip().lower()
except Exception:
pass
return ""
def _resolve_custom_runtime() -> Tuple[Optional[str], Optional[str]]:
"""Resolve the active custom/main endpoint the same way the main CLI does.
@@ -855,10 +874,35 @@ _AUTO_PROVIDER_LABELS = {
}
_AGGREGATOR_PROVIDERS = frozenset({"openrouter", "nous"})
def _resolve_auto() -> Tuple[Optional[OpenAI], Optional[str]]:
"""Full auto-detection chain: OpenRouter → Nous → custom → Codex → API-key → None."""
"""Full auto-detection chain.
Priority:
1. If the user's main provider is NOT an aggregator (OpenRouter / Nous),
use their main provider + main model directly. This ensures users on
Alibaba, DeepSeek, ZAI, etc. get auxiliary tasks handled by the same
provider they already have credentials for — no OpenRouter key needed.
2. OpenRouter → Nous → custom → Codex → API-key providers (original chain).
"""
global auxiliary_is_nous
auxiliary_is_nous = False # Reset — _try_nous() will set True if it wins
# ── Step 1: non-aggregator main provider → use main model directly ──
main_provider = _read_main_provider()
main_model = _read_main_model()
if (main_provider and main_model
and main_provider not in _AGGREGATOR_PROVIDERS
and main_provider not in ("auto", "custom", "")):
client, resolved = resolve_provider_client(main_provider, main_model)
if client is not None:
logger.info("Auxiliary auto-detect: using main provider %s (%s)",
main_provider, resolved or main_model)
return client, resolved or main_model
# ── Step 2: aggregator / fallback chain ──────────────────────────────
tried = []
for try_fn in (_try_openrouter, _try_nous, _try_custom_endpoint,
_try_codex, _resolve_api_key_provider):
+1 -2
@@ -53,8 +53,7 @@ _PREFIX_PATTERNS = [
# ENV assignment patterns: KEY=value where KEY contains a secret-like name
_SECRET_ENV_NAMES = r"(?:API_?KEY|TOKEN|SECRET|PASSWORD|PASSWD|CREDENTIAL|AUTH)"
_ENV_ASSIGN_RE = re.compile(
rf"([A-Z_]{{0,50}}{_SECRET_ENV_NAMES}[A-Z_]{{0,50}})\s*=\s*(['\"]?)(\S+)\2",
re.IGNORECASE,
rf"([A-Z0-9_]{{0,50}}{_SECRET_ENV_NAMES}[A-Z0-9_]{{0,50}})\s*=\s*(['\"]?)(\S+)\2",
)
# JSON field patterns: "apiKey": "value", "token": "value", etc.
+7
@@ -375,6 +375,7 @@ def create_job(
model: Optional[str] = None,
provider: Optional[str] = None,
base_url: Optional[str] = None,
script: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create a new cron job.
@@ -391,6 +392,9 @@ def create_job(
model: Optional per-job model override
provider: Optional per-job provider override
base_url: Optional per-job base URL override
script: Optional path to a Python script whose stdout is injected into the
prompt each run. The script runs before the agent turn, and its output
is prepended as context. Useful for data collection / change detection.
Returns:
The created job dict
@@ -419,6 +423,8 @@ def create_job(
normalized_model = normalized_model or None
normalized_provider = normalized_provider or None
normalized_base_url = normalized_base_url or None
normalized_script = str(script).strip() if isinstance(script, str) else None
normalized_script = normalized_script or None
label_source = (prompt or (normalized_skills[0] if normalized_skills else None)) or "cron job"
job = {
@@ -430,6 +436,7 @@ def create_job(
"model": normalized_model,
"provider": normalized_provider,
"base_url": normalized_base_url,
"script": normalized_script,
"schedule": parsed_schedule,
"schedule_display": parsed_schedule.get("display", schedule),
"repeat": {
+79
@@ -13,6 +13,7 @@ import concurrent.futures
import json
import logging
import os
import subprocess
import sys
import traceback
@@ -229,11 +230,89 @@ def _deliver_result(job: dict, content: str) -> None:
logger.info("Job '%s': delivered to %s:%s", job["id"], platform_name, chat_id)
_SCRIPT_TIMEOUT = 120 # seconds
def _run_job_script(script_path: str) -> tuple[bool, str]:
"""Execute a cron job's data-collection script and capture its output.
Args:
script_path: Path to a Python script (resolved via HERMES_HOME/scripts/ or absolute).
Returns:
(success, output) — on failure *output* contains the error message so the
LLM can report the problem to the user.
"""
from hermes_constants import get_hermes_home
path = Path(script_path).expanduser()
if not path.is_absolute():
# Resolve relative paths against HERMES_HOME/scripts/
path = get_hermes_home() / "scripts" / path
if not path.exists():
return False, f"Script not found: {path}"
if not path.is_file():
return False, f"Script path is not a file: {path}"
try:
result = subprocess.run(
[sys.executable, str(path)],
capture_output=True,
text=True,
timeout=_SCRIPT_TIMEOUT,
cwd=str(path.parent),
)
stdout = (result.stdout or "").strip()
stderr = (result.stderr or "").strip()
if result.returncode != 0:
parts = [f"Script exited with code {result.returncode}"]
if stderr:
parts.append(f"stderr:\n{stderr}")
if stdout:
parts.append(f"stdout:\n{stdout}")
return False, "\n".join(parts)
return True, stdout
except subprocess.TimeoutExpired:
return False, f"Script timed out after {_SCRIPT_TIMEOUT}s: {path}"
except Exception as exc:
return False, f"Script execution failed: {exc}"
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
skills = job.get("skills")
# Run data-collection script if configured, inject output as context.
script_path = job.get("script")
if script_path:
success, script_output = _run_job_script(script_path)
if success:
if script_output:
prompt = (
"## Script Output\n"
"The following data was collected by a pre-run script. "
"Use it as context for your analysis.\n\n"
f"```\n{script_output}\n```\n\n"
f"{prompt}"
)
else:
prompt = (
"[Script ran successfully but produced no output.]\n\n"
f"{prompt}"
)
else:
prompt = (
"## Script Error\n"
"The data-collection script failed. Report this to the user.\n\n"
f"```\n{script_output}\n```\n\n"
f"{prompt}"
)
# Always prepend [SILENT] guidance so the cron agent can suppress
# delivery when it has nothing new or noteworthy to report.
silent_hint = (
+14
@@ -575,6 +575,20 @@ def load_gateway_config() -> GatewayConfig:
if isinstance(frc, list):
frc = ",".join(str(v) for v in frc)
os.environ["WHATSAPP_FREE_RESPONSE_CHATS"] = str(frc)
# Matrix settings → env vars (env vars take precedence)
matrix_cfg = yaml_cfg.get("matrix", {})
if isinstance(matrix_cfg, dict):
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
os.environ["MATRIX_REQUIRE_MENTION"] = str(matrix_cfg["require_mention"]).lower()
frc = matrix_cfg.get("free_response_rooms")
if frc is not None and not os.getenv("MATRIX_FREE_RESPONSE_ROOMS"):
if isinstance(frc, list):
frc = ",".join(str(v) for v in frc)
os.environ["MATRIX_FREE_RESPONSE_ROOMS"] = str(frc)
if "auto_thread" in matrix_cfg and not os.getenv("MATRIX_AUTO_THREAD"):
os.environ["MATRIX_AUTO_THREAD"] = str(matrix_cfg["auto_thread"]).lower()
except Exception as e:
logger.warning(
"Failed to process config.yaml — falling back to .env / gateway.json values. "
+144 -7
@@ -5,13 +5,16 @@ matrix-nio Python SDK. Supports optional end-to-end encryption (E2EE)
when installed with ``pip install "matrix-nio[e2e]"``.
Environment variables:
MATRIX_HOMESERVER Homeserver URL (e.g. https://matrix.example.org)
MATRIX_ACCESS_TOKEN Access token (preferred auth method)
MATRIX_USER_ID Full user ID (@bot:server) — required for password login
MATRIX_PASSWORD Password (alternative to access token)
MATRIX_ENCRYPTION Set "true" to enable E2EE
MATRIX_ALLOWED_USERS Comma-separated Matrix user IDs (@user:server)
MATRIX_HOME_ROOM Room ID for cron/notification delivery
MATRIX_HOMESERVER Homeserver URL (e.g. https://matrix.example.org)
MATRIX_ACCESS_TOKEN Access token (preferred auth method)
MATRIX_USER_ID Full user ID (@bot:server) — required for password login
MATRIX_PASSWORD Password (alternative to access token)
MATRIX_ENCRYPTION Set "true" to enable E2EE
MATRIX_ALLOWED_USERS Comma-separated Matrix user IDs (@user:server)
MATRIX_HOME_ROOM Room ID for cron/notification delivery
MATRIX_REQUIRE_MENTION Require @mention in rooms (default: true)
MATRIX_FREE_RESPONSE_ROOMS Comma-separated room IDs exempt from mention requirement
MATRIX_AUTO_THREAD Auto-create threads for room messages (default: true)
"""
from __future__ import annotations
@@ -123,6 +126,10 @@ class MatrixAdapter(BasePlatformAdapter):
# Each entry: (room, event, timestamp)
self._pending_megolm: list = []
# Thread participation tracking (for require_mention bypass)
self._bot_participated_threads: set = self._load_participated_threads()
self._MAX_TRACKED_THREADS = 500
def _is_duplicate_event(self, event_id) -> bool:
"""Return True if this event was already processed. Tracks the ID otherwise."""
if not event_id:
@@ -902,6 +909,30 @@ class MatrixAdapter(BasePlatformAdapter):
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
# Require-mention gating.
if not is_dm:
free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "")
free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()}
require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no")
is_free_room = room.room_id in free_rooms
in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads)
formatted_body = source_content.get("formatted_body")
if require_mention and not is_free_room and not in_bot_thread:
if not self._is_bot_mentioned(body, formatted_body):
return
# Strip mention from body when present (including in DMs).
if self._is_bot_mentioned(body, source_content.get("formatted_body")):
body = self._strip_mention(body)
# Auto-thread: create a thread for non-DM, non-threaded messages.
if not is_dm and not thread_id:
auto_thread = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in ("true", "1", "yes")
if auto_thread:
thread_id = event.event_id
self._track_thread(thread_id)
# Reply-to detection.
reply_to = None
in_reply_to = relates_to.get("m.in_reply_to", {})
@@ -946,6 +977,9 @@ class MatrixAdapter(BasePlatformAdapter):
reply_to_message_id=reply_to,
)
if thread_id:
self._track_thread(thread_id)
await self.handle_message(msg_event)
async def _on_room_message_media(self, room: Any, event: Any) -> None:
@@ -1031,6 +1065,30 @@ class MatrixAdapter(BasePlatformAdapter):
if relates_to.get("rel_type") == "m.thread":
thread_id = relates_to.get("event_id")
# Require-mention gating (media messages).
if not is_dm:
free_rooms_raw = os.getenv("MATRIX_FREE_RESPONSE_ROOMS", "")
free_rooms = {r.strip() for r in free_rooms_raw.split(",") if r.strip()}
require_mention = os.getenv("MATRIX_REQUIRE_MENTION", "true").lower() not in ("false", "0", "no")
is_free_room = room.room_id in free_rooms
in_bot_thread = bool(thread_id and thread_id in self._bot_participated_threads)
if require_mention and not is_free_room and not in_bot_thread:
formatted_body = source_content.get("formatted_body")
if not self._is_bot_mentioned(body, formatted_body):
return
# Strip mention from body when present (including in DMs).
if self._is_bot_mentioned(body, source_content.get("formatted_body")):
body = self._strip_mention(body)
# Auto-thread: create a thread for non-DM, non-threaded messages.
if not is_dm and not thread_id:
auto_thread = os.getenv("MATRIX_AUTO_THREAD", "true").lower() in ("true", "1", "yes")
if auto_thread:
thread_id = event.event_id
self._track_thread(thread_id)
# For voice messages, cache audio locally for transcription tools.
# Use the authenticated nio client to download (Matrix requires auth for media).
media_urls = [http_url] if http_url else None
@@ -1079,6 +1137,9 @@ class MatrixAdapter(BasePlatformAdapter):
media_types=media_types,
)
if thread_id:
self._track_thread(thread_id)
await self.handle_message(msg_event)
async def _on_invite(self, room: Any, event: Any) -> None:
@@ -1166,6 +1227,82 @@ class MatrixAdapter(BasePlatformAdapter):
for rid in self._joined_rooms
}
# ------------------------------------------------------------------
# Thread participation tracking
# ------------------------------------------------------------------
@staticmethod
def _thread_state_path() -> Path:
"""Path to the persisted thread participation set."""
from hermes_cli.config import get_hermes_home
return get_hermes_home() / "matrix_threads.json"
@classmethod
def _load_participated_threads(cls) -> set:
"""Load persisted thread IDs from disk."""
path = cls._thread_state_path()
try:
if path.exists():
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, list):
return set(data)
except Exception as e:
logger.debug("Could not load matrix thread state: %s", e)
return set()
def _save_participated_threads(self) -> None:
"""Persist the current thread set to disk (best-effort)."""
path = self._thread_state_path()
try:
thread_list = list(self._bot_participated_threads)
if len(thread_list) > self._MAX_TRACKED_THREADS:
thread_list = thread_list[-self._MAX_TRACKED_THREADS:]
self._bot_participated_threads = set(thread_list)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(thread_list), encoding="utf-8")
except Exception as e:
logger.debug("Could not save matrix thread state: %s", e)
def _track_thread(self, thread_id: str) -> None:
"""Add a thread to the participation set and persist."""
if thread_id not in self._bot_participated_threads:
self._bot_participated_threads.add(thread_id)
self._save_participated_threads()
# ------------------------------------------------------------------
# Mention detection helpers
# ------------------------------------------------------------------
def _is_bot_mentioned(self, body: str, formatted_body: Optional[str] = None) -> bool:
"""Return True if the bot is mentioned in the message."""
if not body and not formatted_body:
return False
# Check for full @user:server in body
if self._user_id and self._user_id in body:
return True
# Check for localpart with word boundaries (case-insensitive)
if self._user_id and ":" in self._user_id:
localpart = self._user_id.split(":")[0].lstrip("@")
if localpart and re.search(r'\b' + re.escape(localpart) + r'\b', body, re.IGNORECASE):
return True
# Check formatted_body for Matrix pill
if formatted_body and self._user_id:
if f"matrix.to/#/{self._user_id}" in formatted_body:
return True
return False
def _strip_mention(self, body: str) -> str:
"""Remove bot mention from message body."""
# Remove full @user:server
if self._user_id:
body = body.replace(self._user_id, "")
# If still contains localpart mention, remove it
if self._user_id and ":" in self._user_id:
localpart = self._user_id.split(":")[0].lstrip("@")
if localpart:
body = re.sub(r'\b' + re.escape(localpart) + r'\b', '', body, flags=re.IGNORECASE)
return body.strip()
def _get_display_name(self, room: Any, user_id: str) -> str:
"""Get a user's display name in a room, falling back to user_id."""
if room and hasattr(room, "users"):
+25
@@ -42,6 +42,7 @@ _EXTRA_ENV_KEYS = frozenset({
"WHATSAPP_MODE", "WHATSAPP_ENABLED",
"MATTERMOST_HOME_CHANNEL", "MATTERMOST_REPLY_MODE",
"MATRIX_PASSWORD", "MATRIX_ENCRYPTION", "MATRIX_HOME_ROOM",
"MATRIX_REQUIRE_MENTION", "MATRIX_FREE_RESPONSE_ROOMS", "MATRIX_AUTO_THREAD",
})
import yaml
@@ -1008,6 +1009,30 @@ OPTIONAL_ENV_VARS = {
"password": False,
"category": "messaging",
},
"MATRIX_REQUIRE_MENTION": {
"description": "Require @mention in Matrix rooms (default: true). Set to false to respond to all messages.",
"prompt": "Require @mention in rooms (true/false)",
"url": None,
"password": False,
"category": "messaging",
"advanced": True,
},
"MATRIX_FREE_RESPONSE_ROOMS": {
"description": "Comma-separated Matrix room IDs where bot responds without @mention",
"prompt": "Free-response room IDs (comma-separated)",
"url": None,
"password": False,
"category": "messaging",
"advanced": True,
},
"MATRIX_AUTO_THREAD": {
"description": "Auto-create threads for messages in Matrix rooms (default: true)",
"prompt": "Auto-create threads in rooms (true/false)",
"url": None,
"password": False,
"category": "messaging",
"advanced": True,
},
"GATEWAY_ALLOW_ALL_USERS": {
"description": "Allow all users to interact with messaging bots (true/false). Default: false.",
"prompt": "Allow all users (true/false)",
+10
@@ -90,6 +90,9 @@ def cron_list(show_all: bool = False):
print(f" Deliver: {deliver_str}")
if skills:
print(f" Skills: {', '.join(skills)}")
script = job.get("script")
if script:
print(f" Script: {script}")
print()
from hermes_cli.gateway import find_gateway_pids
@@ -149,6 +152,7 @@ def cron_create(args):
repeat=getattr(args, "repeat", None),
skill=getattr(args, "skill", None),
skills=_normalize_skills(getattr(args, "skill", None), getattr(args, "skills", None)),
script=getattr(args, "script", None),
)
if not result.get("success"):
print(color(f"Failed to create job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -158,6 +162,9 @@ def cron_create(args):
print(f" Schedule: {result['schedule']}")
if result.get("skills"):
print(f" Skills: {', '.join(result['skills'])}")
job_data = result.get("job", {})
if job_data.get("script"):
print(f" Script: {job_data['script']}")
print(f" Next run: {result['next_run_at']}")
return 0
@@ -195,6 +202,7 @@ def cron_edit(args):
deliver=getattr(args, "deliver", None),
repeat=getattr(args, "repeat", None),
skills=final_skills,
script=getattr(args, "script", None),
)
if not result.get("success"):
print(color(f"Failed to update job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -208,6 +216,8 @@ def cron_edit(args):
print(f" Skills: {', '.join(updated['skills'])}")
else:
print(" Skills: none")
if updated.get("script"):
print(f" Script: {updated['script']}")
return 0
+2
@@ -4381,6 +4381,7 @@ For more help on a command:
cron_create.add_argument("--deliver", help="Delivery target: origin, local, telegram, discord, signal, or platform:chat_id")
cron_create.add_argument("--repeat", type=int, help="Optional repeat count")
cron_create.add_argument("--skill", dest="skills", action="append", help="Attach a skill. Repeat to add multiple skills.")
cron_create.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run")
# cron edit
cron_edit = cron_subparsers.add_parser("edit", help="Edit an existing scheduled job")
@@ -4394,6 +4395,7 @@ For more help on a command:
cron_edit.add_argument("--add-skill", dest="add_skills", action="append", help="Append a skill without replacing the existing list. Repeatable.")
cron_edit.add_argument("--remove-skill", dest="remove_skills", action="append", help="Remove a specific attached skill. Repeatable.")
cron_edit.add_argument("--clear-skills", action="store_true", help="Remove all attached skills from the job")
cron_edit.add_argument("--script", help="Path to a Python script whose stdout is injected into the prompt each run. Pass empty string to clear.")
# lifecycle actions
cron_pause = cron_subparsers.add_parser("pause", help="Pause a scheduled job")
+27 -4
@@ -151,6 +151,7 @@ def _install_dependencies(provider_name: str) -> None:
"honcho-ai": "honcho",
"mem0ai": "mem0",
"hindsight-client": "hindsight_client",
"hindsight-all": "hindsight",
}
# Check which packages are missing
@@ -166,9 +167,18 @@ def _install_dependencies(provider_name: str) -> None:
return
print(f"\n Installing dependencies: {', '.join(missing)}")
import shutil
uv_path = shutil.which("uv")
if not uv_path:
print(f" ⚠ uv not found — cannot install dependencies")
print(f" Install uv: curl -LsSf https://astral.sh/uv/install.sh | sh")
print(f" Then re-run: hermes memory setup")
return
try:
subprocess.run(
[sys.executable, "-m", "pip", "install", "--quiet"] + missing,
[uv_path, "pip", "install", "--python", sys.executable, "--quiet"] + missing,
check=True, timeout=120,
capture_output=True,
)
@@ -178,10 +188,10 @@ def _install_dependencies(provider_name: str) -> None:
stderr = (e.stderr or b"").decode()[:200]
if stderr:
print(f" {stderr}")
print(f" Run manually: pip install {' '.join(missing)}")
print(f" Run manually: uv pip install --python {sys.executable} {' '.join(missing)}")
except Exception as e:
print(f" ⚠ Install failed: {e}")
print(f" Run manually: pip install {' '.join(missing)}")
print(f" Run manually: uv pip install --python {sys.executable} {' '.join(missing)}")
# Also show external dependencies (non-pip) if any
ext_deps = meta.get("external_dependencies", [])
@@ -275,7 +285,6 @@ def cmd_setup(args) -> None:
schema = provider.get_config_schema() if hasattr(provider, "get_config_schema") else []
# Provider config section
provider_config = config["memory"].get(name, {})
if not isinstance(provider_config, dict):
provider_config = {}
@@ -290,11 +299,25 @@ def cmd_setup(args) -> None:
key = field["key"]
desc = field.get("description", key)
default = field.get("default")
# Dynamic default: look up default from another field's value
default_from = field.get("default_from")
if default_from and isinstance(default_from, dict):
ref_field = default_from.get("field", "")
ref_map = default_from.get("map", {})
ref_value = provider_config.get(ref_field, "")
if ref_value and ref_value in ref_map:
default = ref_map[ref_value]
is_secret = field.get("secret", False)
choices = field.get("choices")
env_var = field.get("env_var")
url = field.get("url")
# Skip fields whose "when" condition doesn't match
when = field.get("when")
if when and isinstance(when, dict):
if not all(provider_config.get(k) == v for k, v in when.items()):
continue
if choices and not is_secret:
# Use curses picker for choice fields
choice_items = [(c, "") for c in choices]
+67 -7
@@ -1,11 +1,11 @@
# Hindsight Memory Provider
Long-term memory with knowledge graph, entity resolution, and multi-strategy retrieval. Supports cloud and local modes.
Long-term memory with knowledge graph, entity resolution, and multi-strategy retrieval. Supports cloud and local (embedded) modes.
## Requirements
- Cloud: `pip install hindsight-client` + API key from [app.hindsight.vectorize.io](https://app.hindsight.vectorize.io)
- Local: `pip install hindsight` + LLM API key for embeddings
- **Cloud:** API key from [ui.hindsight.vectorize.io](https://ui.hindsight.vectorize.io)
- **Local:** API key for a supported LLM provider (OpenAI, Anthropic, Gemini, Groq, MiniMax, or Ollama). Embeddings and reranking run locally — no additional API keys needed.
## Setup
@@ -13,26 +13,86 @@ Long-term memory with knowledge graph, entity resolution, and multi-strategy ret
hermes memory setup # select "hindsight"
```
Or manually:
The setup wizard will install dependencies automatically via `uv` and walk you through configuration.
Or manually (cloud mode with defaults):
```bash
hermes config set memory.provider hindsight
echo "HINDSIGHT_API_KEY=your-key" >> ~/.hermes/.env
```
### Cloud Mode
Connects to the Hindsight Cloud API. Requires an API key from [ui.hindsight.vectorize.io](https://ui.hindsight.vectorize.io).
### Local Mode
Runs an embedded Hindsight server with built-in PostgreSQL. Requires an LLM API key (e.g. Groq, OpenAI, Anthropic) for memory extraction and synthesis. The daemon starts automatically in the background on first use and stops after 5 minutes of inactivity.
Daemon startup logs: `~/.hermes/logs/hindsight-embed.log`
Daemon runtime logs: `~/.hindsight/profiles/<profile>.log`
## Config
Config file: `$HERMES_HOME/hindsight/config.json` (or `~/.hindsight/config.json` legacy)
Config file: `~/.hermes/hindsight/config.json`
### Connection
| Key | Default | Description |
|-----|---------|-------------|
| `mode` | `cloud` | `cloud` or `local` |
| `bank_id` | `hermes` | Memory bank identifier |
| `budget` | `mid` | Recall thoroughness: `low`/`mid`/`high` |
| `api_url` | `https://api.hindsight.vectorize.io` | API URL (cloud mode) |
| `api_url` | `http://localhost:8888` | API URL (local mode, unused — daemon manages its own port) |
### Memory
| Key | Default | Description |
|-----|---------|-------------|
| `bank_id` | `hermes` | Memory bank name |
| `budget` | `mid` | Recall thoroughness: `low` / `mid` / `high` |
### Integration
| Key | Default | Description |
|-----|---------|-------------|
| `memory_mode` | `hybrid` | How memories are integrated into the agent |
| `prefetch_method` | `recall` | Method for automatic context injection |
**memory_mode:**
- `hybrid` — automatic context injection + tools available to the LLM
- `context` — automatic injection only, no tools exposed
- `tools` — tools only, no automatic injection
**prefetch_method:**
- `recall` — injects raw memory facts (fast)
- `reflect` — injects LLM-synthesized summary (slower, more coherent)
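
A minimal sketch of how the two integration settings above could be resolved into concrete behavior. The function name `resolve_integration` and the returned keys `inject_context` and `expose_tools` are hypothetical and used only for illustration; the fallback to `hybrid` and `recall` for unrecognized values follows the defaults listed in the table.

```python
VALID_MEMORY_MODES = ("hybrid", "context", "tools")
VALID_PREFETCH_METHODS = ("recall", "reflect")


def resolve_integration(cfg: dict) -> dict:
    """Normalize integration settings, falling back to documented defaults.

    Hypothetical helper for illustration; not part of the plugin's API.
    """
    memory_mode = cfg.get("memory_mode", "hybrid")
    if memory_mode not in VALID_MEMORY_MODES:
        memory_mode = "hybrid"

    prefetch_method = cfg.get("prefetch_method", "recall")
    if prefetch_method not in VALID_PREFETCH_METHODS:
        prefetch_method = "recall"

    return {
        "memory_mode": memory_mode,
        "prefetch_method": prefetch_method,
        # "tools" mode disables automatic injection.
        "inject_context": memory_mode != "tools",
        # "context" mode hides the tools from the LLM.
        "expose_tools": memory_mode != "context",
    }


if __name__ == "__main__":
    print(resolve_integration({"memory_mode": "context", "prefetch_method": "reflect"}))
```

An unrecognized value such as `"memory_mode": "everything"` resolves to `hybrid` in this sketch, so both injection and tools stay enabled.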
### Local Mode LLM
| Key | Default | Description |
|-----|---------|-------------|
| `llm_provider` | `openai` | LLM provider: `openai`, `anthropic`, `gemini`, `groq`, `minimax`, `ollama` |
| `llm_model` | per-provider | Model name (e.g. `gpt-4o-mini`, `openai/gpt-oss-120b`) |
The LLM API key is stored in `~/.hermes/.env` as `HINDSIGHT_LLM_API_KEY`.
## Tools
Available in `hybrid` and `tools` memory modes:
| Tool | Description |
|------|-------------|
| `hindsight_retain` | Store information with auto entity extraction |
| `hindsight_recall` | Multi-strategy search (semantic + entity graph) |
| `hindsight_reflect` | Cross-memory synthesis (LLM-powered) |
## Environment Variables
| Variable | Description |
|----------|-------------|
| `HINDSIGHT_API_KEY` | API key for Hindsight Cloud |
| `HINDSIGHT_LLM_API_KEY` | LLM API key for local mode |
| `HINDSIGHT_API_URL` | Override API endpoint |
| `HINDSIGHT_BANK_ID` | Override bank name |
| `HINDSIGHT_BUDGET` | Override recall budget |
| `HINDSIGHT_MODE` | Override mode (`cloud` / `local`) |
+229 -72
@@ -1,7 +1,7 @@
"""Hindsight memory plugin — MemoryProvider interface.
Long-term memory with knowledge graph, entity resolution, and multi-strategy
retrieval. Supports cloud (API key) and local (embedded PostgreSQL) modes.
retrieval. Supports cloud (API key) and local modes.
Original PR #1811 by benfrank241, adapted to MemoryProvider ABC.
@@ -18,10 +18,10 @@ Or via $HERMES_HOME/hindsight/config.json (profile-scoped), falling back to
from __future__ import annotations
import asyncio
import json
import logging
import os
import queue
import threading
from typing import Any, Dict, List
@@ -30,30 +30,51 @@ from agent.memory_provider import MemoryProvider
logger = logging.getLogger(__name__)
_DEFAULT_API_URL = "https://api.hindsight.vectorize.io"
_DEFAULT_LOCAL_URL = "http://localhost:8888"
_VALID_BUDGETS = {"low", "mid", "high"}
_PROVIDER_DEFAULT_MODELS = {
"openai": "gpt-4o-mini",
"anthropic": "claude-haiku-4-5",
"gemini": "gemini-2.5-flash",
"groq": "openai/gpt-oss-120b",
"minimax": "MiniMax-M2.7",
"ollama": "gemma3:12b",
"lmstudio": "local-model",
}
# ---------------------------------------------------------------------------
# Thread helper (from original PR — avoids aiohttp event loop conflicts)
# Dedicated event loop for Hindsight async calls (one per process, reused).
# Avoids creating ephemeral loops that leak aiohttp sessions.
# ---------------------------------------------------------------------------
def _run_in_thread(fn, timeout: float = 30.0):
result_q: queue.Queue = queue.Queue(maxsize=1)
_loop: asyncio.AbstractEventLoop | None = None
_loop_thread: threading.Thread | None = None
_loop_lock = threading.Lock()
def _run():
import asyncio
asyncio.set_event_loop(None)
try:
result_q.put(("ok", fn()))
except Exception as exc:
result_q.put(("err", exc))
t = threading.Thread(target=_run, daemon=True, name="hindsight-call")
t.start()
kind, value = result_q.get(timeout=timeout)
if kind == "err":
raise value
return value
def _get_loop() -> asyncio.AbstractEventLoop:
"""Return a long-lived event loop running on a background thread."""
global _loop, _loop_thread
with _loop_lock:
if _loop is not None and _loop.is_running():
return _loop
_loop = asyncio.new_event_loop()
def _run():
asyncio.set_event_loop(_loop)
_loop.run_forever()
_loop_thread = threading.Thread(target=_run, daemon=True, name="hindsight-loop")
_loop_thread.start()
return _loop
def _run_sync(coro, timeout: float = 120.0):
"""Schedule *coro* on the shared loop and block until done."""
loop = _get_loop()
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result(timeout=timeout)
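
The shared-loop pattern introduced above can be tried in isolation. The following is a self-contained sketch, not the plugin's code: the names `get_loop`, `run_sync`, and `add` are hypothetical, and the `started` event is an addition of this sketch (an assumption about one way to avoid handing out a loop before it is running), not something the diff does.

```python
import asyncio
import threading
from typing import Optional

_loop: Optional[asyncio.AbstractEventLoop] = None  # process-wide cache
_loop_lock = threading.Lock()


def get_loop() -> asyncio.AbstractEventLoop:
    """Start one background event loop on first use, then reuse it."""
    global _loop
    with _loop_lock:
        if _loop is not None and _loop.is_running():
            return _loop
        loop = asyncio.new_event_loop()
        started = threading.Event()

        def _run() -> None:
            asyncio.set_event_loop(loop)
            # Signal from inside the loop so callers only see a running loop.
            loop.call_soon(started.set)
            loop.run_forever()

        threading.Thread(target=_run, daemon=True, name="demo-loop").start()
        started.wait()
        _loop = loop
        return loop


def run_sync(coro, timeout: float = 10.0):
    """Schedule a coroutine on the shared loop and block for its result."""
    future = asyncio.run_coroutine_threadsafe(coro, get_loop())
    return future.result(timeout=timeout)


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)  # stand-in for real async I/O
    return a + b


if __name__ == "__main__":
    print(run_sync(add(1, 2)))
```

Because every call goes through the same loop, an async client created once can keep its connections bound to that loop across calls, which is the stated motivation for replacing the per-call thread helper.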
# ---------------------------------------------------------------------------
@@ -161,9 +182,13 @@ class HindsightMemoryProvider(MemoryProvider):
def __init__(self):
self._config = None
self._api_key = None
self._api_url = _DEFAULT_API_URL
self._bank_id = "hermes"
self._budget = "mid"
self._mode = "cloud"
self._memory_mode = "hybrid" # "context", "tools", or "hybrid"
self._prefetch_method = "recall" # "recall" or "reflect"
self._client = None
self._prefetch_result = ""
self._prefetch_lock = threading.Lock()
self._prefetch_thread = None
@@ -178,10 +203,10 @@ class HindsightMemoryProvider(MemoryProvider):
cfg = _load_config()
mode = cfg.get("mode", "cloud")
if mode == "local":
embed = cfg.get("embed", {})
return bool(embed.get("llmApiKey") or os.environ.get("HINDSIGHT_LLM_API_KEY"))
api_key = cfg.get("apiKey") or os.environ.get("HINDSIGHT_API_KEY", "")
return bool(api_key)
return True
has_key = bool(cfg.get("apiKey") or os.environ.get("HINDSIGHT_API_KEY", ""))
has_url = bool(cfg.get("api_url") or os.environ.get("HINDSIGHT_API_URL", ""))
return has_key or has_url
except Exception:
return False
@@ -204,49 +229,148 @@ class HindsightMemoryProvider(MemoryProvider):
def get_config_schema(self):
return [
{"key": "mode", "description": "Cloud API or local embedded mode", "default": "cloud", "choices": ["cloud", "local"]},
{"key": "api_key", "description": "Hindsight Cloud API key", "secret": True, "env_var": "HINDSIGHT_API_KEY", "url": "https://app.hindsight.vectorize.io"},
{"key": "bank_id", "description": "Memory bank identifier", "default": "hermes"},
{"key": "api_url", "description": "Hindsight API URL", "default": _DEFAULT_API_URL, "when": {"mode": "cloud"}},
{"key": "api_key", "description": "Hindsight Cloud API key", "secret": True, "env_var": "HINDSIGHT_API_KEY", "url": "https://ui.hindsight.vectorize.io", "when": {"mode": "cloud"}},
{"key": "llm_provider", "description": "LLM provider for local mode", "default": "openai", "choices": ["openai", "anthropic", "gemini", "groq", "minimax", "ollama"], "when": {"mode": "local"}},
{"key": "llm_api_key", "description": "LLM API key for local Hindsight", "secret": True, "env_var": "HINDSIGHT_LLM_API_KEY", "when": {"mode": "local"}},
{"key": "llm_model", "description": "LLM model for local mode", "default": "gpt-4o-mini", "default_from": {"field": "llm_provider", "map": _PROVIDER_DEFAULT_MODELS}, "when": {"mode": "local"}},
{"key": "bank_id", "description": "Memory bank name", "default": "hermes"},
{"key": "budget", "description": "Recall thoroughness", "default": "mid", "choices": ["low", "mid", "high"]},
{"key": "llm_provider", "description": "LLM provider for local mode", "default": "anthropic", "choices": ["anthropic", "openai", "groq", "ollama"]},
{"key": "llm_api_key", "description": "LLM API key for local mode", "secret": True, "env_var": "HINDSIGHT_LLM_API_KEY"},
{"key": "llm_model", "description": "LLM model for local mode", "default": "claude-haiku-4-5-20251001"},
{"key": "memory_mode", "description": "Memory integration mode", "default": "hybrid", "choices": ["hybrid", "context", "tools"]},
{"key": "prefetch_method", "description": "Auto-recall method", "default": "recall", "choices": ["recall", "reflect"]},
]
def _make_client(self):
"""Create a fresh Hindsight client (thread-safe)."""
if self._mode == "local":
from hindsight import HindsightEmbedded
embed = self._config.get("embed", {})
return HindsightEmbedded(
profile=embed.get("profile", "hermes"),
llm_provider=embed.get("llmProvider", ""),
llm_api_key=embed.get("llmApiKey", ""),
llm_model=embed.get("llmModel", ""),
)
from hindsight_client import Hindsight
return Hindsight(api_key=self._api_key, timeout=30.0)
def _get_client(self):
"""Return the cached Hindsight client (created once, reused)."""
if self._client is None:
if self._mode == "local":
from hindsight import HindsightEmbedded
# Disable __del__ on the class to prevent "attached to a
# different loop" errors during GC — we handle cleanup in
# shutdown() instead.
HindsightEmbedded.__del__ = lambda self: None
self._client = HindsightEmbedded(
profile=self._config.get("profile", "hermes"),
llm_provider=self._config.get("llm_provider", ""),
llm_api_key=self._config.get("llmApiKey") or os.environ.get("HINDSIGHT_LLM_API_KEY", ""),
llm_model=self._config.get("llm_model", ""),
)
else:
from hindsight_client import Hindsight
kwargs = {"base_url": self._api_url, "timeout": 30.0}
if self._api_key:
kwargs["api_key"] = self._api_key
self._client = Hindsight(**kwargs)
return self._client
def initialize(self, session_id: str, **kwargs) -> None:
self._config = _load_config()
self._mode = self._config.get("mode", "cloud")
self._api_key = self._config.get("apiKey") or os.environ.get("HINDSIGHT_API_KEY", "")
default_url = _DEFAULT_LOCAL_URL if self._mode == "local" else _DEFAULT_API_URL
self._api_url = self._config.get("api_url") or os.environ.get("HINDSIGHT_API_URL", default_url)
banks = self._config.get("banks", {}).get("hermes", {})
self._bank_id = banks.get("bankId", "hermes")
budget = banks.get("budget", "mid")
self._bank_id = self._config.get("bank_id") or banks.get("bankId", "hermes")
budget = self._config.get("budget") or banks.get("budget", "mid")
self._budget = budget if budget in _VALID_BUDGETS else "mid"
# Ensure bank exists
try:
client = _run_in_thread(self._make_client)
_run_in_thread(lambda: client.create_bank(bank_id=self._bank_id, name=self._bank_id))
except Exception:
pass # Already exists
memory_mode = self._config.get("memory_mode", "hybrid")
self._memory_mode = memory_mode if memory_mode in ("context", "tools", "hybrid") else "hybrid"
prefetch_method = self._config.get("prefetch_method", "recall")
self._prefetch_method = prefetch_method if prefetch_method in ("recall", "reflect") else "recall"
logger.info("Hindsight initialized: mode=%s, api_url=%s, bank=%s, budget=%s, memory_mode=%s, prefetch_method=%s",
self._mode, self._api_url, self._bank_id, self._budget, self._memory_mode, self._prefetch_method)
# For local mode, start the embedded daemon in the background so it
# doesn't block the chat. Redirect stdout/stderr to a log file to
# prevent rich startup output from spamming the terminal.
if self._mode == "local":
def _start_daemon():
import traceback
from pathlib import Path
log_dir = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))) / "logs"
log_dir.mkdir(parents=True, exist_ok=True)
log_path = log_dir / "hindsight-embed.log"
try:
# Redirect the daemon manager's Rich console to our log file
# instead of stderr. This avoids global fd redirects that
# would capture output from other threads.
import hindsight_embed.daemon_embed_manager as dem
from rich.console import Console
dem.console = Console(file=open(log_path, "a"), force_terminal=False)
client = self._get_client()
profile = self._config.get("profile", "hermes")
# Update the profile .env to match our current config so
# the daemon always starts with the right settings.
# If the config changed and the daemon is running, stop it.
from pathlib import Path as _Path
profile_env = _Path.home() / ".hindsight" / "profiles" / f"{profile}.env"
current_key = self._config.get("llmApiKey") or os.environ.get("HINDSIGHT_LLM_API_KEY", "")
current_provider = self._config.get("llm_provider", "")
current_model = self._config.get("llm_model", "")
# Read saved profile config
saved = {}
if profile_env.exists():
for line in profile_env.read_text().splitlines():
if "=" in line and not line.startswith("#"):
k, v = line.split("=", 1)
saved[k.strip()] = v.strip()
config_changed = (
saved.get("HINDSIGHT_API_LLM_PROVIDER") != current_provider or
saved.get("HINDSIGHT_API_LLM_MODEL") != current_model or
saved.get("HINDSIGHT_API_LLM_API_KEY") != current_key
)
if config_changed:
# Write updated profile .env
profile_env.parent.mkdir(parents=True, exist_ok=True)
profile_env.write_text(
f"HINDSIGHT_API_LLM_PROVIDER={current_provider}\n"
f"HINDSIGHT_API_LLM_API_KEY={current_key}\n"
f"HINDSIGHT_API_LLM_MODEL={current_model}\n"
f"HINDSIGHT_API_LOG_LEVEL=info\n"
)
if client._manager.is_running(profile):
with open(log_path, "a") as f:
f.write("\n=== Config changed, restarting daemon ===\n")
client._manager.stop(profile)
client._ensure_started()
with open(log_path, "a") as f:
f.write("\n=== Daemon started successfully ===\n")
except Exception as e:
with open(log_path, "a") as f:
f.write(f"\n=== Daemon startup failed: {e} ===\n")
traceback.print_exc(file=f)
t = threading.Thread(target=_start_daemon, daemon=True, name="hindsight-daemon-start")
t.start()
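
The restart decision in `_start_daemon` comes down to parsing the saved profile `.env` and comparing three keys. A minimal standalone sketch of that comparison follows; the helper names `parse_env_text` and `config_changed` are hypothetical, while the `HINDSIGHT_API_LLM_*` key names are taken from the diff above.

```python
def parse_env_text(text: str) -> dict:
    """Parse KEY=VALUE lines, skipping comments and lines without '='."""
    saved = {}
    for line in text.splitlines():
        if "=" in line and not line.startswith("#"):
            key, value = line.split("=", 1)
            saved[key.strip()] = value.strip()
    return saved


def config_changed(saved: dict, provider: str, model: str, api_key: str) -> bool:
    """Return True when any LLM setting differs from the saved profile."""
    return (
        saved.get("HINDSIGHT_API_LLM_PROVIDER") != provider
        or saved.get("HINDSIGHT_API_LLM_MODEL") != model
        or saved.get("HINDSIGHT_API_LLM_API_KEY") != api_key
    )


if __name__ == "__main__":
    text = (
        "# written by the plugin\n"
        "HINDSIGHT_API_LLM_PROVIDER=groq\n"
        "HINDSIGHT_API_LLM_API_KEY=placeholder-key\n"
        "HINDSIGHT_API_LLM_MODEL=openai/gpt-oss-120b\n"
    )
    saved = parse_env_text(text)
    print(config_changed(saved, "groq", "openai/gpt-oss-120b", "placeholder-key"))
    print(config_changed(saved, "openai", "gpt-4o-mini", "placeholder-key"))
```

A missing profile file corresponds to an empty `saved` dict here, so the first start always counts as changed and the profile gets written.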
def system_prompt_block(self) -> str:
if self._memory_mode == "context":
return (
f"# Hindsight Memory\n"
f"Active (context mode). Bank: {self._bank_id}, budget: {self._budget}.\n"
f"Relevant memories are automatically injected into context."
)
if self._memory_mode == "tools":
return (
f"# Hindsight Memory\n"
f"Active (tools mode). Bank: {self._bank_id}, budget: {self._budget}.\n"
f"Use hindsight_recall to search, hindsight_reflect for synthesis, "
f"hindsight_retain to store facts."
)
return (
f"# Hindsight Memory\n"
f"Active. Bank: {self._bank_id}, budget: {self._budget}.\n"
f"Relevant memories are automatically injected into context. "
f"Use hindsight_recall to search, hindsight_reflect for synthesis, "
f"hindsight_retain to store facts."
)
@@ -262,12 +386,18 @@ class HindsightMemoryProvider(MemoryProvider):
return f"## Hindsight Memory\n{result}"
def queue_prefetch(self, query: str, *, session_id: str = "") -> None:
if self._memory_mode == "tools":
return
def _run():
try:
client = self._make_client()
resp = client.recall(bank_id=self._bank_id, query=query, budget=self._budget)
if resp.results:
text = "\n".join(r.text for r in resp.results if r.text)
client = self._get_client()
if self._prefetch_method == "reflect":
resp = _run_sync(client.areflect(bank_id=self._bank_id, query=query, budget=self._budget))
text = resp.text or ""
else:
resp = _run_sync(client.arecall(bank_id=self._bank_id, query=query, budget=self._budget))
text = "\n".join(r.text for r in resp.results if r.text) if resp.results else ""
if text:
with self._prefetch_lock:
self._prefetch_result = text
except Exception as e:
@@ -282,11 +412,10 @@ class HindsightMemoryProvider(MemoryProvider):
def _sync():
try:
_run_in_thread(
lambda: self._make_client().retain(
bank_id=self._bank_id, content=combined, context="conversation"
)
)
client = self._get_client()
_run_sync(client.aretain(
bank_id=self._bank_id, content=combined, context="conversation"
))
except Exception as e:
logger.warning("Hindsight sync failed: %s", e)
@@ -296,22 +425,29 @@ class HindsightMemoryProvider(MemoryProvider):
self._sync_thread.start()
def get_tool_schemas(self) -> List[Dict[str, Any]]:
if self._memory_mode == "context":
return []
return [RETAIN_SCHEMA, RECALL_SCHEMA, REFLECT_SCHEMA]
def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str:
try:
client = self._get_client()
except Exception as e:
logger.warning("Hindsight client init failed: %s", e)
return json.dumps({"error": f"Hindsight client unavailable: {e}"})
if tool_name == "hindsight_retain":
content = args.get("content", "")
if not content:
return json.dumps({"error": "Missing required parameter: content"})
context = args.get("context")
try:
_run_in_thread(
lambda: self._make_client().retain(
bank_id=self._bank_id, content=content, context=context
)
)
_run_sync(client.aretain(
bank_id=self._bank_id, content=content, context=context
))
return json.dumps({"result": "Memory stored successfully."})
except Exception as e:
logger.warning("hindsight_retain failed: %s", e)
return json.dumps({"error": f"Failed to store memory: {e}"})
elif tool_name == "hindsight_recall":
@@ -319,16 +455,15 @@ class HindsightMemoryProvider(MemoryProvider):
if not query:
return json.dumps({"error": "Missing required parameter: query"})
try:
resp = _run_in_thread(
lambda: self._make_client().recall(
bank_id=self._bank_id, query=query, budget=self._budget
)
)
resp = _run_sync(client.arecall(
bank_id=self._bank_id, query=query, budget=self._budget
))
if not resp.results:
return json.dumps({"result": "No relevant memories found."})
lines = [f"{i}. {r.text}" for i, r in enumerate(resp.results, 1)]
return json.dumps({"result": "\n".join(lines)})
except Exception as e:
logger.warning("hindsight_recall failed: %s", e)
return json.dumps({"error": f"Failed to search memory: {e}"})
elif tool_name == "hindsight_reflect":
@@ -336,21 +471,43 @@ class HindsightMemoryProvider(MemoryProvider):
if not query:
return json.dumps({"error": "Missing required parameter: query"})
try:
resp = _run_in_thread(
lambda: self._make_client().reflect(
bank_id=self._bank_id, query=query, budget=self._budget
)
)
resp = _run_sync(client.areflect(
bank_id=self._bank_id, query=query, budget=self._budget
))
return json.dumps({"result": resp.text or "No relevant memories found."})
except Exception as e:
logger.warning("hindsight_reflect failed: %s", e)
return json.dumps({"error": f"Failed to reflect: {e}"})
return json.dumps({"error": f"Unknown tool: {tool_name}"})
def shutdown(self) -> None:
global _loop, _loop_thread
for t in (self._prefetch_thread, self._sync_thread):
if t and t.is_alive():
t.join(timeout=5.0)
if self._client is not None:
try:
if self._mode == "local":
# Use the public close() API. The RuntimeError from
# aiohttp's "attached to a different loop" is expected
# and harmless — the daemon keeps running independently.
try:
self._client.close()
except RuntimeError:
pass
else:
_run_sync(self._client.aclose())
except Exception:
pass
self._client = None
# Stop the background event loop so no tasks are pending at exit
if _loop is not None and _loop.is_running():
_loop.call_soon_threadsafe(_loop.stop)
if _loop_thread is not None:
_loop_thread.join(timeout=5.0)
_loop = None
_loop_thread = None
def register(ctx) -> None:
+1
@@ -3,6 +3,7 @@ version: 1.0.0
description: "Hindsight — long-term memory with knowledge graph, entity resolution, and multi-strategy retrieval."
pip_dependencies:
- hindsight-client
- hindsight-all
requires_env:
- HINDSIGHT_API_KEY
hooks:
+20 -18
@@ -283,9 +283,9 @@ class OpenVikingMemoryProvider(MemoryProvider):
# Provide brief info about the knowledge base
try:
# Check what's in the knowledge base via a root listing
resp = self._client.post("/api/v1/browse", {"action": "stat", "path": "viking://"})
result = resp.get("result", {})
children = result.get("children", 0)
resp = self._client.get("/api/v1/fs/ls", params={"uri": "viking://"})
result = resp.get("result", [])
children = len(result) if isinstance(result, list) else 0
if children == 0:
return ""
return (
@@ -495,16 +495,17 @@ class OpenVikingMemoryProvider(MemoryProvider):
return json.dumps({"error": "uri is required"})
level = args.get("level", "overview")
# Map our level names to OpenViking endpoints
# Map our level names to OpenViking GET endpoints
if level == "abstract":
resp = self._client.post("/api/v1/read/abstract", {"uri": uri})
resp = self._client.get("/api/v1/content/abstract", params={"uri": uri})
elif level == "full":
resp = self._client.post("/api/v1/read", {"uri": uri, "level": "read"})
resp = self._client.get("/api/v1/content/read", params={"uri": uri})
else: # overview
resp = self._client.post("/api/v1/read", {"uri": uri, "level": "overview"})
resp = self._client.get("/api/v1/content/overview", params={"uri": uri})
result = resp.get("result", {})
content = result.get("content", "")
result = resp.get("result", "")
# result is a plain string from the content endpoints
content = result if isinstance(result, str) else result.get("content", "")
# Truncate very long content to avoid flooding the context
if len(content) > 8000:
@@ -520,20 +521,21 @@ class OpenVikingMemoryProvider(MemoryProvider):
action = args.get("action", "list")
path = args.get("path", "viking://")
resp = self._client.post("/api/v1/browse", {
"action": action,
"path": path,
})
# Map action to the correct fs endpoint (all GET with uri= param)
endpoint_map = {"tree": "/api/v1/fs/tree", "list": "/api/v1/fs/ls", "stat": "/api/v1/fs/stat"}
endpoint = endpoint_map.get(action, "/api/v1/fs/ls")
resp = self._client.get(endpoint, params={"uri": path})
result = resp.get("result", {})
# Format for readability
if action == "list" and "entries" in result:
# Format list/tree results for readability
if action in ("list", "tree") and isinstance(result, list):
entries = []
for e in result["entries"][:50]: # cap at 50 entries
for e in result[:50]: # cap at 50 entries
entries.append({
"name": e.get("name", ""),
"name": e.get("rel_path", e.get("name", "")),
"uri": e.get("uri", ""),
"type": "dir" if e.get("is_dir") else "file",
"type": "dir" if e.get("isDir") else "file",
"abstract": e.get("abstract", ""),
})
return json.dumps({"path": path, "entries": entries}, ensure_ascii=False)
+17 -4
@@ -6649,8 +6649,8 @@ class AIAgent:
# Plugin hook: pre_llm_call
# Fired once per turn before the tool-calling loop. Plugins can
# return a dict with a ``context`` key whose value is a string
# that will be appended to the ephemeral system prompt for every
# API call in this turn (not persisted to session DB or cache).
# that will be injected at request time for every API call in
# this turn (not persisted to session DB or cached prefix).
_plugin_turn_context = ""
try:
from hermes_cli.plugins import invoke_hook as _invoke_hook
@@ -6796,8 +6796,11 @@ class AIAgent:
effective_system = active_system_prompt or ""
if self.ephemeral_system_prompt:
effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip()
# Plugin context from pre_llm_call hooks — ephemeral, not cached.
if _plugin_turn_context:
# Plugin context from pre_llm_call hooks.
# For non-cached providers/requests we can append directly.
# For Anthropic prompt-cached requests we inject it later as an
# uncached system suffix block so the cache key stays stable.
if _plugin_turn_context and not self._use_prompt_caching:
effective_system = (effective_system + "\n\n" + _plugin_turn_context).strip()
if effective_system:
api_messages = [{"role": "system", "content": effective_system}] + api_messages
@@ -6816,6 +6819,16 @@ class AIAgent:
if self._use_prompt_caching:
api_messages = apply_anthropic_cache_control(api_messages, cache_ttl=self._cache_ttl, native_anthropic=(self.api_mode == 'anthropic_messages'))
# Append plugin context AFTER cache markers so the system-level
# cache key stays stable even when plugin output varies per turn.
if _plugin_turn_context and api_messages and api_messages[0].get("role") == "system":
_sys = api_messages[0].get("content", "")
_blocks = list(_sys) if isinstance(_sys, list) else [{"type": "text", "text": _sys}] if isinstance(_sys, str) else []
_blocks.append({"type": "text", "text": _plugin_turn_context})
api_messages[0]["content"] = _blocks
elif _plugin_turn_context:
api_messages.insert(0, {"role": "system", "content": _plugin_turn_context})
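
The effect of appending plugin context after the cache markers can be illustrated outside the agent. This is a sketch under assumptions: `append_uncached_suffix` is a hypothetical standalone function, and the sample message assumes the system content has already been converted to a list of text blocks with a `cache_control` marker on the cached block, as the Anthropic Messages API expects.

```python
from typing import Any, Dict, List


def append_uncached_suffix(messages: List[Dict[str, Any]], context: str) -> List[Dict[str, Any]]:
    """Append per-turn context as a trailing system block with no cache marker."""
    if not context:
        return messages
    if messages and messages[0].get("role") == "system":
        content = messages[0].get("content", "")
        if isinstance(content, list):
            blocks = list(content)  # keep existing blocks (and their markers) as-is
        elif isinstance(content, str):
            blocks = [{"type": "text", "text": content}]
        else:
            blocks = []
        blocks.append({"type": "text", "text": context})
        messages[0]["content"] = blocks
    else:
        # No system message yet: the context becomes the system message.
        messages.insert(0, {"role": "system", "content": context})
    return messages


if __name__ == "__main__":
    cached = [
        {
            "role": "system",
            "content": [
                {
                    "type": "text",
                    "text": "You are Hermes.",
                    "cache_control": {"type": "ephemeral"},
                }
            ],
        },
        {"role": "user", "content": "hi"},
    ]
    out = append_uncached_suffix(cached, "Relevant memory: user prefers tabs.")
    for block in out[0]["content"]:
        print(block)
```

The marked block is byte-for-byte identical from turn to turn, so the cached prefix can be reused, while the trailing block is free to change with each plugin invocation.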
# Safety net: strip orphaned tool results / add stubs for missing
# results before sending to the API. Runs unconditionally — not
# gated on context_compressor — so orphans from session loading or
+250
@@ -547,3 +547,253 @@ class TestPluginMemoryDiscovery:
"""load_memory_provider returns None for unknown names."""
from plugins.memory import load_memory_provider
assert load_memory_provider("nonexistent_provider") is None
# ---------------------------------------------------------------------------
# Sequential dispatch routing tests
# ---------------------------------------------------------------------------
class TestSequentialDispatchRouting:
"""Verify that memory provider tools are correctly routed through
memory_manager.has_tool() and handle_tool_call().
This is a regression test for a bug where _execute_tool_calls_sequential
in run_agent.py had its own inline dispatch chain that skipped
memory_manager.has_tool(), causing all memory provider tools to fall
through to the registry and return "Unknown tool". The fix added
has_tool() + handle_tool_call() to the sequential path.
These tests verify the memory_manager contract that both dispatch
paths rely on: has_tool() returns True for registered provider tools,
and handle_tool_call() routes to the correct provider.
"""
def test_has_tool_returns_true_for_provider_tools(self):
"""has_tool returns True for tools registered by memory providers."""
mgr = MemoryManager()
provider = FakeMemoryProvider("ext", tools=[
{"name": "ext_recall", "description": "Ext recall", "parameters": {}},
{"name": "ext_retain", "description": "Ext retain", "parameters": {}},
])
mgr.add_provider(provider)
assert mgr.has_tool("ext_recall")
assert mgr.has_tool("ext_retain")
def test_has_tool_returns_false_for_builtin_tools(self):
"""has_tool returns False for agent-level tools (terminal, memory, etc.)."""
mgr = MemoryManager()
provider = FakeMemoryProvider("ext", tools=[
{"name": "ext_recall", "description": "Ext", "parameters": {}},
])
mgr.add_provider(provider)
assert not mgr.has_tool("terminal")
assert not mgr.has_tool("memory")
assert not mgr.has_tool("todo")
assert not mgr.has_tool("session_search")
assert not mgr.has_tool("nonexistent")
def test_handle_tool_call_routes_to_provider(self):
"""handle_tool_call dispatches to the correct provider's handler."""
mgr = MemoryManager()
provider = FakeMemoryProvider("hindsight", tools=[
{"name": "hindsight_recall", "description": "Recall", "parameters": {}},
{"name": "hindsight_retain", "description": "Retain", "parameters": {}},
])
mgr.add_provider(provider)
result = json.loads(mgr.handle_tool_call("hindsight_recall", {"query": "alice"}))
assert result["handled"] == "hindsight_recall"
assert result["args"] == {"query": "alice"}
def test_handle_tool_call_unknown_returns_error(self):
"""handle_tool_call returns error for tools not in any provider."""
mgr = MemoryManager()
provider = FakeMemoryProvider("ext", tools=[
{"name": "ext_recall", "description": "Ext", "parameters": {}},
])
mgr.add_provider(provider)
result = json.loads(mgr.handle_tool_call("terminal", {"command": "ls"}))
assert "error" in result
def test_multiple_providers_route_to_correct_one(self):
"""Tools from different providers route to the right handler."""
mgr = MemoryManager()
builtin = FakeMemoryProvider("builtin", tools=[
{"name": "builtin_tool", "description": "Builtin", "parameters": {}},
])
external = FakeMemoryProvider("hindsight", tools=[
{"name": "hindsight_recall", "description": "Recall", "parameters": {}},
])
mgr.add_provider(builtin)
mgr.add_provider(external)
r1 = json.loads(mgr.handle_tool_call("builtin_tool", {}))
assert r1["handled"] == "builtin_tool"
r2 = json.loads(mgr.handle_tool_call("hindsight_recall", {"query": "test"}))
assert r2["handled"] == "hindsight_recall"
def test_tool_names_include_all_providers(self):
"""get_all_tool_names returns tools from all registered providers."""
mgr = MemoryManager()
builtin = FakeMemoryProvider("builtin", tools=[
{"name": "builtin_tool", "description": "B", "parameters": {}},
])
external = FakeMemoryProvider("ext", tools=[
{"name": "ext_recall", "description": "E1", "parameters": {}},
{"name": "ext_retain", "description": "E2", "parameters": {}},
])
mgr.add_provider(builtin)
mgr.add_provider(external)
names = mgr.get_all_tool_names()
assert names == {"builtin_tool", "ext_recall", "ext_retain"}
# ---------------------------------------------------------------------------
# Setup wizard field filtering tests (when clause and default_from)
# ---------------------------------------------------------------------------
class TestSetupFieldFiltering:
"""Test the 'when' clause and 'default_from' logic used by the
memory setup wizard in hermes_cli/memory_setup.py.
These features are generic: any memory plugin can use them in
get_config_schema(). Currently used by the hindsight plugin.
"""
def _filter_fields(self, schema, provider_config):
"""Simulate the setup wizard's field filtering logic.
Returns list of (key, effective_default) for fields that pass
the 'when' filter.
"""
results = []
for field in schema:
key = field["key"]
default = field.get("default")
# Dynamic default
default_from = field.get("default_from")
if default_from and isinstance(default_from, dict):
ref_field = default_from.get("field", "")
ref_map = default_from.get("map", {})
ref_value = provider_config.get(ref_field, "")
if ref_value and ref_value in ref_map:
default = ref_map[ref_value]
# When clause
when = field.get("when")
if when and isinstance(when, dict):
if not all(provider_config.get(k) == v for k, v in when.items()):
continue
results.append((key, default))
return results
def test_when_clause_filters_fields(self):
"""Fields with 'when' are skipped if the condition doesn't match."""
schema = [
{"key": "mode", "default": "cloud"},
{"key": "api_url", "default": "https://api.example.com", "when": {"mode": "cloud"}},
{"key": "api_key", "default": None, "when": {"mode": "cloud"}},
{"key": "llm_provider", "default": "openai", "when": {"mode": "local"}},
{"key": "llm_model", "default": "gpt-4o-mini", "when": {"mode": "local"}},
{"key": "budget", "default": "mid"},
]
# Cloud mode: should see mode, api_url, api_key, budget
cloud_fields = self._filter_fields(schema, {"mode": "cloud"})
cloud_keys = [k for k, _ in cloud_fields]
assert cloud_keys == ["mode", "api_url", "api_key", "budget"]
# Local mode: should see mode, llm_provider, llm_model, budget
local_fields = self._filter_fields(schema, {"mode": "local"})
local_keys = [k for k, _ in local_fields]
assert local_keys == ["mode", "llm_provider", "llm_model", "budget"]
def test_when_clause_no_condition_always_shown(self):
"""Fields without 'when' are always included."""
schema = [
{"key": "bank_id", "default": "hermes"},
{"key": "budget", "default": "mid"},
]
fields = self._filter_fields(schema, {"mode": "cloud"})
assert [k for k, _ in fields] == ["bank_id", "budget"]
def test_default_from_resolves_dynamic_default(self):
"""default_from looks up the default from another field's value."""
provider_models = {
"openai": "gpt-4o-mini",
"groq": "openai/gpt-oss-120b",
"anthropic": "claude-haiku-4-5",
}
schema = [
{"key": "llm_provider", "default": "openai"},
{"key": "llm_model", "default": "gpt-4o-mini",
"default_from": {"field": "llm_provider", "map": provider_models}},
]
# Groq selected: model should default to groq's default
fields = self._filter_fields(schema, {"llm_provider": "groq"})
model_default = dict(fields)["llm_model"]
assert model_default == "openai/gpt-oss-120b"
# Anthropic selected
fields = self._filter_fields(schema, {"llm_provider": "anthropic"})
model_default = dict(fields)["llm_model"]
assert model_default == "claude-haiku-4-5"
def test_default_from_falls_back_to_static_default(self):
"""default_from falls back to static default if provider not in map."""
schema = [
{"key": "llm_model", "default": "gpt-4o-mini",
"default_from": {"field": "llm_provider", "map": {"groq": "openai/gpt-oss-120b"}}},
]
# Unknown provider: should fall back to static default
fields = self._filter_fields(schema, {"llm_provider": "unknown_provider"})
model_default = dict(fields)["llm_model"]
assert model_default == "gpt-4o-mini"
def test_default_from_with_no_ref_value(self):
"""default_from keeps static default if referenced field is not set."""
schema = [
{"key": "llm_model", "default": "gpt-4o-mini",
"default_from": {"field": "llm_provider", "map": {"groq": "openai/gpt-oss-120b"}}},
]
# No provider set at all
fields = self._filter_fields(schema, {})
model_default = dict(fields)["llm_model"]
assert model_default == "gpt-4o-mini"
def test_when_and_default_from_combined(self):
"""when clause and default_from work together correctly."""
provider_models = {"groq": "openai/gpt-oss-120b", "openai": "gpt-4o-mini"}
schema = [
{"key": "mode", "default": "local"},
{"key": "llm_provider", "default": "openai", "when": {"mode": "local"}},
{"key": "llm_model", "default": "gpt-4o-mini",
"default_from": {"field": "llm_provider", "map": provider_models},
"when": {"mode": "local"}},
{"key": "api_url", "default": "https://api.example.com", "when": {"mode": "cloud"}},
]
# Local + groq: should see llm_model with groq default, no api_url
fields = self._filter_fields(schema, {"mode": "local", "llm_provider": "groq"})
keys = [k for k, _ in fields]
assert "llm_model" in keys
assert "api_url" not in keys
assert dict(fields)["llm_model"] == "openai/gpt-oss-120b"
# Cloud: should see api_url, no llm_model
fields = self._filter_fields(schema, {"mode": "cloud"})
keys = [k for k, _ in fields]
assert "api_url" in keys
assert "llm_model" not in keys
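Note on the tests above: they exercise a `_filter_fields` helper of which only the tail is visible in this diff. The following is a minimal sketch of how such a helper could behave, reconstructed from the assertions alone. The function name `filter_fields` and the order in which `default_from` and `when` are evaluated are assumptions, not the project's actual implementation.

```python
from typing import Any


def filter_fields(schema: list[dict], provider_config: dict) -> list[tuple[str, Any]]:
    """Return (key, default) pairs for the fields that apply to provider_config.

    Hypothetical reconstruction: the real helper may differ in detail.
    """
    results = []
    for field in schema:
        key = field["key"]
        default = field.get("default")

        # default_from: look up a dynamic default from another field's value,
        # keeping the static default when the value is unset or not in the map.
        default_from = field.get("default_from")
        if isinstance(default_from, dict):
            ref_value = provider_config.get(default_from.get("field"))
            mapping = default_from.get("map") or {}
            if ref_value is not None and ref_value in mapping:
                default = mapping[ref_value]

        # when: skip the field unless every condition matches the config.
        when = field.get("when")
        if when and isinstance(when, dict):
            if not all(provider_config.get(k) == v for k, v in when.items()):
                continue

        results.append((key, default))
    return results
```

With this shape, a field that carries both `when` and `default_from` is first given its resolved default and then dropped if its condition does not match, which is consistent with the combined test above.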
+300
@@ -0,0 +1,300 @@
"""Tests for cron job script injection feature.
Tests cover:
- Script field in job creation / storage / update
- Script execution and output injection into prompts
- Error handling (missing script, timeout, non-zero exit)
- Path resolution (absolute, relative to HERMES_HOME/scripts/)
"""
import json
import os
import stat
import sys
import textwrap
from pathlib import Path
from unittest.mock import patch
import pytest
# Ensure project root is importable
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
@pytest.fixture
def cron_env(tmp_path, monkeypatch):
"""Isolated cron environment with temp HERMES_HOME."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / "cron").mkdir()
(hermes_home / "cron" / "output").mkdir()
(hermes_home / "scripts").mkdir()
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
# Clear cached module-level paths
import cron.jobs as jobs_mod
monkeypatch.setattr(jobs_mod, "HERMES_DIR", hermes_home)
monkeypatch.setattr(jobs_mod, "CRON_DIR", hermes_home / "cron")
monkeypatch.setattr(jobs_mod, "JOBS_FILE", hermes_home / "cron" / "jobs.json")
monkeypatch.setattr(jobs_mod, "OUTPUT_DIR", hermes_home / "cron" / "output")
return hermes_home
class TestJobScriptField:
"""Test that the script field is stored and retrieved correctly."""
def test_create_job_with_script(self, cron_env):
from cron.jobs import create_job, get_job
job = create_job(
prompt="Analyze the data",
schedule="every 30m",
script="/path/to/monitor.py",
)
assert job["script"] == "/path/to/monitor.py"
loaded = get_job(job["id"])
assert loaded["script"] == "/path/to/monitor.py"
def test_create_job_without_script(self, cron_env):
from cron.jobs import create_job
job = create_job(prompt="Hello", schedule="every 1h")
assert job.get("script") is None
def test_create_job_empty_script_normalized_to_none(self, cron_env):
from cron.jobs import create_job
job = create_job(prompt="Hello", schedule="every 1h", script=" ")
assert job.get("script") is None
def test_update_job_add_script(self, cron_env):
from cron.jobs import create_job, update_job
job = create_job(prompt="Hello", schedule="every 1h")
assert job.get("script") is None
updated = update_job(job["id"], {"script": "/new/script.py"})
assert updated["script"] == "/new/script.py"
def test_update_job_clear_script(self, cron_env):
from cron.jobs import create_job, update_job
job = create_job(prompt="Hello", schedule="every 1h", script="/some/script.py")
assert job["script"] == "/some/script.py"
updated = update_job(job["id"], {"script": None})
assert updated.get("script") is None
class TestRunJobScript:
"""Test the _run_job_script() function."""
def test_successful_script(self, cron_env):
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "test.py"
script.write_text('print("hello from script")\n')
success, output = _run_job_script(str(script))
assert success is True
assert output == "hello from script"
def test_script_relative_path(self, cron_env):
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "relative.py"
script.write_text('print("relative works")\n')
success, output = _run_job_script("relative.py")
assert success is True
assert output == "relative works"
def test_script_not_found(self, cron_env):
from cron.scheduler import _run_job_script
success, output = _run_job_script("/nonexistent/script.py")
assert success is False
assert "not found" in output.lower()
def test_script_nonzero_exit(self, cron_env):
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "fail.py"
script.write_text(textwrap.dedent("""\
import sys
print("partial output")
print("error info", file=sys.stderr)
sys.exit(1)
"""))
success, output = _run_job_script(str(script))
assert success is False
assert "exited with code 1" in output
assert "error info" in output
def test_script_empty_output(self, cron_env):
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "empty.py"
script.write_text("# no output\n")
success, output = _run_job_script(str(script))
assert success is True
assert output == ""
def test_script_timeout(self, cron_env, monkeypatch):
from cron import scheduler as sched_mod
from cron.scheduler import _run_job_script
# Use a very short timeout
monkeypatch.setattr(sched_mod, "_SCRIPT_TIMEOUT", 1)
script = cron_env / "scripts" / "slow.py"
script.write_text("import time; time.sleep(30)\n")
success, output = _run_job_script(str(script))
assert success is False
assert "timed out" in output.lower()
def test_script_json_output(self, cron_env):
"""Scripts can output structured JSON for the LLM to parse."""
from cron.scheduler import _run_job_script
script = cron_env / "scripts" / "json_out.py"
script.write_text(textwrap.dedent("""\
import json
data = {"new_prs": [{"number": 42, "title": "Fix bug"}]}
print(json.dumps(data, indent=2))
"""))
success, output = _run_job_script(str(script))
assert success is True
parsed = json.loads(output)
assert parsed["new_prs"][0]["number"] == 42
class TestBuildJobPromptWithScript:
"""Test that script output is injected into the prompt."""
def test_script_output_injected(self, cron_env):
from cron.scheduler import _build_job_prompt
script = cron_env / "scripts" / "data.py"
script.write_text('print("new PR: #123 fix typo")\n')
job = {
"prompt": "Report any notable changes.",
"script": str(script),
}
prompt = _build_job_prompt(job)
assert "## Script Output" in prompt
assert "new PR: #123 fix typo" in prompt
assert "Report any notable changes." in prompt
def test_script_error_injected(self, cron_env):
from cron.scheduler import _build_job_prompt
job = {
"prompt": "Report status.",
"script": "/nonexistent/script.py",
}
prompt = _build_job_prompt(job)
assert "## Script Error" in prompt
assert "not found" in prompt.lower()
assert "Report status." in prompt
def test_no_script_unchanged(self, cron_env):
from cron.scheduler import _build_job_prompt
job = {"prompt": "Simple job."}
prompt = _build_job_prompt(job)
assert "## Script Output" not in prompt
assert "Simple job." in prompt
def test_script_empty_output_noted(self, cron_env):
from cron.scheduler import _build_job_prompt
script = cron_env / "scripts" / "noop.py"
script.write_text("# nothing\n")
job = {
"prompt": "Check status.",
"script": str(script),
}
prompt = _build_job_prompt(job)
assert "no output" in prompt.lower()
assert "Check status." in prompt
class TestCronjobToolScript:
"""Test the cronjob tool's script parameter."""
def test_create_with_script(self, cron_env, monkeypatch):
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
from tools.cronjob_tools import cronjob
result = json.loads(cronjob(
action="create",
schedule="every 1h",
prompt="Monitor things",
script="/home/user/monitor.py",
))
assert result["success"] is True
assert result["job"]["script"] == "/home/user/monitor.py"
def test_update_script(self, cron_env, monkeypatch):
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
from tools.cronjob_tools import cronjob
create_result = json.loads(cronjob(
action="create",
schedule="every 1h",
prompt="Monitor things",
))
job_id = create_result["job_id"]
update_result = json.loads(cronjob(
action="update",
job_id=job_id,
script="/new/script.py",
))
assert update_result["success"] is True
assert update_result["job"]["script"] == "/new/script.py"
def test_clear_script(self, cron_env, monkeypatch):
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
from tools.cronjob_tools import cronjob
create_result = json.loads(cronjob(
action="create",
schedule="every 1h",
prompt="Monitor things",
script="/some/script.py",
))
job_id = create_result["job_id"]
update_result = json.loads(cronjob(
action="update",
job_id=job_id,
script="",
))
assert update_result["success"] is True
assert "script" not in update_result["job"]
def test_list_shows_script(self, cron_env, monkeypatch):
monkeypatch.setenv("HERMES_INTERACTIVE", "1")
from tools.cronjob_tools import cronjob
cronjob(
action="create",
schedule="every 1h",
prompt="Monitor things",
script="/path/to/script.py",
)
list_result = json.loads(cronjob(action="list"))
assert list_result["success"] is True
assert len(list_result["jobs"]) == 1
assert list_result["jobs"][0]["script"] == "/path/to/script.py"
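Note on the tests above: they pin down the contract of `_run_job_script` (a `(success, output)` tuple, relative paths resolved under the scripts directory, and distinct messages for a missing script, a timeout, and a non-zero exit). The sketch below shows one way to satisfy that contract with `subprocess.run`. The name `run_job_script`, the explicit `scripts_dir` parameter, the 120-second default, and the exact message wording are assumptions made for illustration; the real function lives in `cron/scheduler.py` and is not shown in this diff.

```python
import subprocess
import sys
from pathlib import Path

SCRIPT_TIMEOUT = 120  # seconds; assumed default, the tests patch the real value


def run_job_script(script, scripts_dir, timeout=SCRIPT_TIMEOUT):
    """Run a Python script and return (success, output). Hypothetical sketch."""
    path = Path(script).expanduser()
    if not path.is_absolute():
        # Relative paths are resolved against the scripts directory.
        path = Path(scripts_dir) / path
    if not path.is_file():
        return False, f"Script not found: {path}"

    try:
        proc = subprocess.run(
            [sys.executable, str(path)],
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return False, f"Script timed out after {timeout}s: {path}"

    stdout = proc.stdout.strip()
    stderr = proc.stderr.strip()
    if proc.returncode != 0:
        # Report the exit code plus whatever the script managed to print.
        parts = [f"Script exited with code {proc.returncode}"]
        if stderr:
            parts.append(f"stderr:\n{stderr}")
        if stdout:
            parts.append(f"stdout:\n{stdout}")
        return False, "\n".join(parts)
    return True, stdout
```

Returning the error text as the second tuple element, rather than raising, lets the prompt builder inject either a "Script Output" or a "Script Error" section without extra exception handling.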
+10 -1
@@ -591,7 +591,16 @@ class TestBlockingApprovalE2E:
]
for t in threads:
t.start()
time.sleep(0.3)
# Wait for both threads to register pending approvals instead of
# relying on a fixed sleep. The approval module stores entries in
# _gateway_queues[session_key] — poll until we see 2 entries.
from tools.approval import _gateway_queues
deadline = time.monotonic() + 5
while time.monotonic() < deadline:
if len(_gateway_queues.get(session_key, [])) >= 2:
break
time.sleep(0.05)
# Approve first, deny second
resolve_gateway_approval(session_key, "once") # oldest
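Note on the hunk above: it replaces a fixed `time.sleep(0.3)` with a bounded polling loop. The same pattern can be factored into a small helper, sketched below. The name `wait_until` and its default values are assumptions for illustration and are not part of this change.

```python
import time


def wait_until(predicate, timeout=5.0, interval=0.05):
    """Poll predicate() until it returns a truthy value or the timeout elapses.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

In the test above this would read `wait_until(lambda: len(_gateway_queues.get(session_key, [])) >= 2)`. Asserting on the return value would additionally make a timeout fail loudly instead of falling through to the approval calls.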
+492
@@ -0,0 +1,492 @@
"""Tests for Matrix require-mention gating and auto-thread features."""
import json
import sys
import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import PlatformConfig
def _ensure_nio_mock():
"""Install a mock nio module when matrix-nio isn't available."""
if "nio" in sys.modules and hasattr(sys.modules["nio"], "__file__"):
return
nio_mod = MagicMock()
nio_mod.MegolmEvent = type("MegolmEvent", (), {})
nio_mod.RoomMessageText = type("RoomMessageText", (), {})
nio_mod.RoomMessageImage = type("RoomMessageImage", (), {})
nio_mod.RoomMessageAudio = type("RoomMessageAudio", (), {})
nio_mod.RoomMessageVideo = type("RoomMessageVideo", (), {})
nio_mod.RoomMessageFile = type("RoomMessageFile", (), {})
nio_mod.DownloadResponse = type("DownloadResponse", (), {})
nio_mod.MemoryDownloadResponse = type("MemoryDownloadResponse", (), {})
nio_mod.InviteMemberEvent = type("InviteMemberEvent", (), {})
sys.modules.setdefault("nio", nio_mod)
_ensure_nio_mock()
def _make_adapter(tmp_path=None):
"""Create a MatrixAdapter with mocked config."""
from gateway.platforms.matrix import MatrixAdapter
config = PlatformConfig(
enabled=True,
token="syt_test_token",
extra={
"homeserver": "https://matrix.example.org",
"user_id": "@hermes:example.org",
},
)
adapter = MatrixAdapter(config)
adapter.handle_message = AsyncMock()
adapter._startup_ts = time.time() - 10 # avoid startup grace filter
return adapter
def _make_room(room_id="!room1:example.org", member_count=5, is_dm=False):
"""Create a fake Matrix room."""
room = SimpleNamespace(
room_id=room_id,
member_count=member_count,
users={},
)
return room
def _make_event(
body,
sender="@alice:example.org",
event_id="$evt1",
formatted_body=None,
thread_id=None,
):
"""Create a fake RoomMessageText event."""
content = {"body": body, "msgtype": "m.text"}
if formatted_body:
content["formatted_body"] = formatted_body
content["format"] = "org.matrix.custom.html"
relates_to = {}
if thread_id:
relates_to["rel_type"] = "m.thread"
relates_to["event_id"] = thread_id
if relates_to:
content["m.relates_to"] = relates_to
return SimpleNamespace(
sender=sender,
event_id=event_id,
server_timestamp=int(time.time() * 1000),
body=body,
source={"content": content},
)
# ---------------------------------------------------------------------------
# Mention detection helpers
# ---------------------------------------------------------------------------
class TestIsBotMentioned:
def setup_method(self):
self.adapter = _make_adapter()
def test_full_user_id_in_body(self):
assert self.adapter._is_bot_mentioned("hey @hermes:example.org help")
def test_localpart_in_body(self):
assert self.adapter._is_bot_mentioned("hermes can you help?")
def test_localpart_case_insensitive(self):
assert self.adapter._is_bot_mentioned("HERMES can you help?")
def test_matrix_pill_in_formatted_body(self):
html = '<a href="https://matrix.to/#/@hermes:example.org">Hermes</a> help'
assert self.adapter._is_bot_mentioned("Hermes help", html)
def test_no_mention(self):
assert not self.adapter._is_bot_mentioned("hello everyone")
def test_empty_body(self):
assert not self.adapter._is_bot_mentioned("")
def test_partial_localpart_no_match(self):
# "hermesbot" should not match word-boundary check for "hermes"
assert not self.adapter._is_bot_mentioned("hermesbot is here")
class TestStripMention:
def setup_method(self):
self.adapter = _make_adapter()
def test_strip_full_user_id(self):
result = self.adapter._strip_mention("@hermes:example.org help me")
assert result == "help me"
def test_strip_localpart(self):
result = self.adapter._strip_mention("hermes help me")
assert result == "help me"
def test_strip_returns_empty_for_mention_only(self):
result = self.adapter._strip_mention("@hermes:example.org")
assert result == ""
# ---------------------------------------------------------------------------
# Require-mention gating in _on_room_message
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_require_mention_default_ignores_unmentioned(monkeypatch):
"""Default (require_mention=true): messages without mention are ignored."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
adapter = _make_adapter()
room = _make_room()
event = _make_event("hello everyone")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_not_awaited()
@pytest.mark.asyncio
async def test_require_mention_default_processes_mentioned(monkeypatch):
"""Default: messages with mention are processed, mention stripped."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
room = _make_room()
event = _make_event("@hermes:example.org help me")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.text == "help me"
@pytest.mark.asyncio
async def test_require_mention_html_pill(monkeypatch):
"""Bot mentioned via HTML pill should be processed."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
room = _make_room()
formatted = '<a href="https://matrix.to/#/@hermes:example.org">Hermes</a> help'
event = _make_event("Hermes help", formatted_body=formatted)
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_require_mention_dm_always_responds(monkeypatch):
"""DMs always respond regardless of mention setting."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
# member_count=2 triggers DM detection
room = _make_room(member_count=2)
event = _make_event("hello without mention")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_dm_strips_mention(monkeypatch):
"""DMs strip mention from body, matching Discord behavior."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
room = _make_room(member_count=2)
event = _make_event("@hermes:example.org help me")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.text == "help me"
@pytest.mark.asyncio
async def test_bare_mention_passes_empty_string(monkeypatch):
"""A message that is only a mention should pass through as empty, not be dropped."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
room = _make_room()
event = _make_event("@hermes:example.org")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.text == ""
@pytest.mark.asyncio
async def test_require_mention_free_response_room(monkeypatch):
"""Free-response rooms bypass mention requirement."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.setenv("MATRIX_FREE_RESPONSE_ROOMS", "!room1:example.org,!room2:example.org")
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
room = _make_room(room_id="!room1:example.org")
event = _make_event("hello without mention")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_require_mention_bot_participated_thread(monkeypatch):
"""Threads with prior bot participation bypass mention requirement."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
adapter._bot_participated_threads.add("$thread1")
room = _make_room()
event = _make_event("hello without mention", thread_id="$thread1")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
@pytest.mark.asyncio
async def test_require_mention_disabled(monkeypatch):
"""MATRIX_REQUIRE_MENTION=false: all messages processed."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
room = _make_room()
event = _make_event("hello without mention")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.text == "hello without mention"
# ---------------------------------------------------------------------------
# Auto-thread in _on_room_message
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_auto_thread_default_creates_thread(monkeypatch):
"""Default (auto_thread=true): sets thread_id to event.event_id."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
adapter = _make_adapter()
room = _make_room()
event = _make_event("hello", event_id="$msg1")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.source.thread_id == "$msg1"
@pytest.mark.asyncio
async def test_auto_thread_preserves_existing_thread(monkeypatch):
"""If message is already in a thread, thread_id is not overridden."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
adapter = _make_adapter()
adapter._bot_participated_threads.add("$thread_root")
room = _make_room()
event = _make_event("reply in thread", thread_id="$thread_root")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.source.thread_id == "$thread_root"
@pytest.mark.asyncio
async def test_auto_thread_skips_dm(monkeypatch):
"""DMs should not get auto-threaded."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
adapter = _make_adapter()
room = _make_room(member_count=2)
event = _make_event("hello dm", event_id="$dm1")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.source.thread_id is None
@pytest.mark.asyncio
async def test_auto_thread_disabled(monkeypatch):
"""MATRIX_AUTO_THREAD=false: thread_id stays None."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
monkeypatch.setenv("MATRIX_AUTO_THREAD", "false")
adapter = _make_adapter()
room = _make_room()
event = _make_event("hello", event_id="$msg1")
await adapter._on_room_message(room, event)
adapter.handle_message.assert_awaited_once()
msg = adapter.handle_message.await_args.args[0]
assert msg.source.thread_id is None
@pytest.mark.asyncio
async def test_auto_thread_tracks_participation(monkeypatch):
"""Auto-created threads are tracked in _bot_participated_threads."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "false")
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
adapter = _make_adapter()
room = _make_room()
event = _make_event("hello", event_id="$msg1")
with patch.object(adapter, "_save_participated_threads"):
await adapter._on_room_message(room, event)
assert "$msg1" in adapter._bot_participated_threads
# ---------------------------------------------------------------------------
# Thread persistence
# ---------------------------------------------------------------------------
class TestThreadPersistence:
def test_empty_state_file(self, tmp_path, monkeypatch):
"""No state file → empty set."""
monkeypatch.setattr(
"gateway.platforms.matrix.MatrixAdapter._thread_state_path",
staticmethod(lambda: tmp_path / "matrix_threads.json"),
)
adapter = _make_adapter()
loaded = adapter._load_participated_threads()
assert loaded == set()
def test_track_thread_persists(self, tmp_path, monkeypatch):
"""_track_thread writes to disk."""
state_path = tmp_path / "matrix_threads.json"
monkeypatch.setattr(
"gateway.platforms.matrix.MatrixAdapter._thread_state_path",
staticmethod(lambda: state_path),
)
adapter = _make_adapter()
adapter._track_thread("$thread_abc")
data = json.loads(state_path.read_text())
assert "$thread_abc" in data
def test_threads_survive_reload(self, tmp_path, monkeypatch):
"""Persisted threads are loaded by a new adapter instance."""
state_path = tmp_path / "matrix_threads.json"
state_path.write_text(json.dumps(["$t1", "$t2"]))
monkeypatch.setattr(
"gateway.platforms.matrix.MatrixAdapter._thread_state_path",
staticmethod(lambda: state_path),
)
adapter = _make_adapter()
assert "$t1" in adapter._bot_participated_threads
assert "$t2" in adapter._bot_participated_threads
def test_cap_max_tracked_threads(self, tmp_path, monkeypatch):
"""Thread set is trimmed to _MAX_TRACKED_THREADS."""
state_path = tmp_path / "matrix_threads.json"
monkeypatch.setattr(
"gateway.platforms.matrix.MatrixAdapter._thread_state_path",
staticmethod(lambda: state_path),
)
adapter = _make_adapter()
adapter._MAX_TRACKED_THREADS = 5
for i in range(10):
adapter._bot_participated_threads.add(f"$t{i}")
adapter._save_participated_threads()
data = json.loads(state_path.read_text())
assert len(data) == 5
# ---------------------------------------------------------------------------
# YAML config bridge
# ---------------------------------------------------------------------------
class TestMatrixConfigBridge:
def test_yaml_bridge_sets_env_vars(self, monkeypatch, tmp_path):
"""Matrix YAML config should bridge to env vars."""
monkeypatch.delenv("MATRIX_REQUIRE_MENTION", raising=False)
monkeypatch.delenv("MATRIX_FREE_RESPONSE_ROOMS", raising=False)
monkeypatch.delenv("MATRIX_AUTO_THREAD", raising=False)
yaml_content = {
"matrix": {
"require_mention": False,
"free_response_rooms": ["!room1:example.org", "!room2:example.org"],
"auto_thread": False,
}
}
import os
import yaml
config_file = tmp_path / "config.yaml"
config_file.write_text(yaml.dump(yaml_content))
# Simulate the bridge logic from gateway/config.py
yaml_cfg = yaml.safe_load(config_file.read_text())
matrix_cfg = yaml_cfg.get("matrix", {})
if isinstance(matrix_cfg, dict):
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower())
frc = matrix_cfg.get("free_response_rooms")
if frc is not None and not os.getenv("MATRIX_FREE_RESPONSE_ROOMS"):
if isinstance(frc, list):
frc = ",".join(str(v) for v in frc)
monkeypatch.setenv("MATRIX_FREE_RESPONSE_ROOMS", str(frc))
if "auto_thread" in matrix_cfg and not os.getenv("MATRIX_AUTO_THREAD"):
monkeypatch.setenv("MATRIX_AUTO_THREAD", str(matrix_cfg["auto_thread"]).lower())
assert os.getenv("MATRIX_REQUIRE_MENTION") == "false"
assert os.getenv("MATRIX_FREE_RESPONSE_ROOMS") == "!room1:example.org,!room2:example.org"
assert os.getenv("MATRIX_AUTO_THREAD") == "false"
def test_env_vars_take_precedence_over_yaml(self, monkeypatch):
"""Env vars should not be overwritten by YAML values."""
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", "true")
import os
yaml_cfg = {"matrix": {"require_mention": False}}
matrix_cfg = yaml_cfg.get("matrix", {})
if "require_mention" in matrix_cfg and not os.getenv("MATRIX_REQUIRE_MENTION"):
monkeypatch.setenv("MATRIX_REQUIRE_MENTION", str(matrix_cfg["require_mention"]).lower())
assert os.getenv("MATRIX_REQUIRE_MENTION") == "true"
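Note on the tests above: the `TestIsBotMentioned` and `TestStripMention` cases describe three ways a mention is recognized (the full user ID, the localpart as a whole word, and a matrix.to pill in the HTML body). The sketch below is one possible implementation consistent with those cases. The free functions `is_bot_mentioned` and `strip_mention`, and passing `user_id` explicitly, are assumptions; the adapter's real methods are not shown in this diff.

```python
import re


def _localpart(user_id):
    # "@hermes:example.org" -> "hermes"
    return user_id.split(":", 1)[0].lstrip("@")


def is_bot_mentioned(user_id, body, formatted_body=None):
    """Return True if the bot is mentioned in the plain or HTML body."""
    body = body or ""
    if user_id and user_id in body:
        return True
    local = _localpart(user_id)
    # Word-boundary match so "hermesbot" does not count as "hermes".
    if local and re.search(rf"\b{re.escape(local)}\b", body, re.IGNORECASE):
        return True
    # Matrix "pill": a link to matrix.to/#/<user_id> in the formatted body.
    if formatted_body and f"matrix.to/#/{user_id}" in formatted_body:
        return True
    return False


def strip_mention(user_id, body):
    """Remove the bot's user ID and localpart from body and trim whitespace."""
    body = body.replace(user_id, "")
    local = _localpart(user_id)
    if local:
        body = re.sub(rf"\b{re.escape(local)}\b", "", body, flags=re.IGNORECASE)
    return body.strip()
```

Stripping a bare mention yields an empty string, which matches the commit's stated intent of passing such messages through rather than dropping them.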
+8 -7
@@ -7,7 +7,7 @@ Verifies that:
"""
import pytest
pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
#pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
@@ -318,12 +318,13 @@ class TestPreflightCompression:
def test_preflight_compresses_oversized_history(self, agent):
"""When loaded history exceeds the model's context threshold, compress before API call."""
agent.compression_enabled = True
# Set a very small context so the history is "oversized"
agent.context_compressor.context_length = 100
agent.context_compressor.threshold_tokens = 85 # 85% of 100
# Set a small context so the history is "oversized", but large enough
# that the compressed result (2 short messages) fits in a single pass.
agent.context_compressor.context_length = 2000
agent.context_compressor.threshold_tokens = 200
# Build a history that will be large enough to trigger preflight
# (each message ~20 chars = ~5 tokens, 20 messages = ~100 tokens > 85 threshold)
# (each message ~50 chars ≈ 13 tokens, 40 messages ≈ 520 tokens > 200 threshold)
big_history = []
for i in range(20):
big_history.append({"role": "user", "content": f"Message number {i} with some extra text padding"})
@@ -338,7 +339,7 @@ class TestPreflightCompression:
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
# Simulate compression reducing messages
# Simulate compression reducing messages to a small set that fits
mock_compress.return_value = (
[
{"role": "user", "content": f"{SUMMARY_PREFIX}\nPrevious conversation"},
@@ -411,7 +412,7 @@ class TestToolResultPreflightCompression:
"""When tool results push estimated tokens past threshold, compress before next call."""
agent.compression_enabled = True
agent.context_compressor.context_length = 200_000
agent.context_compressor.threshold_tokens = 140_000
agent.context_compressor.threshold_tokens = 130_000 # below the 135k reported usage
agent.context_compressor.last_prompt_tokens = 130_000
agent.context_compressor.last_completion_tokens = 5_000
+1 -1
@@ -28,7 +28,7 @@ from unittest.mock import patch
import pytest
pytestmark = pytest.mark.skip(reason="Live API integration test — hangs in batch runs")
# pytestmark removed — tests skip gracefully via OPENROUTER_API_KEY check on line 59
# Ensure repo root is importable
_repo_root = Path(__file__).resolve().parent.parent
+34
@@ -1573,6 +1573,40 @@ class TestRunConversation:
assert "Local/custom backend returned reasoning-only output" in result["error"]
assert "wrong /v1 endpoint" in result["error"]
def test_plugin_context_is_uncached_system_suffix_when_prompt_caching_enabled(self, agent):
self._setup_agent(agent)
agent._use_prompt_caching = True
captured = {}
def _fake_api_call(api_kwargs):
captured["kwargs"] = api_kwargs
return _mock_response(content="ok", finish_reason="stop")
with (
patch(
"hermes_cli.plugins.invoke_hook",
return_value=[{"context": "plugin-turn-context"}],
),
patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call),
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello")
assert result["completed"] is True
assert result["final_response"] == "ok"
messages = captured["kwargs"]["messages"]
assert messages[0]["role"] == "system"
system_blocks = messages[0]["content"]
assert isinstance(system_blocks, list)
assert system_blocks[0]["text"] == "You are helpful."
assert system_blocks[0]["cache_control"]["type"] == "ephemeral"
assert system_blocks[-1]["text"] == "plugin-turn-context"
assert "cache_control" not in system_blocks[-1]
def test_nous_401_refreshes_after_remint_and_retries(self, agent):
self._setup_agent(agent)
agent.provider = "nous"
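Note on the hunk above: the new test asserts that the system message is a list of content blocks in which the first block carries an ephemeral `cache_control` marker and the plugin context is appended as a final block without one. A minimal sketch of building such a message follows. The function name `build_system_message` is hypothetical, and the agent's actual assembly code is not part of this diff.

```python
def build_system_message(system_prompt, plugin_context=None):
    """Build a system message whose cached prefix stays stable across turns.

    Hypothetical helper: the stable prompt is marked for caching, while
    per-turn plugin context is appended after the marker so that changing
    it does not alter the cached prefix.
    """
    blocks = [
        {
            "type": "text",
            "text": system_prompt,
            "cache_control": {"type": "ephemeral"},
        }
    ]
    if plugin_context:
        # No cache_control here: this block may differ on every turn.
        blocks.append({"type": "text", "text": plugin_context})
    return {"role": "system", "content": blocks}
```

Keeping the volatile text after the cache marker is what the commit message calls preserving prompt cache prefix stability.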
+1 -1
@@ -13,7 +13,7 @@ Run with: python -m pytest tests/test_code_execution.py -v
"""
import pytest
pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
# pytestmark removed — tests run fine (61 pass, ~99s)
import json
+1 -1
@@ -9,7 +9,7 @@ asserts zero contamination from shell noise via _assert_clean().
"""
import pytest
pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
+10 -5
@@ -61,7 +61,8 @@ class TestProbeMcpServerTools:
async def fake_connect(name, cfg):
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
@@ -102,7 +103,8 @@ class TestProbeMcpServerTools:
raise ConnectionError("Server not found")
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
@@ -135,7 +137,8 @@ class TestProbeMcpServerTools:
async def fake_connect(name, cfg):
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
@@ -159,7 +162,8 @@ class TestProbeMcpServerTools:
"""_stop_mcp_loop is called even when probe fails."""
config = {"github": {"command": "npx", "connect_timeout": 5}}
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop", side_effect=RuntimeError("boom")), \
patch("tools.mcp_tool._stop_mcp_loop") as mock_stop:
@@ -187,7 +191,8 @@ class TestProbeMcpServerTools:
connect_calls.append(name)
return mock_server
with patch("tools.mcp_tool._load_mcp_config", return_value=config), \
with patch("tools.mcp_tool._MCP_AVAILABLE", True), \
patch("tools.mcp_tool._load_mcp_config", return_value=config), \
patch("tools.mcp_tool._connect_server", side_effect=fake_connect), \
patch("tools.mcp_tool._ensure_mcp_loop"), \
patch("tools.mcp_tool._run_on_mcp_loop") as mock_run, \
+12 -1
@@ -1,11 +1,22 @@
import asyncio
import os
import sys
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from tools.mcp_tool import MCPServerTask, _format_connect_error, _resolve_stdio_command
from tools.mcp_tool import MCPServerTask, _format_connect_error, _resolve_stdio_command, _MCP_AVAILABLE
# Ensure the mcp module symbols exist for patching even when the SDK isn't installed
if not _MCP_AVAILABLE:
import tools.mcp_tool as _mcp_mod
if not hasattr(_mcp_mod, "StdioServerParameters"):
_mcp_mod.StdioServerParameters = MagicMock
if not hasattr(_mcp_mod, "stdio_client"):
_mcp_mod.stdio_client = MagicMock
if not hasattr(_mcp_mod, "ClientSession"):
_mcp_mod.ClientSession = MagicMock
def test_resolve_stdio_command_falls_back_to_hermes_node_bin(tmp_path):
+545 -13
@@ -5,18 +5,30 @@ Code Execution Tool -- Programmatic Tool Calling (PTC)
Lets the LLM write a Python script that calls Hermes tools via RPC,
collapsing multi-step tool chains into a single inference turn.
Architecture:
1. Parent generates a `hermes_tools.py` stub module with RPC functions
Architecture (two transports):
**Local backend (UDS):**
1. Parent generates a `hermes_tools.py` stub module with UDS RPC functions
2. Parent opens a Unix domain socket and starts an RPC listener thread
3. Parent spawns a child process that runs the LLM's script
4. When the script calls a tool function, the call travels over the UDS
back to the parent, which dispatches through handle_function_call
5. Only the script's stdout is returned to the LLM; intermediate tool
results never enter the context window
4. Tool calls travel over the UDS back to the parent for dispatch
Platform: Linux / macOS only (Unix domain sockets). Disabled on Windows.
**Remote backends (file-based RPC):**
1. Parent generates `hermes_tools.py` with file-based RPC stubs
2. Parent ships both files to the remote environment
3. Script runs inside the terminal backend (Docker/SSH/Modal/Daytona/etc.)
4. Tool calls are written as request files; a polling thread on the parent
reads them via execute_oneshot(), dispatches, and writes response files
5. The script polls for response files and continues
In both cases, only the script's stdout is returned to the LLM; intermediate
tool results never enter the context window.
Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Windows.
Remote execution additionally requires Python 3 in the terminal backend.
"""
import base64
import json
import logging
import os
@@ -114,11 +126,17 @@ _TOOL_STUBS = {
}
def generate_hermes_tools_module(enabled_tools: List[str]) -> str:
def generate_hermes_tools_module(enabled_tools: List[str],
transport: str = "uds") -> str:
"""
Build the source code for the hermes_tools.py stub module.
Only tools in both SANDBOX_ALLOWED_TOOLS and enabled_tools get stubs.
Args:
enabled_tools: Tool names enabled in the current session.
transport: ``"uds"`` for Unix domain socket (local backend) or
``"file"`` for file-based RPC (remote backends).
"""
tools_to_generate = sorted(SANDBOX_ALLOWED_TOOLS & set(enabled_tools))
@@ -135,13 +153,18 @@ def generate_hermes_tools_module(enabled_tools: List[str]) -> str:
)
export_names.append(func_name)
header = '''\
"""Auto-generated Hermes tools RPC stubs."""
import json, os, socket, shlex, time
if transport == "file":
header = _FILE_TRANSPORT_HEADER
else:
header = _UDS_TRANSPORT_HEADER
_sock = None
return header + "\n".join(stub_functions)
# ---- Shared helpers section (embedded in both transport headers) ----------
_COMMON_HELPERS = '''\
# ---------------------------------------------------------------------------
# Convenience helpers (avoid common scripting pitfalls)
# ---------------------------------------------------------------------------
@@ -176,6 +199,17 @@ def retry(fn, max_attempts=3, delay=2):
time.sleep(delay * (2 ** attempt))
raise last_err
'''
# ---- UDS transport (local backend) ---------------------------------------
_UDS_TRANSPORT_HEADER = '''\
"""Auto-generated Hermes tools RPC stubs."""
import json, os, socket, shlex, time
_sock = None
''' + _COMMON_HELPERS + '''\
def _connect():
global _sock
if _sock is None:
@@ -208,7 +242,57 @@ def _call(tool_name, args):
'''
return header + "\n".join(stub_functions)
# ---- File-based transport (remote backends) -------------------------------
_FILE_TRANSPORT_HEADER = '''\
"""Auto-generated Hermes tools RPC stubs (file-based transport)."""
import json, os, shlex, time
_RPC_DIR = os.environ.get("HERMES_RPC_DIR", "/tmp/hermes_rpc")
_seq = 0
''' + _COMMON_HELPERS + '''\
def _call(tool_name, args):
"""Send a tool call request via file-based RPC and wait for response."""
global _seq
_seq += 1
seq_str = f"{_seq:06d}"
req_file = os.path.join(_RPC_DIR, f"req_{seq_str}")
res_file = os.path.join(_RPC_DIR, f"res_{seq_str}")
# Write request atomically (write to .tmp, then rename)
tmp = req_file + ".tmp"
with open(tmp, "w") as f:
json.dump({"tool": tool_name, "args": args, "seq": _seq}, f)
os.rename(tmp, req_file)
# Wait for response with adaptive polling
deadline = time.monotonic() + 300 # 5-minute timeout per tool call
poll_interval = 0.05 # Start at 50ms
while not os.path.exists(res_file):
if time.monotonic() > deadline:
raise RuntimeError(f"RPC timeout: no response for {tool_name} after 300s")
time.sleep(poll_interval)
poll_interval = min(poll_interval * 1.2, 0.25) # Back off to 250ms
with open(res_file) as f:
raw = f.read()
# Clean up response file
try:
os.unlink(res_file)
except OSError:
pass
result = json.loads(raw)
if isinstance(result, str):
try:
return json.loads(result)
except (json.JSONDecodeError, TypeError):
return result
return result
'''
# ---------------------------------------------------------------------------
@@ -339,6 +423,443 @@ def _rpc_server_loop(
logger.debug("RPC conn close error: %s", e)
# ---------------------------------------------------------------------------
# Remote execution support (file-based RPC via terminal backend)
# ---------------------------------------------------------------------------
def _get_or_create_env(task_id: str):
"""Get or create the terminal environment for *task_id*.
Reuses the same environment (container/sandbox/SSH session) that the
terminal and file tools use, creating one if it doesn't exist yet.
Returns ``(env, env_type)`` tuple.
"""
from tools.terminal_tool import (
_active_environments, _env_lock, _create_environment,
_get_env_config, _last_activity, _start_cleanup_thread,
_creation_locks, _creation_locks_lock, _task_env_overrides,
)
effective_task_id = task_id or "default"
# Fast path: environment already exists
with _env_lock:
if effective_task_id in _active_environments:
_last_activity[effective_task_id] = time.time()
return _active_environments[effective_task_id], _get_env_config()["env_type"]
# Slow path: create environment (same pattern as file_tools._get_file_ops)
with _creation_locks_lock:
if effective_task_id not in _creation_locks:
_creation_locks[effective_task_id] = threading.Lock()
task_lock = _creation_locks[effective_task_id]
with task_lock:
with _env_lock:
if effective_task_id in _active_environments:
_last_activity[effective_task_id] = time.time()
return _active_environments[effective_task_id], _get_env_config()["env_type"]
config = _get_env_config()
env_type = config["env_type"]
overrides = _task_env_overrides.get(effective_task_id, {})
if env_type == "docker":
image = overrides.get("docker_image") or config["docker_image"]
elif env_type == "singularity":
image = overrides.get("singularity_image") or config["singularity_image"]
elif env_type == "modal":
image = overrides.get("modal_image") or config["modal_image"]
elif env_type == "daytona":
image = overrides.get("daytona_image") or config["daytona_image"]
else:
image = ""
cwd = overrides.get("cwd") or config["cwd"]
container_config = None
if env_type in ("docker", "singularity", "modal", "daytona"):
container_config = {
"container_cpu": config.get("container_cpu", 1),
"container_memory": config.get("container_memory", 5120),
"container_disk": config.get("container_disk", 51200),
"container_persistent": config.get("container_persistent", True),
"docker_volumes": config.get("docker_volumes", []),
}
ssh_config = None
if env_type == "ssh":
ssh_config = {
"host": config.get("ssh_host", ""),
"user": config.get("ssh_user", ""),
"port": config.get("ssh_port", 22),
"key": config.get("ssh_key", ""),
"persistent": config.get("ssh_persistent", False),
}
local_config = None
if env_type == "local":
local_config = {
"persistent": config.get("local_persistent", False),
}
logger.info("Creating new %s environment for execute_code task %s...",
env_type, effective_task_id[:8])
env = _create_environment(
env_type=env_type,
image=image,
cwd=cwd,
timeout=config["timeout"],
ssh_config=ssh_config,
container_config=container_config,
local_config=local_config,
task_id=effective_task_id,
host_cwd=config.get("host_cwd"),
)
with _env_lock:
_active_environments[effective_task_id] = env
_last_activity[effective_task_id] = time.time()
_start_cleanup_thread()
logger.info("%s environment ready for execute_code task %s",
env_type, effective_task_id[:8])
return env, env_type
def _ship_file_to_remote(env, remote_path: str, content: str) -> None:
"""Write *content* to *remote_path* on the remote environment.
Uses ``echo | base64 -d`` rather than stdin piping because some
backends (Modal) don't reliably deliver stdin_data to chained
commands. Base64 output is shell-safe ([A-Za-z0-9+/=]) so single
quotes are fine.
"""
encoded = base64.b64encode(content.encode("utf-8")).decode("ascii")
env.execute_oneshot(
f"echo '{encoded}' | base64 -d > {remote_path}",
cwd="/",
timeout=30,
)
def _rpc_poll_loop(
env,
rpc_dir: str,
task_id: str,
tool_call_log: list,
tool_call_counter: list,
max_tool_calls: int,
allowed_tools: frozenset,
stop_event: threading.Event,
):
"""Poll the remote filesystem for tool call requests and dispatch them.
Runs in a background thread. Uses ``env.execute_oneshot()`` so it can
operate concurrently with the script-execution thread that holds
``env.execute()`` (important for persistent-shell backends like SSH).
"""
from model_tools import handle_function_call
poll_interval = 0.1 # 100 ms
while not stop_event.is_set():
try:
# List pending request files (skip .tmp partials)
ls_result = env.execute_oneshot(
f"ls -1 {rpc_dir}/req_* 2>/dev/null || true",
cwd="/",
timeout=10,
)
output = ls_result.get("output", "").strip()
if not output:
stop_event.wait(poll_interval)
continue
req_files = sorted([
f.strip() for f in output.split("\n")
if f.strip()
and not f.strip().endswith(".tmp")
and "/req_" in f.strip()
])
for req_file in req_files:
if stop_event.is_set():
break
call_start = time.monotonic()
# Read request
read_result = env.execute_oneshot(
f"cat {req_file}",
cwd="/",
timeout=10,
)
try:
request = json.loads(read_result.get("output", ""))
except (json.JSONDecodeError, ValueError):
logger.debug("Malformed RPC request in %s", req_file)
# Remove bad request to avoid infinite retry
env.execute_oneshot(f"rm -f {req_file}", cwd="/", timeout=5)
continue
tool_name = request.get("tool", "")
tool_args = request.get("args", {})
seq = request.get("seq", 0)
seq_str = f"{seq:06d}"
res_file = f"{rpc_dir}/res_{seq_str}"
# Enforce allow-list
if tool_name not in allowed_tools:
available = ", ".join(sorted(allowed_tools))
tool_result = json.dumps({
"error": (
f"Tool '{tool_name}' is not available in execute_code. "
f"Available: {available}"
)
})
# Enforce tool call limit
elif tool_call_counter[0] >= max_tool_calls:
tool_result = json.dumps({
"error": (
f"Tool call limit reached ({max_tool_calls}). "
"No more tool calls allowed in this execution."
)
})
else:
# Strip forbidden terminal parameters
if tool_name == "terminal" and isinstance(tool_args, dict):
for param in _TERMINAL_BLOCKED_PARAMS:
tool_args.pop(param, None)
# Dispatch through the standard tool handler
try:
_real_stdout, _real_stderr = sys.stdout, sys.stderr
devnull = open(os.devnull, "w")
try:
sys.stdout = devnull
sys.stderr = devnull
tool_result = handle_function_call(
tool_name, tool_args, task_id=task_id
)
finally:
sys.stdout, sys.stderr = _real_stdout, _real_stderr
devnull.close()
except Exception as exc:
logger.error("Tool call failed in remote sandbox: %s",
exc, exc_info=True)
tool_result = json.dumps({"error": str(exc)})
tool_call_counter[0] += 1
call_duration = time.monotonic() - call_start
tool_call_log.append({
"tool": tool_name,
"args_preview": str(tool_args)[:80],
"duration": round(call_duration, 2),
})
# Write response atomically (tmp + rename).
# Use echo piping (not stdin_data) because Modal doesn't
# reliably deliver stdin to chained commands.
encoded_result = base64.b64encode(
tool_result.encode("utf-8")
).decode("ascii")
env.execute_oneshot(
f"echo '{encoded_result}' | base64 -d > {res_file}.tmp"
f" && mv {res_file}.tmp {res_file}",
cwd="/",
timeout=60,
)
# Remove the request file
env.execute_oneshot(f"rm -f {req_file}", cwd="/", timeout=5)
except Exception as e:
if not stop_event.is_set():
logger.debug("RPC poll error: %s", e, exc_info=True)
if not stop_event.is_set():
stop_event.wait(poll_interval)
def _execute_remote(
code: str,
task_id: Optional[str],
enabled_tools: Optional[List[str]],
) -> str:
"""Run a script on the remote terminal backend via file-based RPC.
The script and the generated hermes_tools.py module are shipped to
the remote environment, and tool calls are proxied through a polling
thread that communicates via request/response files.
"""
from tools.terminal_tool import _interrupt_event
_cfg = _load_config()
timeout = _cfg.get("timeout", DEFAULT_TIMEOUT)
max_tool_calls = _cfg.get("max_tool_calls", DEFAULT_MAX_TOOL_CALLS)
session_tools = set(enabled_tools) if enabled_tools else set()
sandbox_tools = frozenset(SANDBOX_ALLOWED_TOOLS & session_tools)
if not sandbox_tools:
sandbox_tools = SANDBOX_ALLOWED_TOOLS
effective_task_id = task_id or "default"
env, env_type = _get_or_create_env(effective_task_id)
sandbox_id = uuid.uuid4().hex[:12]
sandbox_dir = f"/tmp/hermes_exec_{sandbox_id}"
tool_call_log: list = []
tool_call_counter = [0]
exec_start = time.monotonic()
stop_event = threading.Event()
rpc_thread = None
try:
# Verify Python is available on the remote
py_check = env.execute_oneshot(
"command -v python3 >/dev/null 2>&1 && echo OK",
cwd="/", timeout=15,
)
if "OK" not in py_check.get("output", ""):
return json.dumps({
"status": "error",
"error": (
f"Python 3 is not available in the {env_type} terminal "
"environment. Install Python to use execute_code with "
"remote backends."
),
"tool_calls_made": 0,
"duration_seconds": 0,
})
# Create sandbox directory on remote
env.execute_oneshot(
f"mkdir -p {sandbox_dir}/rpc", cwd="/", timeout=10,
)
# Generate and ship files
tools_src = generate_hermes_tools_module(
list(sandbox_tools), transport="file",
)
_ship_file_to_remote(env, f"{sandbox_dir}/hermes_tools.py", tools_src)
_ship_file_to_remote(env, f"{sandbox_dir}/script.py", code)
# Start RPC polling thread
rpc_thread = threading.Thread(
target=_rpc_poll_loop,
args=(
env, f"{sandbox_dir}/rpc", effective_task_id,
tool_call_log, tool_call_counter, max_tool_calls,
sandbox_tools, stop_event,
),
daemon=True,
)
rpc_thread.start()
# Build environment variable prefix for the script
env_prefix = (
f"HERMES_RPC_DIR={sandbox_dir}/rpc "
f"PYTHONDONTWRITEBYTECODE=1"
)
tz = os.getenv("HERMES_TIMEZONE", "").strip()
if tz:
env_prefix += f" TZ={tz}"
# Execute the script on the remote backend
logger.info("Executing code on %s backend (task %s)...",
env_type, effective_task_id[:8])
script_result = env.execute(
f"cd {sandbox_dir} && {env_prefix} python3 script.py",
timeout=timeout,
)
stdout_text = script_result.get("output", "")
exit_code = script_result.get("returncode", -1)
status = "success"
# Check for timeout/interrupt from the backend
if exit_code == 124:
status = "timeout"
elif exit_code == 130:
status = "interrupted"
except Exception as exc:
duration = round(time.monotonic() - exec_start, 2)
logger.error(
"execute_code remote failed after %ss with %d tool calls: %s: %s",
duration, tool_call_counter[0], type(exc).__name__, exc,
exc_info=True,
)
return json.dumps({
"status": "error",
"error": str(exc),
"tool_calls_made": tool_call_counter[0],
"duration_seconds": duration,
}, ensure_ascii=False)
finally:
# Stop the polling thread
stop_event.set()
if rpc_thread is not None:
rpc_thread.join(timeout=5)
# Clean up remote sandbox dir
try:
env.execute_oneshot(
f"rm -rf {sandbox_dir}", cwd="/", timeout=15,
)
except Exception:
logger.debug("Failed to clean up remote sandbox %s", sandbox_dir)
duration = round(time.monotonic() - exec_start, 2)
# --- Post-process output (same as local path) ---
# Truncate stdout to cap
if len(stdout_text) > MAX_STDOUT_BYTES:
head_bytes = int(MAX_STDOUT_BYTES * 0.4)
tail_bytes = MAX_STDOUT_BYTES - head_bytes
head = stdout_text[:head_bytes]
tail = stdout_text[-tail_bytes:]
omitted = len(stdout_text) - len(head) - len(tail)
stdout_text = (
head
+ f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted "
f"out of {len(stdout_text):,} total] ...\n\n"
+ tail
)
# Strip ANSI escape sequences
from tools.ansi_strip import strip_ansi
stdout_text = strip_ansi(stdout_text)
# Redact secrets
from agent.redact import redact_sensitive_text
stdout_text = redact_sensitive_text(stdout_text)
# Build response
result: Dict[str, Any] = {
"status": status,
"output": stdout_text,
"tool_calls_made": tool_call_counter[0],
"duration_seconds": duration,
}
if status == "timeout":
result["error"] = f"Script timed out after {timeout}s and was killed."
elif status == "interrupted":
result["output"] = (
stdout_text + "\n[execution interrupted — user sent a new message]"
)
elif exit_code != 0:
result["status"] = "error"
result["error"] = f"Script exited with code {exit_code}"
return json.dumps(result, ensure_ascii=False)
# ---------------------------------------------------------------------------
# Main entry point
# ---------------------------------------------------------------------------
@@ -352,6 +873,9 @@ def execute_code(
Run a Python script in a sandboxed child process with RPC access
to a subset of Hermes tools.
Dispatches to the local (UDS) or remote (file-based RPC) path
depending on the configured terminal backend.
Args:
code: Python source code to execute.
task_id: Session task ID for tool isolation (terminal env, etc.).
@@ -369,6 +893,14 @@ def execute_code(
if not code or not code.strip():
return json.dumps({"error": "No code provided."})
# Dispatch: remote backends use file-based RPC, local uses UDS
from tools.terminal_tool import _get_env_config
env_type = _get_env_config()["env_type"]
if env_type != "local":
return _execute_remote(code, task_id, enabled_tools)
# --- Local execution path (UDS) --- below this line is unchanged ---
# Import interrupt event from terminal_tool (cooperative cancellation)
from tools.terminal_tool import _interrupt_event
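The request/response file handshake used by the remote path can be tried locally without any terminal backend. The following is a minimal sketch, assuming a temporary directory stands in for the remote filesystem; `write_atomic`, `call`, `serve_once`, and `demo` are hypothetical names for illustration and are not part of this change set.

```python
import json
import os
import shutil
import tempfile
import threading
import time


def write_atomic(path, payload):
    """Write JSON via tmp + rename so a reader never sees a partial file."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(payload, f)
    os.rename(tmp, path)


def call(rpc_dir, seq, tool, args, timeout=5.0):
    """Client side: drop a request file, then poll for the matching response."""
    seq_str = f"{seq:06d}"
    res_file = os.path.join(rpc_dir, f"res_{seq_str}")
    write_atomic(os.path.join(rpc_dir, f"req_{seq_str}"),
                 {"tool": tool, "args": args, "seq": seq})
    deadline = time.monotonic() + timeout
    interval = 0.05
    while not os.path.exists(res_file):
        if time.monotonic() > deadline:
            raise RuntimeError(f"RPC timeout: no response for {tool}")
        time.sleep(interval)
        interval = min(interval * 1.2, 0.25)  # back off, capped at 250 ms
    with open(res_file) as f:
        result = json.load(f)
    os.unlink(res_file)
    return result


def serve_once(rpc_dir, handlers, stop):
    """Server side: hypothetical stand-in for the parent's polling thread."""
    while not stop.is_set():
        names = sorted(n for n in os.listdir(rpc_dir)
                       if n.startswith("req_") and not n.endswith(".tmp"))
        for name in names:
            req_path = os.path.join(rpc_dir, name)
            with open(req_path) as f:
                req = json.load(f)
            handler = handlers.get(req["tool"])
            if handler is None:  # allow-list check
                res = {"error": f"Tool '{req['tool']}' is not available"}
            else:
                res = handler(**req["args"])
            write_atomic(os.path.join(rpc_dir, f"res_{req['seq']:06d}"), res)
            os.unlink(req_path)
        stop.wait(0.02)


def demo():
    """Run one allowed and one disallowed call against a local server thread."""
    rpc_dir = tempfile.mkdtemp(prefix="rpc_demo_")
    stop = threading.Event()
    handlers = {"add": lambda a, b: {"sum": a + b}}
    server = threading.Thread(target=serve_once,
                              args=(rpc_dir, handlers, stop), daemon=True)
    server.start()
    try:
        return call(rpc_dir, 1, "add", {"a": 2, "b": 3}), call(rpc_dir, 2, "nope", {})
    finally:
        stop.set()
        server.join(timeout=2)
        shutil.rmtree(rpc_dir, ignore_errors=True)
```

The rename step is what lets both sides poll with a plain existence check: a file under its final name is always complete, so neither reader needs locking.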
+19 -1
@@ -116,7 +116,7 @@ def _normalize_optional_job_value(value: Optional[Any], *, strip_trailing_slash:
def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
prompt = job.get("prompt", "")
skills = _canonical_skills(job.get("skill"), job.get("skills"))
return {
result = {
"job_id": job["id"],
"name": job["name"],
"skill": skills[0] if skills else None,
@@ -136,6 +136,9 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"paused_at": job.get("paused_at"),
"paused_reason": job.get("paused_reason"),
}
if job.get("script"):
result["script"] = job["script"]
return result
def cronjob(
@@ -153,6 +156,7 @@ def cronjob(
provider: Optional[str] = None,
base_url: Optional[str] = None,
reason: Optional[str] = None,
script: Optional[str] = None,
task_id: str = None,
) -> str:
"""Unified cron job management tool."""
@@ -183,6 +187,7 @@ def cronjob(
model=_normalize_optional_job_value(model),
provider=_normalize_optional_job_value(provider),
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
script=_normalize_optional_job_value(script),
)
return json.dumps(
{
@@ -265,6 +270,9 @@ def cronjob(
updates["provider"] = _normalize_optional_job_value(provider)
if base_url is not None:
updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True)
if script is not None:
# Pass empty string to clear an existing script
updates["script"] = _normalize_optional_job_value(script) if script else None
if repeat is not None:
# Normalize: treat 0 or negative as None (infinite)
normalized_repeat = None if repeat <= 0 else repeat
@@ -338,6 +346,11 @@ Jobs run in a fresh session with no current-chat context, so prompts must be sel
If skill or skills are provided on create, the future cron run loads those skills in order, then follows the prompt as the task instruction.
On update, passing skills=[] clears attached skills.
If script is provided on create, the referenced Python script runs before each agent turn.
Its stdout is injected into the prompt as context. Use this for data collection and change
detection: the script handles gathering data, the agent analyzes and reports.
On update, pass script="" to clear an attached script.
NOTE: The agent's final response is auto-delivered to the target. Put the primary
user-facing content in the final response. Cron jobs run autonomously with no user
present; they cannot ask questions or request clarification.
@@ -402,6 +415,10 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
"reason": {
"type": "string",
"description": "Optional pause reason"
},
"script": {
"type": "string",
"description": "Optional path to a Python script that runs before each cron job execution. Its stdout is injected into the prompt as context. Use for data collection and change detection. Relative paths resolve under ~/.hermes/scripts/. On update, pass empty string to clear."
}
},
"required": ["action"]
@@ -451,6 +468,7 @@ registry.register(
provider=args.get("provider"),
base_url=args.get("base_url"),
reason=args.get("reason"),
script=args.get("script"),
task_id=kw.get("task_id"),
),
check_fn=check_cronjob_requirements,
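The `script` option's flow (resolve the path, run the script, feed its stdout to the prompt) can be pictured with a short sketch. This is an illustration under stated assumptions, not the scheduler's actual code: `resolve_script` and `build_prompt` are hypothetical helpers, and the exact format in which stdout is injected into the prompt is assumed.

```python
import subprocess
import sys
from pathlib import Path


def resolve_script(script, base_dir):
    """Absolute paths are used as-is; relative paths resolve under base_dir.

    base_dir would be ~/.hermes/scripts/ per the tool description; it is a
    parameter here so the sketch can run anywhere.
    """
    path = Path(script).expanduser()
    return path if path.is_absolute() else Path(base_dir) / path


def build_prompt(prompt, script_path, timeout=30):
    """Run the script and prepend its stdout to the prompt as context.

    The "Script output:" framing is an assumption made for illustration.
    """
    proc = subprocess.run(
        [sys.executable, str(script_path)],
        capture_output=True, text=True, timeout=timeout,
    )
    output = proc.stdout.strip()
    if not output:
        return prompt  # nothing to inject
    return f"Script output:\n{output}\n\n{prompt}"
```

Keeping data collection in the script and analysis in the prompt means the agent only sees the collected output, not the steps that produced it.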
+13
@@ -91,6 +91,19 @@ class BaseEnvironment(ABC):
kw["stdin"] = subprocess.DEVNULL
return kw
def execute_oneshot(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
"""Execute a command bypassing any persistent shell.
Safe for concurrent use alongside a long-running execute() call.
Backends that maintain a persistent shell (SSH, Local) override this
to route through their oneshot path, avoiding the shell lock.
Non-persistent backends delegate to execute().
"""
return self.execute(command, cwd=cwd, timeout=timeout,
stdin_data=stdin_data)
def _timeout_result(self, timeout: int | None) -> dict:
"""Standard return dict when a command times out."""
return {
+13
@@ -141,6 +141,19 @@ class PersistentShellMixin:
command, cwd, timeout=timeout, stdin_data=stdin_data,
)
def execute_oneshot(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
"""Always use the oneshot (non-persistent) execution path.
This bypasses _shell_lock so it can run concurrently with a
long-running command in the persistent shell used by
execute_code's file-based RPC polling thread.
"""
return self._execute_oneshot(
command, cwd, timeout=timeout, stdin_data=stdin_data,
)
def cleanup(self):
if self.persistent:
self._cleanup_persistent_shell()
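Why `execute_oneshot()` has to avoid the shell lock is easiest to see with a toy model. The sketch below assumes a simplified class in which `execute()` serializes on a lock and `execute_oneshot()` does not; `ToyPersistentShell` and `poll_during_script` are hypothetical and only mimic the shape of the mixin shown in this diff.

```python
import threading
import time


class ToyPersistentShell:
    """Hypothetical stand-in for a persistent-shell backend."""

    def __init__(self):
        self._shell_lock = threading.Lock()

    def execute(self, command, duration=0.0):
        # The persistent shell runs one command at a time.
        with self._shell_lock:
            time.sleep(duration)  # simulate a long-running script
            return {"output": f"ran {command}", "returncode": 0}

    def execute_oneshot(self, command):
        # No lock taken: safe to call while execute() is in flight.
        return {"output": f"oneshot {command}", "returncode": 0}


def poll_during_script(shell, script_seconds=0.5):
    """Start a long execute() in a thread, then issue a oneshot poll."""
    script_result = {}
    worker = threading.Thread(
        target=lambda: script_result.update(
            shell.execute("script.py", script_seconds)
        )
    )
    worker.start()
    # Wait until the worker actually holds the shell lock.
    deadline = time.monotonic() + 2.0
    while not shell._shell_lock.locked() and time.monotonic() < deadline:
        time.sleep(0.005)
    poll = shell.execute_oneshot("ls rpc/")
    lock_still_held = shell._shell_lock.locked()
    worker.join()
    return poll, lock_still_held, script_result
```

If the poll went through `execute()` instead, it would block until the script finished, and the script in turn would be waiting on a response the poller could never write.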
+1 -1
@@ -898,7 +898,7 @@ class ShellFileOperations(FileOperations):
hidden_exclude = "-not -path '*/.*'"
cmd = f"find {self._escape_shell_arg(path)} {hidden_exclude} -type f -name {self._escape_shell_arg(search_pattern)} " \
f"-printf '%T@ %p\\\\n' 2>/dev/null | sort -rn | tail -n +{offset + 1} | head -n {limit}"
f"-printf '%T@ %p\\n' 2>/dev/null | sort -rn | tail -n +{offset + 1} | head -n {limit}"
result = self._exec(cmd, timeout=60)
@@ -232,6 +232,9 @@ For cloud sandbox backends, persistence is filesystem-oriented. `TERMINAL_LIFETI
| `MATRIX_ALLOWED_USERS` | Comma-separated Matrix user IDs allowed to message the bot (e.g. `@alice:matrix.org`) |
| `MATRIX_HOME_ROOM` | Room ID for proactive message delivery (e.g. `!abc123:matrix.org`) |
| `MATRIX_ENCRYPTION` | Enable end-to-end encryption (`true`/`false`, default: `false`) |
| `MATRIX_REQUIRE_MENTION` | Require `@mention` in rooms (default: `true`). Set to `false` to respond to all messages. |
| `MATRIX_FREE_RESPONSE_ROOMS` | Comma-separated room IDs where bot responds without `@mention` |
| `MATRIX_AUTO_THREAD` | Auto-create threads for room messages (default: `true`) |
| `HASS_TOKEN` | Home Assistant Long-Lived Access Token (enables HA platform + tools) |
| `HASS_URL` | Home Assistant URL (default: `http://homeassistant.local:8123`) |
| `WEBHOOK_ENABLED` | Enable the webhook platform adapter (`true`/`false`) |
+27 -2
@@ -17,8 +17,9 @@ Before setup, here's the part most people want to know: how Hermes behaves once
| Context | Behavior |
|---------|----------|
| **DMs** | Hermes responds to every message. No `@mention` needed. Each DM has its own session. |
| **Rooms** | Hermes responds to all messages in rooms it has joined. Room invites are auto-accepted. |
| **Threads** | Hermes supports Matrix threads (MSC3440). If you reply in a thread, Hermes keeps the thread context isolated from the main room timeline. |
| **Rooms** | By default, Hermes requires an `@mention` to respond. Set `MATRIX_REQUIRE_MENTION=false` or add room IDs to `MATRIX_FREE_RESPONSE_ROOMS` for free-response rooms. Room invites are auto-accepted. |
| **Threads** | Hermes supports Matrix threads (MSC3440). If you reply in a thread, Hermes keeps the thread context isolated from the main room timeline. Threads where the bot has already participated do not require a mention. |
| **Auto-threading** | By default, Hermes auto-creates a thread for each message it responds to in a room. This keeps conversations isolated. Set `MATRIX_AUTO_THREAD=false` to disable. |
| **Shared rooms with multiple users** | By default, Hermes isolates session history per user inside the room. Two people talking in the same room do not share one transcript unless you explicitly disable that. |
:::tip
@@ -51,6 +52,30 @@ Shared sessions can be useful for a collaborative room, but they also mean:
- one person's long tool-heavy task can bloat everyone else's context
- one person's in-flight run can interrupt another person's follow-up in the same room
### Mention and Threading Configuration
You can configure mention and auto-threading behavior via environment variables or `config.yaml`:
```yaml
matrix:
require_mention: true # Require @mention in rooms (default: true)
free_response_rooms: # Rooms exempt from mention requirement
- "!abc123:matrix.org"
auto_thread: true # Auto-create threads for responses (default: true)
```
Or via environment variables:
```bash
MATRIX_REQUIRE_MENTION=true
MATRIX_FREE_RESPONSE_ROOMS=!abc123:matrix.org,!def456:matrix.org
MATRIX_AUTO_THREAD=true
```
:::note
If you are upgrading from a version that did not have `MATRIX_REQUIRE_MENTION`, the bot previously responded to all messages in rooms. To preserve that behavior, set `MATRIX_REQUIRE_MENTION=false`.
:::
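The rules in the behavior table can be read as a single decision function. This is a minimal sketch of that reading, assuming the order of checks does not matter; `parse_room_list` and `should_respond` are hypothetical names, and the adapter's real implementation may differ.

```python
def parse_room_list(raw):
    """Split a comma-separated MATRIX_FREE_RESPONSE_ROOMS value into a set."""
    return {room.strip() for room in raw.split(",") if room.strip()}


def should_respond(*, is_dm, room_id, mentioned, in_bot_thread=False,
                   require_mention=True, free_response_rooms=frozenset()):
    """Decide whether the bot answers a message (illustrative only)."""
    if is_dm:
        return True               # DMs never need a mention
    if not require_mention:
        return True               # MATRIX_REQUIRE_MENTION=false
    if room_id in free_response_rooms:
        return True               # room is exempt from mention gating
    if in_bot_thread:
        return True               # bot already participated in this thread
    return mentioned
```

For example, with `free = parse_room_list("!abc123:matrix.org, !def456:matrix.org")`, an unmentioned message in `!abc123:matrix.org` gets a response, while the same message in any other room is ignored unless it sits in a thread the bot has already joined.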
This guide walks you through the full setup process — from creating your bot account to sending your first message.
## Step 1: Create a Bot Account