Compare commits

..

7 Commits

Author SHA1 Message Date
Ben Barclay 2a7a7c509d Install whatsapp bridge deps in container 2026-03-29 14:30:09 +11:00
Ben Barclay 034edf4ffa Remove git submodules from container 2026-03-29 14:29:29 +11:00
Ben Barclay d9e8d857e8 apt -> apt-get 2026-03-29 13:55:18 +11:00
Ben Barclay c09f81bd33 Add .dockerignore file 2026-03-29 13:55:18 +11:00
Ben Barclay a6debb0c53 Fix incorrect Dockerfile reference in GitHub action 2026-03-29 13:55:18 +11:00
Ben Barclay ec1e66b6f2 Pin Docker version 2026-03-29 13:55:18 +11:00
Ben Barclay bc78b2ef29 feat(docker): Add a docker container for the agent 2026-03-29 13:55:18 +11:00
64 changed files with 306 additions and 4454 deletions
-78
View File
@@ -210,10 +210,6 @@ registry.register(
The registry handles schema collection, dispatch, availability checking, and error wrapping. All handlers MUST return a JSON string.
**Path references in tool schemas**: If the schema description mentions file paths (e.g. default output directories), use `display_hermes_home()` to make them profile-aware. The schema is generated at import time, which is after `_apply_profile_override()` sets `HERMES_HOME`.
**State files**: If a tool stores persistent state (caches, logs, checkpoints), use `get_hermes_home()` for the base directory — never `Path.home() / ".hermes"`. This ensures each profile gets its own state.
**Agent-level tools** (todo, memory): intercepted by `run_agent.py` before `handle_function_call()`. See `todo_tool.py` for the pattern.
---
@@ -362,69 +358,8 @@ in config.yaml (or `HERMES_BACKGROUND_NOTIFICATIONS` env var):
---
## Profiles: Multi-Instance Support
Hermes supports **profiles** — multiple fully isolated instances, each with its own
`HERMES_HOME` directory (config, API keys, memory, sessions, skills, gateway, etc.).
The core mechanism: `_apply_profile_override()` in `hermes_cli/main.py` sets
`HERMES_HOME` before any module imports. All 119+ references to `get_hermes_home()`
automatically scope to the active profile.
### Rules for profile-safe code
1. **Use `get_hermes_home()` for all HERMES_HOME paths.** Import from `hermes_constants`.
NEVER hardcode `~/.hermes` or `Path.home() / ".hermes"` in code that reads/writes state.
```python
# GOOD
from hermes_constants import get_hermes_home
config_path = get_hermes_home() / "config.yaml"
# BAD — breaks profiles
config_path = Path.home() / ".hermes" / "config.yaml"
```
2. **Use `display_hermes_home()` for user-facing messages.** Import from `hermes_constants`.
This returns `~/.hermes` for default or `~/.hermes/profiles/<name>` for profiles.
```python
# GOOD
from hermes_constants import display_hermes_home
print(f"Config saved to {display_hermes_home()}/config.yaml")
# BAD — shows wrong path for profiles
print("Config saved to ~/.hermes/config.yaml")
```
3. **Module-level constants are fine** — they cache `get_hermes_home()` at import time,
which is AFTER `_apply_profile_override()` sets the env var. Just use `get_hermes_home()`,
not `Path.home() / ".hermes"`.
4. **Tests that mock `Path.home()` must also set `HERMES_HOME`** — since code now uses
`get_hermes_home()` (reads env var), not `Path.home() / ".hermes"`:
```python
with patch.object(Path, "home", return_value=tmp_path), \
patch.dict(os.environ, {"HERMES_HOME": str(tmp_path / ".hermes")}):
...
```
5. **Gateway platform adapters should use token locks** — if the adapter connects with
a unique credential (bot token, API key), call `acquire_scoped_lock()` from
`gateway.status` in the `connect()`/`start()` method and `release_scoped_lock()` in
`disconnect()`/`stop()`. This prevents two profiles from using the same credential.
See `gateway/platforms/telegram.py` for the canonical pattern.
6. **Profile operations are HOME-anchored, not HERMES_HOME-anchored** — `_get_profiles_root()`
returns `Path.home() / ".hermes" / "profiles"`, NOT `get_hermes_home() / "profiles"`.
This is intentional — it lets `hermes -p coder profile list` see all profiles regardless
of which one is active.
## Known Pitfalls
### DO NOT hardcode `~/.hermes` paths
Use `get_hermes_home()` from `hermes_constants` for code paths. Use `display_hermes_home()`
for user-facing print/log messages. Hardcoding `~/.hermes` breaks profiles — each profile
has its own `HERMES_HOME` directory. This was the source of 5 bugs fixed in PR #3575.
### DO NOT use `simple_term_menu` for interactive menus
Rendering bugs in tmux/iTerm2 — ghosting on scroll. Use `curses` (stdlib) instead. See `hermes_cli/tools_config.py` for the pattern.
@@ -440,19 +375,6 @@ Tool schema descriptions must not mention tools from other toolsets by name (e.g
### Tests must not write to `~/.hermes/`
The `_isolate_hermes_home` autouse fixture in `tests/conftest.py` redirects `HERMES_HOME` to a temp dir. Never hardcode `~/.hermes/` paths in tests.
**Profile tests**: When testing profile features, also mock `Path.home()` so that
`_get_profiles_root()` and `_get_default_hermes_home()` resolve within the temp dir.
Use the pattern from `tests/hermes_cli/test_profiles.py`:
```python
@pytest.fixture
def profile_env(tmp_path, monkeypatch):
home = tmp_path / ".hermes"
home.mkdir()
monkeypatch.setattr(Path, "home", lambda: tmp_path)
monkeypatch.setenv("HERMES_HOME", str(home))
return home
```
---
## Testing
+1 -1
View File
@@ -74,7 +74,7 @@ def main() -> None:
agent = HermesACPAgent()
try:
asyncio.run(acp.run_agent(agent, use_unstable_protocol=True))
asyncio.run(acp.run_agent(agent))
except KeyboardInterrupt:
logger.info("Shutting down (KeyboardInterrupt)")
except Exception:
+3 -46
View File
@@ -25,9 +25,6 @@ from acp.schema import (
NewSessionResponse,
PromptResponse,
ResumeSessionResponse,
SetSessionConfigOptionResponse,
SetSessionModelResponse,
SetSessionModeResponse,
ResourceContentBlock,
SessionCapabilities,
SessionForkCapabilities,
@@ -97,14 +94,11 @@ class HermesACPAgent(acp.Agent):
async def initialize(
self,
protocol_version: int | None = None,
protocol_version: int,
client_capabilities: ClientCapabilities | None = None,
client_info: Implementation | None = None,
**kwargs: Any,
) -> InitializeResponse:
resolved_protocol_version = (
protocol_version if isinstance(protocol_version, int) else acp.PROTOCOL_VERSION
)
provider = detect_provider()
auth_methods = None
if provider:
@@ -117,11 +111,7 @@ class HermesACPAgent(acp.Agent):
]
client_name = client_info.name if client_info else "unknown"
logger.info(
"Initialize from %s (protocol v%s)",
client_name,
resolved_protocol_version,
)
logger.info("Initialize from %s (protocol v%s)", client_name, protocol_version)
return InitializeResponse(
protocol_version=acp.PROTOCOL_VERSION,
@@ -481,7 +471,7 @@ class HermesACPAgent(acp.Agent):
async def set_session_model(
self, model_id: str, session_id: str, **kwargs: Any
) -> SetSessionModelResponse | None:
):
"""Switch the model for a session (called by ACP protocol)."""
state = self.session_manager.get_session(session_id)
if state:
@@ -499,37 +489,4 @@ class HermesACPAgent(acp.Agent):
)
self.session_manager.save_session(session_id)
logger.info("Session %s: model switched to %s", session_id, model_id)
return SetSessionModelResponse()
logger.warning("Session %s: model switch requested for missing session", session_id)
return None
async def set_session_mode(
self, mode_id: str, session_id: str, **kwargs: Any
) -> SetSessionModeResponse | None:
"""Persist the editor-requested mode so ACP clients do not fail on mode switches."""
state = self.session_manager.get_session(session_id)
if state is None:
logger.warning("Session %s: mode switch requested for missing session", session_id)
return None
setattr(state, "mode", mode_id)
self.session_manager.save_session(session_id)
logger.info("Session %s: mode switched to %s", session_id, mode_id)
return SetSessionModeResponse()
async def set_config_option(
self, config_id: str, session_id: str, value: str, **kwargs: Any
) -> SetSessionConfigOptionResponse | None:
"""Accept ACP config option updates even when Hermes has no typed ACP config surface yet."""
state = self.session_manager.get_session(session_id)
if state is None:
logger.warning("Session %s: config update requested for missing session", session_id)
return None
options = getattr(state, "config_options", None)
if not isinstance(options, dict):
options = {}
options[str(config_id)] = value
setattr(state, "config_options", options)
self.session_manager.save_session(session_id)
logger.info("Session %s: config option %s updated", session_id, config_id)
return SetSessionConfigOptionResponse(config_options=[])
+1 -59
View File
@@ -18,7 +18,6 @@ from typing import Optional
from agent.skill_utils import (
extract_skill_conditions,
extract_skill_description,
get_all_skills_dirs,
get_disabled_skill_names,
iter_skill_index_files,
parse_frontmatter,
@@ -445,23 +444,16 @@ def build_skills_system_prompt(
mtime/size manifest — survives process restarts
Falls back to a full filesystem scan when both layers miss.
External skill directories (``skills.external_dirs`` in config.yaml) are
scanned alongside the local ``~/.hermes/skills/`` directory. External dirs
are read-only — they appear in the index but new skills are always created
in the local dir. Local skills take precedence when names collide.
"""
hermes_home = get_hermes_home()
skills_dir = hermes_home / "skills"
external_dirs = get_all_skills_dirs()[1:] # skip local (index 0)
if not skills_dir.exists() and not external_dirs:
if not skills_dir.exists():
return ""
# ── Layer 1: in-process LRU cache ─────────────────────────────────
cache_key = (
str(skills_dir.resolve()),
tuple(str(d) for d in external_dirs),
tuple(sorted(str(t) for t in (available_tools or set()))),
tuple(sorted(str(ts) for ts in (available_toolsets or set()))),
)
@@ -548,56 +540,6 @@ def build_skills_system_prompt(
category_descriptions,
)
# ── External skill directories ─────────────────────────────────────
# Scan external dirs directly (no snapshot caching — they're read-only
# and typically small). Local skills already in skills_by_category take
# precedence: we track seen names and skip duplicates from external dirs.
seen_skill_names: set[str] = set()
for cat_skills in skills_by_category.values():
for name, _desc in cat_skills:
seen_skill_names.add(name)
for ext_dir in external_dirs:
if not ext_dir.exists():
continue
for skill_file in iter_skill_index_files(ext_dir, "SKILL.md"):
try:
is_compatible, frontmatter, desc = _parse_skill_file(skill_file)
if not is_compatible:
continue
entry = _build_snapshot_entry(skill_file, ext_dir, frontmatter, desc)
skill_name = entry["skill_name"]
if skill_name in seen_skill_names:
continue
if entry["frontmatter_name"] in disabled or skill_name in disabled:
continue
if not _skill_should_show(
extract_skill_conditions(frontmatter),
available_tools,
available_toolsets,
):
continue
seen_skill_names.add(skill_name)
skills_by_category.setdefault(entry["category"], []).append(
(skill_name, entry["description"])
)
except Exception as e:
logger.debug("Error reading external skill %s: %s", skill_file, e)
# External category descriptions
for desc_file in iter_skill_index_files(ext_dir, "DESCRIPTION.md"):
try:
content = desc_file.read_text(encoding="utf-8")
fm, _ = parse_frontmatter(content)
cat_desc = fm.get("description")
if not cat_desc:
continue
rel = desc_file.relative_to(ext_dir)
cat = "/".join(rel.parts[:-1]) if len(rel.parts) > 1 else "general"
category_descriptions.setdefault(cat, str(cat_desc).strip().strip("'\""))
except Exception as e:
logger.debug("Could not read external skill description %s: %s", desc_file, e)
if not skills_by_category:
result = ""
else:
+30 -45
View File
@@ -128,11 +128,7 @@ def _build_skill_message(
supporting.append(rel)
if supporting and skill_dir:
try:
skill_view_target = str(skill_dir.relative_to(SKILLS_DIR))
except ValueError:
# Skill is from an external dir — use the skill name instead
skill_view_target = skill_dir.name
skill_view_target = str(skill_dir.relative_to(SKILLS_DIR))
parts.append("")
parts.append("[This skill has supporting files you can load with the skill_view tool:]")
for sf in supporting:
@@ -162,49 +158,38 @@ def scan_skill_commands() -> Dict[str, Dict[str, Any]]:
_skill_commands = {}
try:
from tools.skills_tool import SKILLS_DIR, _parse_frontmatter, skill_matches_platform, _get_disabled_skill_names
from agent.skill_utils import get_external_skills_dirs
if not SKILLS_DIR.exists():
return _skill_commands
disabled = _get_disabled_skill_names()
seen_names: set = set()
# Scan local dir first, then external dirs
dirs_to_scan = []
if SKILLS_DIR.exists():
dirs_to_scan.append(SKILLS_DIR)
dirs_to_scan.extend(get_external_skills_dirs())
for scan_dir in dirs_to_scan:
for skill_md in scan_dir.rglob("SKILL.md"):
if any(part in ('.git', '.github', '.hub') for part in skill_md.parts):
for skill_md in SKILLS_DIR.rglob("SKILL.md"):
if any(part in ('.git', '.github', '.hub') for part in skill_md.parts):
continue
try:
content = skill_md.read_text(encoding='utf-8')
frontmatter, body = _parse_frontmatter(content)
# Skip skills incompatible with the current OS platform
if not skill_matches_platform(frontmatter):
continue
try:
content = skill_md.read_text(encoding='utf-8')
frontmatter, body = _parse_frontmatter(content)
# Skip skills incompatible with the current OS platform
if not skill_matches_platform(frontmatter):
continue
name = frontmatter.get('name', skill_md.parent.name)
if name in seen_names:
continue
# Respect user's disabled skills config
if name in disabled:
continue
description = frontmatter.get('description', '')
if not description:
for line in body.strip().split('\n'):
line = line.strip()
if line and not line.startswith('#'):
description = line[:80]
break
seen_names.add(name)
cmd_name = name.lower().replace(' ', '-').replace('_', '-')
_skill_commands[f"/{cmd_name}"] = {
"name": name,
"description": description or f"Invoke the {name} skill",
"skill_md_path": str(skill_md),
"skill_dir": str(skill_md.parent),
}
except Exception:
name = frontmatter.get('name', skill_md.parent.name)
# Respect user's disabled skills config
if name in disabled:
continue
description = frontmatter.get('description', '')
if not description:
for line in body.strip().split('\n'):
line = line.strip()
if line and not line.startswith('#'):
description = line[:80]
break
cmd_name = name.lower().replace(' ', '-').replace('_', '-')
_skill_commands[f"/{cmd_name}"] = {
"name": name,
"description": description or f"Invoke the {name} skill",
"skill_md_path": str(skill_md),
"skill_dir": str(skill_md.parent),
}
except Exception:
continue
except Exception:
pass
return _skill_commands
-67
View File
@@ -158,73 +158,6 @@ def _normalize_string_set(values) -> Set[str]:
return {str(v).strip() for v in values if str(v).strip()}
# ── External skills directories ──────────────────────────────────────────
def get_external_skills_dirs() -> List[Path]:
"""Read ``skills.external_dirs`` from config.yaml and return validated paths.
Each entry is expanded (``~`` and ``${VAR}``) and resolved to an absolute
path. Only directories that actually exist are returned. Duplicates and
paths that resolve to the local ``~/.hermes/skills/`` are silently skipped.
"""
config_path = get_hermes_home() / "config.yaml"
if not config_path.exists():
return []
try:
parsed = yaml_load(config_path.read_text(encoding="utf-8"))
except Exception:
return []
if not isinstance(parsed, dict):
return []
skills_cfg = parsed.get("skills")
if not isinstance(skills_cfg, dict):
return []
raw_dirs = skills_cfg.get("external_dirs")
if not raw_dirs:
return []
if isinstance(raw_dirs, str):
raw_dirs = [raw_dirs]
if not isinstance(raw_dirs, list):
return []
local_skills = (get_hermes_home() / "skills").resolve()
seen: Set[Path] = set()
result: List[Path] = []
for entry in raw_dirs:
entry = str(entry).strip()
if not entry:
continue
# Expand ~ and environment variables
expanded = os.path.expanduser(os.path.expandvars(entry))
p = Path(expanded).resolve()
if p == local_skills:
continue
if p in seen:
continue
if p.is_dir():
seen.add(p)
result.append(p)
else:
logger.debug("External skills dir does not exist, skipping: %s", p)
return result
def get_all_skills_dirs() -> List[Path]:
"""Return all skill directories: local ``~/.hermes/skills/`` first, then external.
The local dir is always first (and always included even if it doesn't exist
yet — callers handle that). External dirs follow in config order.
"""
dirs = [get_hermes_home() / "skills"]
dirs.extend(get_external_skills_dirs())
return dirs
# ── Condition extraction ──────────────────────────────────────────────────
-9
View File
@@ -402,15 +402,6 @@ skills:
# Set to 0 to disable.
creation_nudge_interval: 15
# External skill directories — share skills across tools/agents without
# copying them into ~/.hermes/skills/. Each path is expanded (~ and ${VAR})
# and resolved to an absolute path. External dirs are read-only: skill
# creation always writes to ~/.hermes/skills/. Local skills take precedence
# when names collide.
# external_dirs:
# - ~/.agents/skills
# - /home/shared/team-skills
# =============================================================================
# Agent Behavior
# =============================================================================
+5 -17
View File
@@ -70,7 +70,7 @@ _COMMAND_SPINNER_FRAMES = ("⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
# User-managed env files should override stale shell exports on restart.
from hermes_constants import get_hermes_home, display_hermes_home, OPENROUTER_BASE_URL
from hermes_constants import get_hermes_home, OPENROUTER_BASE_URL
from hermes_cli.env_loader import load_hermes_dotenv
_hermes_home = get_hermes_home()
@@ -3594,7 +3594,7 @@ class HermesCLI:
print(" To start the gateway:")
print(" python cli.py --gateway")
print()
print(f" Configuration file: {display_hermes_home()}/config.yaml")
print(" Configuration file: ~/.hermes/config.yaml")
print()
except Exception as e:
@@ -3604,7 +3604,7 @@ class HermesCLI:
print(" 1. Set environment variables:")
print(" TELEGRAM_BOT_TOKEN=your_token")
print(" DISCORD_BOT_TOKEN=your_token")
print(f" 2. Or configure settings in {display_hermes_home()}/config.yaml")
print(" 2. Or configure settings in ~/.hermes/config.yaml")
print()
def process_command(self, command: str) -> bool:
@@ -3811,7 +3811,7 @@ class HermesCLI:
plugins = mgr.list_plugins()
if not plugins:
print("No plugins installed.")
print(f"Drop plugin directories into {display_hermes_home()}/plugins/ to get started.")
print("Drop plugin directories into ~/.hermes/plugins/ to get started.")
else:
print(f"Plugins ({len(plugins)}):")
for p in plugins:
@@ -4340,7 +4340,7 @@ class HermesCLI:
source = f" ({s['source']})" if s["source"] == "user" else ""
print(f" {marker} {s['name']}{source}{s['description']}")
print("\n Usage: /skin <name>")
print(f" Custom skins: drop a YAML file in {display_hermes_home()}/skins/\n")
print(" Custom skins: drop a YAML file in ~/.hermes/skins/\n")
return
new_skin = parts[1].strip().lower()
@@ -5944,9 +5944,6 @@ class HermesCLI:
``normal_prompt`` is the full ``branding.prompt_symbol``.
``state_suffix`` is what special states (sudo/secret/approval/agent)
should render after their leading icon.
When a profile is active (not "default"), the profile name is
prepended to the prompt symbol: ``coder `` instead of ````.
"""
try:
from hermes_cli.skin_engine import get_active_prompt_symbol
@@ -5955,15 +5952,6 @@ class HermesCLI:
symbol = " "
symbol = (symbol or " ").rstrip() + " "
# Prepend profile name when not default
try:
from hermes_cli.profiles import get_active_profile_name
profile = get_active_profile_name()
if profile not in ("default", "custom"):
symbol = f"{profile} {symbol}"
except Exception:
pass
stripped = symbol.rstrip()
if not stripped:
return " ", " "
+2 -5
View File
@@ -5,11 +5,8 @@ set -e
HERMES_HOME="/opt/data"
INSTALL_DIR="/opt/hermes"
# Create essential directory structure. Cache and platform directories
# (cache/images, cache/audio, platforms/whatsapp, etc.) are created on
# demand by the application — don't pre-create them here so new installs
# get the consolidated layout from get_hermes_dir().
mkdir -p "$HERMES_HOME"/{cron,sessions,logs,hooks,memories,skills}
# Create directory structure
mkdir -p "$HERMES_HOME"/{cron,sessions,logs,pairing,hooks,image_cache,audio_cache,memories,skills,whatsapp/session}
# .env
if [ ! -f "$HERMES_HOME/.env" ]; then
-11
View File
@@ -1261,17 +1261,6 @@ class APIServerAdapter(BasePlatformAdapter):
self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
# Port conflict detection — fail fast if port is already in use
import socket as _socket
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
_s.settimeout(1)
_s.connect(('127.0.0.1', self._port))
logger.error('[%s] Port %d already in use. Set a different port in config.yaml: platforms.api_server.port', self.name, self._port)
return False
except (ConnectionRefusedError, OSError):
pass # port is free
self._runner = web.AppRunner(self._app)
await self._runner.setup()
self._site = web.TCPSite(self._runner, self._host, self._port)
+8 -6
View File
@@ -1005,7 +1005,7 @@ class BasePlatformAdapter(ABC):
# simultaneous messages. Queue them without interrupting the active run,
# then process them immediately after the current task finishes.
if event.message_type == MessageType.PHOTO:
logger.debug("[%s] Queuing photo follow-up for session %s without interrupt", self.name, session_key)
print(f"[{self.name}] 🖼️ Queuing photo follow-up for session {session_key} without interrupt")
existing = self._pending_messages.get(session_key)
if existing and existing.message_type == MessageType.PHOTO:
existing.media_urls.extend(event.media_urls)
@@ -1020,7 +1020,7 @@ class BasePlatformAdapter(ABC):
return # Don't interrupt now - will run after current task completes
# Default behavior for non-photo follow-ups: interrupt the running agent
logger.debug("[%s] New message while session %s is active triggering interrupt", self.name, session_key)
print(f"[{self.name}] New message while session {session_key} is active - triggering interrupt")
self._pending_messages[session_key] = event
# Signal the interrupt (the processing task checks this)
self._active_sessions[session_key].set()
@@ -1206,9 +1206,9 @@ class BasePlatformAdapter(ABC):
)
if not media_result.success:
logger.warning("[%s] Failed to send media (%s): %s", self.name, ext, media_result.error)
print(f"[{self.name}] Failed to send media ({ext}): {media_result.error}")
except Exception as media_err:
logger.warning("[%s] Error sending media: %s", self.name, media_err)
print(f"[{self.name}] Error sending media: {media_err}")
# Send auto-detected local files as native attachments
for file_path in local_files:
@@ -1240,7 +1240,7 @@ class BasePlatformAdapter(ABC):
# Check if there's a pending message that was queued during our processing
if session_key in self._pending_messages:
pending_event = self._pending_messages.pop(session_key)
logger.debug("[%s] Processing queued message from interrupt", self.name)
print(f"[{self.name}] 📨 Processing queued message from interrupt")
# Clean up current session before processing pending
if session_key in self._active_sessions:
del self._active_sessions[session_key]
@@ -1254,7 +1254,9 @@ class BasePlatformAdapter(ABC):
return # Already cleaned up
except Exception as e:
logger.error("[%s] Error handling message: %s", self.name, e, exc_info=True)
print(f"[{self.name}] Error handling message: {e}")
import traceback
traceback.print_exc()
# Send the error to the user so they aren't left with radio silence
try:
error_type = type(e).__name__
+12 -34
View File
@@ -486,16 +486,6 @@ class DiscordAdapter(BasePlatformAdapter):
return False
try:
# Acquire scoped lock to prevent duplicate bot token usage
from gateway.status import acquire_scoped_lock
acquired, existing = acquire_scoped_lock('discord-bot-token', self.config.token, metadata={'platform': 'discord'})
if not acquired:
owner_pid = existing.get('pid') if isinstance(existing, dict) else None
message = f'Discord bot token already in use' + (f' (PID {owner_pid})' if owner_pid else '') + '. Stop the other gateway first.'
logger.error('[%s] %s', self.name, message)
self._set_fatal_error('discord_token_lock', message, retryable=False)
return False
# Set up intents -- members intent needed for username-to-ID resolution
intents = Intents.default()
intents.message_content = True
@@ -648,14 +638,6 @@ class DiscordAdapter(BasePlatformAdapter):
self._running = False
self._client = None
self._ready_event.clear()
# Release the token lock
try:
from gateway.status import release_scoped_lock
release_scoped_lock('discord-bot-token', self.config.token)
except Exception:
pass
logger.info("[%s] Disconnected", self.name)
async def send(
@@ -1447,23 +1429,15 @@ class DiscordAdapter(BasePlatformAdapter):
command_text: str,
followup_msg: str | None = None,
) -> None:
"""Common handler for simple slash commands that dispatch a command string.
Defers the interaction (shows "thinking..."), dispatches the command,
then cleans up the deferred response. If *followup_msg* is provided
the "thinking..." indicator is replaced with that text; otherwise it
is deleted so the channel isn't cluttered.
"""
"""Common handler for simple slash commands that dispatch a command string."""
await interaction.response.defer(ephemeral=True)
event = self._build_slash_event(interaction, command_text)
await self.handle_message(event)
try:
if followup_msg:
await interaction.edit_original_response(content=followup_msg)
else:
await interaction.delete_original_response()
except Exception as e:
logger.debug("Discord interaction cleanup failed: %s", e)
if followup_msg:
try:
await interaction.followup.send(followup_msg, ephemeral=True)
except Exception as e:
logger.debug("Discord followup failed: %s", e)
def _register_slash_commands(self) -> None:
"""Register Discord slash commands on the command tree."""
@@ -1488,7 +1462,9 @@ class DiscordAdapter(BasePlatformAdapter):
@tree.command(name="reasoning", description="Show or change reasoning effort")
@discord.app_commands.describe(effort="Reasoning effort: xhigh, high, medium, low, minimal, or none.")
async def slash_reasoning(interaction: discord.Interaction, effort: str = ""):
await self._run_simple_slash(interaction, f"/reasoning {effort}".strip())
await interaction.response.defer(ephemeral=True)
event = self._build_slash_event(interaction, f"/reasoning {effort}".strip())
await self.handle_message(event)
@tree.command(name="personality", description="Set a personality")
@discord.app_commands.describe(name="Personality name. Leave empty to list available.")
@@ -1561,7 +1537,9 @@ class DiscordAdapter(BasePlatformAdapter):
discord.app_commands.Choice(name="status — show current mode", value="status"),
])
async def slash_voice(interaction: discord.Interaction, mode: str = ""):
await self._run_simple_slash(interaction, f"/voice {mode}".strip())
await interaction.response.defer(ephemeral=True)
event = self._build_slash_event(interaction, f"/voice {mode}".strip())
await self.handle_message(event)
@tree.command(name="update", description="Update Hermes Agent to the latest version")
async def slash_update(interaction: discord.Interaction):
+3 -21
View File
@@ -603,19 +603,9 @@ class MattermostAdapter(BasePlatformAdapter):
# For DMs, user_id is sufficient. For channels, check for @mention.
message_text = post.get("message", "")
# Mention-gating for non-DM channels.
# Config (env vars):
# MATTERMOST_REQUIRE_MENTION: Require @mention in channels (default: true)
# MATTERMOST_FREE_RESPONSE_CHANNELS: Channel IDs where bot responds without mention
# Mention-only mode: skip channel messages that don't @mention the bot.
# DMs (type "D") are always processed.
if channel_type_raw != "D":
require_mention = os.getenv(
"MATTERMOST_REQUIRE_MENTION", "true"
).lower() not in ("false", "0", "no")
free_channels_raw = os.getenv("MATTERMOST_FREE_RESPONSE_CHANNELS", "")
free_channels = {ch.strip() for ch in free_channels_raw.split(",") if ch.strip()}
is_free_channel = channel_id in free_channels
mention_patterns = [
f"@{self._bot_username}",
f"@{self._bot_user_id}",
@@ -624,21 +614,13 @@ class MattermostAdapter(BasePlatformAdapter):
pattern.lower() in message_text.lower()
for pattern in mention_patterns
)
if require_mention and not is_free_channel and not has_mention:
if not has_mention:
logger.debug(
"Mattermost: skipping non-DM message without @mention (channel=%s)",
channel_id,
)
return
# Strip @mention from the message text so the agent sees clean input.
if has_mention:
for pattern in mention_patterns:
message_text = re.sub(
re.escape(pattern), "", message_text, flags=re.IGNORECASE
).strip()
# Resolve sender info.
sender_id = post.get("user_id", "")
sender_name = data.get("sender_name", "").lstrip("@") or sender_id
+3 -36
View File
@@ -22,7 +22,7 @@ import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any
from urllib.parse import quote, unquote
from urllib.parse import unquote
import httpx
@@ -184,8 +184,6 @@ class SignalAdapter(BasePlatformAdapter):
self._recent_sent_timestamps: set = set()
self._max_recent_timestamps = 50
self._phone_lock_identity: Optional[str] = None
logger.info("Signal adapter initialized: url=%s account=%s groups=%s",
self.http_url, _redact_phone(self.account),
"enabled" if self.group_allow_from else "disabled")
@@ -200,29 +198,6 @@ class SignalAdapter(BasePlatformAdapter):
logger.error("Signal: SIGNAL_HTTP_URL and SIGNAL_ACCOUNT are required")
return False
# Acquire scoped lock to prevent duplicate Signal listeners for the same phone
try:
from gateway.status import acquire_scoped_lock
self._phone_lock_identity = self.account
acquired, existing = acquire_scoped_lock(
"signal-phone",
self._phone_lock_identity,
metadata={"platform": self.platform.value},
)
if not acquired:
owner_pid = existing.get("pid") if isinstance(existing, dict) else None
message = (
"Another local Hermes gateway is already using this Signal account"
+ (f" (PID {owner_pid})." if owner_pid else ".")
+ " Stop the other gateway before starting a second Signal listener."
)
logger.error("Signal: %s", message)
self._set_fatal_error("signal_phone_lock", message, retryable=False)
return False
except Exception as e:
logger.warning("Signal: Could not acquire phone lock (non-fatal): %s", e)
self.client = httpx.AsyncClient(timeout=30.0)
# Health check — verify signal-cli daemon is reachable
@@ -270,14 +245,6 @@ class SignalAdapter(BasePlatformAdapter):
await self.client.aclose()
self.client = None
if self._phone_lock_identity:
try:
from gateway.status import release_scoped_lock
release_scoped_lock("signal-phone", self._phone_lock_identity)
except Exception as e:
logger.warning("Signal: Error releasing phone lock: %s", e, exc_info=True)
self._phone_lock_identity = None
logger.info("Signal: disconnected")
# ------------------------------------------------------------------
@@ -286,7 +253,7 @@ class SignalAdapter(BasePlatformAdapter):
async def _sse_listener(self) -> None:
"""Listen for SSE events from signal-cli daemon."""
url = f"{self.http_url}/api/v1/events?account={quote(self.account, safe='')}"
url = f"{self.http_url}/api/v1/events?account={self.account}"
backoff = SSE_RETRY_DELAY_INITIAL
while self._running:
@@ -554,7 +521,7 @@ class SignalAdapter(BasePlatformAdapter):
"""Fetch an attachment via JSON-RPC and cache it. Returns (path, ext)."""
result = await self._rpc("getAttachment", {
"account": self.account,
"id": attachment_id,
"attachmentId": attachment_id,
})
if not result:
-20
View File
@@ -93,16 +93,6 @@ class SlackAdapter(BasePlatformAdapter):
return False
try:
# Acquire scoped lock to prevent duplicate app token usage
from gateway.status import acquire_scoped_lock
acquired, existing = acquire_scoped_lock('slack-app-token', app_token, metadata={'platform': 'slack'})
if not acquired:
owner_pid = existing.get('pid') if isinstance(existing, dict) else None
message = f'Slack app token already in use' + (f' (PID {owner_pid})' if owner_pid else '') + '. Stop the other gateway first.'
logger.error('[%s] %s', self.name, message)
self._set_fatal_error('slack_token_lock', message, retryable=False)
return False
self._app = AsyncApp(token=bot_token)
# Get our own bot user ID for mention detection
@@ -148,16 +138,6 @@ class SlackAdapter(BasePlatformAdapter):
except Exception as e: # pragma: no cover - defensive logging
logger.warning("[Slack] Error while closing Socket Mode handler: %s", e, exc_info=True)
self._running = False
# Release the token lock
try:
from gateway.status import release_scoped_lock
app_token = os.getenv("SLACK_APP_TOKEN")
if app_token:
release_scoped_lock('slack-app-token', app_token)
except Exception:
pass
logger.info("[Slack] Disconnected")
async def send(
-11
View File
@@ -118,17 +118,6 @@ class WebhookAdapter(BasePlatformAdapter):
app.router.add_get("/health", self._handle_health)
app.router.add_post("/webhooks/{route_name}", self._handle_webhook)
# Port conflict detection — fail fast if port is already in use
import socket as _socket
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
_s.settimeout(1)
_s.connect(('127.0.0.1', self._port))
logger.error('[webhook] Port %d already in use. Set a different port in config.yaml: platforms.webhook.port', self._port)
return False
except (ConnectionRefusedError, OSError):
pass # port is free
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, self._host, self._port)
-38
View File
@@ -142,7 +142,6 @@ class WhatsAppAdapter(BasePlatformAdapter):
self._bridge_log_fh = None
self._bridge_log: Optional[Path] = None
self._poll_task: Optional[asyncio.Task] = None
self._session_lock_identity: Optional[str] = None
async def connect(self) -> bool:
"""
@@ -161,29 +160,6 @@ class WhatsAppAdapter(BasePlatformAdapter):
logger.info("[%s] Bridge found at %s", self.name, bridge_path)
# Acquire scoped lock to prevent duplicate sessions
try:
from gateway.status import acquire_scoped_lock
self._session_lock_identity = str(self._session_path)
acquired, existing = acquire_scoped_lock(
"whatsapp-session",
self._session_lock_identity,
metadata={"platform": self.platform.value},
)
if not acquired:
owner_pid = existing.get("pid") if isinstance(existing, dict) else None
message = (
"Another local Hermes gateway is already using this WhatsApp session"
+ (f" (PID {owner_pid})." if owner_pid else ".")
+ " Stop the other gateway before starting a second WhatsApp bridge."
)
logger.error("[%s] %s", self.name, message)
self._set_fatal_error("whatsapp_session_lock", message, retryable=False)
return False
except Exception as e:
logger.warning("[%s] Could not acquire session lock (non-fatal): %s", self.name, e)
# Auto-install npm dependencies if node_modules doesn't exist
bridge_dir = bridge_path.parent
if not (bridge_dir / "node_modules").exists():
@@ -337,12 +313,6 @@ class WhatsAppAdapter(BasePlatformAdapter):
return True
except Exception as e:
if self._session_lock_identity:
try:
from gateway.status import release_scoped_lock
release_scoped_lock("whatsapp-session", self._session_lock_identity)
except Exception:
pass
logger.error("[%s] Failed to start bridge: %s", self.name, e, exc_info=True)
self._close_bridge_log()
return False
@@ -401,17 +371,9 @@ class WhatsAppAdapter(BasePlatformAdapter):
# Bridge was not started by us, don't kill it
print(f"[{self.name}] Disconnecting (external bridge left running)")
if self._session_lock_identity:
try:
from gateway.status import release_scoped_lock
release_scoped_lock("whatsapp-session", self._session_lock_identity)
except Exception as e:
logger.warning("[%s] Error releasing WhatsApp session lock: %s", self.name, e, exc_info=True)
self._mark_disconnected()
self._bridge_process = None
self._close_bridge_log()
self._session_lock_identity = None
print(f"[{self.name}] Disconnected")
async def send(
-7
View File
@@ -959,13 +959,6 @@ class GatewayRunner:
"""
logger.info("Starting Hermes Gateway...")
logger.info("Session storage: %s", self.config.sessions_dir)
try:
from hermes_cli.profiles import get_active_profile_name
_profile = get_active_profile_name()
if _profile and _profile != "default":
logger.info("Active profile: %s", _profile)
except Exception:
pass
try:
from gateway.status import write_runtime_status
write_runtime_status(gateway_state="starting", exit_reason=None)
+2 -2
View File
@@ -38,7 +38,7 @@ import httpx
import yaml
from hermes_cli.config import get_hermes_home, get_config_path
from hermes_constants import OPENROUTER_BASE_URL, display_hermes_home
from hermes_constants import OPENROUTER_BASE_URL
logger = logging.getLogger(__name__)
@@ -2021,7 +2021,7 @@ def _login_openai_codex(args, pconfig: ProviderConfig) -> None:
config_path = _update_config_for_provider("openai-codex", creds.get("base_url", DEFAULT_CODEX_BASE_URL))
print()
print("Login successful!")
print(f" Auth state: {display_hermes_home()}/auth.json")
print(" Auth state: ~/.hermes/auth.json")
print(f" Config updated: {config_path} (model.provider=openai-codex)")
-9
View File
@@ -403,15 +403,6 @@ def build_welcome_banner(console: Console, model: str, cwd: str,
if mcp_connected:
summary_parts.append(f"{mcp_connected} MCP servers")
summary_parts.append("/help for commands")
# Show active profile name when not 'default'
try:
from hermes_cli.profiles import get_active_profile_name
_profile_name = get_active_profile_name()
if _profile_name and _profile_name != "default":
right_lines.append(f"[bold {accent}]Profile:[/] [{text}]{_profile_name}[/]")
except Exception:
pass # Never break the banner over a profiles.py bug
right_lines.append(f"[dim {dim}]{' · '.join(summary_parts)}[/]")
# Update check — use prefetched result if available
+2 -5
View File
@@ -12,7 +12,6 @@ import getpass
from hermes_cli.banner import cprint, _DIM, _RST
from hermes_cli.config import save_env_value_secure
from hermes_constants import display_hermes_home
def clarify_callback(cli, question, choices):
@@ -132,8 +131,7 @@ def prompt_for_secret(cli, var_name: str, prompt: str, metadata=None) -> dict:
}
stored = save_env_value_secure(var_name, value)
_dhh = display_hermes_home()
cprint(f"\n{_DIM} ✓ Stored secret in {_dhh}/.env as {var_name}{_RST}")
cprint(f"\n{_DIM} ✓ Stored secret in ~/.hermes/.env as {var_name}{_RST}")
return {
**stored,
"skipped": False,
@@ -185,8 +183,7 @@ def prompt_for_secret(cli, var_name: str, prompt: str, metadata=None) -> dict:
}
stored = save_env_value_secure(var_name, value)
_dhh = display_hermes_home()
cprint(f"\n{_DIM} ✓ Stored secret in {_dhh}/.env as {var_name}{_RST}")
cprint(f"\n{_DIM} ✓ Stored secret in ~/.hermes/.env as {var_name}{_RST}")
return {
**stored,
"skipped": False,
-21
View File
@@ -366,13 +366,6 @@ DEFAULT_CONFIG = {
# Never saved to sessions, logs, or trajectories.
"prefill_messages_file": "",
# Skills — external skill directories for sharing skills across tools/agents.
# Each path is expanded (~, ${VAR}) and resolved. Read-only — skill creation
# always goes to ~/.hermes/skills/.
"skills": {
"external_dirs": [], # e.g. ["~/.agents/skills", "/shared/team-skills"]
},
# Honcho AI-native memory -- reads ~/.honcho/config.json as single source of truth.
# This section is only needed for hermes-specific overrides; everything else
# (apiKey, workspace, peerName, sessions, enabled) comes from the global config.
@@ -824,20 +817,6 @@ OPTIONAL_ENV_VARS = {
"password": False,
"category": "messaging",
},
"MATTERMOST_REQUIRE_MENTION": {
"description": "Require @mention in Mattermost channels (default: true). Set to false to respond to all messages.",
"prompt": "Require @mention in channels",
"url": None,
"password": False,
"category": "messaging",
},
"MATTERMOST_FREE_RESPONSE_CHANNELS": {
"description": "Comma-separated Mattermost channel IDs where bot responds without @mention",
"prompt": "Free-response channel IDs (comma-separated)",
"url": None,
"password": False,
"category": "messaging",
},
"MATRIX_HOMESERVER": {
"description": "Matrix homeserver URL (e.g. https://matrix.example.org)",
"prompt": "Matrix homeserver URL",
+24 -73
View File
@@ -10,11 +10,9 @@ import subprocess
import shutil
from hermes_cli.config import get_project_root, get_hermes_home, get_env_path
from hermes_constants import display_hermes_home
PROJECT_ROOT = get_project_root()
HERMES_HOME = get_hermes_home()
_DHH = display_hermes_home() # user-facing display path (e.g. ~/.hermes or ~/.hermes/profiles/coder)
# Load environment variables from ~/.hermes/.env so API key checks work
from dotenv import load_dotenv
@@ -211,14 +209,14 @@ def run_doctor(args):
# Check ~/.hermes/.env (primary location for user config)
env_path = HERMES_HOME / '.env'
if env_path.exists():
check_ok(f"{_DHH}/.env file exists")
check_ok("~/.hermes/.env file exists")
# Check for common issues
content = env_path.read_text()
if _has_provider_env_config(content):
check_ok("API key or custom endpoint configured")
else:
check_warn(f"No API key found in {_DHH}/.env")
check_warn("No API key found in ~/.hermes/.env")
issues.append("Run 'hermes setup' to configure API keys")
else:
# Also check project root as fallback
@@ -226,11 +224,11 @@ def run_doctor(args):
if fallback_env.exists():
check_ok(".env file exists (in project directory)")
else:
check_fail(f"{_DHH}/.env file missing")
check_fail("~/.hermes/.env file missing")
if should_fix:
env_path.parent.mkdir(parents=True, exist_ok=True)
env_path.touch()
check_ok(f"Created empty {_DHH}/.env")
check_ok("Created empty ~/.hermes/.env")
check_info("Run 'hermes setup' to configure API keys")
fixed_count += 1
else:
@@ -240,7 +238,7 @@ def run_doctor(args):
# Check ~/.hermes/config.yaml (primary) or project cli-config.yaml (fallback)
config_path = HERMES_HOME / 'config.yaml'
if config_path.exists():
check_ok(f"{_DHH}/config.yaml exists")
check_ok("~/.hermes/config.yaml exists")
else:
fallback_config = PROJECT_ROOT / 'cli-config.yaml'
if fallback_config.exists():
@@ -250,11 +248,11 @@ def run_doctor(args):
if should_fix and example_config.exists():
config_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(str(example_config), str(config_path))
check_ok(f"Created {_DHH}/config.yaml from cli-config.yaml.example")
check_ok("Created ~/.hermes/config.yaml from cli-config.yaml.example")
fixed_count += 1
elif should_fix:
check_warn("config.yaml not found and no example to copy from")
manual_issues.append(f"Create {_DHH}/config.yaml manually")
manual_issues.append("Create ~/.hermes/config.yaml manually")
else:
check_warn("config.yaml not found", "(using defaults)")
@@ -296,28 +294,28 @@ def run_doctor(args):
hermes_home = HERMES_HOME
if hermes_home.exists():
check_ok(f"{_DHH} directory exists")
check_ok("~/.hermes directory exists")
else:
if should_fix:
hermes_home.mkdir(parents=True, exist_ok=True)
check_ok(f"Created {_DHH} directory")
check_ok("Created ~/.hermes directory")
fixed_count += 1
else:
check_warn(f"{_DHH} not found", "(will be created on first use)")
check_warn("~/.hermes not found", "(will be created on first use)")
# Check expected subdirectories
expected_subdirs = ["cron", "sessions", "logs", "skills", "memories"]
for subdir_name in expected_subdirs:
subdir_path = hermes_home / subdir_name
if subdir_path.exists():
check_ok(f"{_DHH}/{subdir_name}/ exists")
check_ok(f"~/.hermes/{subdir_name}/ exists")
else:
if should_fix:
subdir_path.mkdir(parents=True, exist_ok=True)
check_ok(f"Created {_DHH}/{subdir_name}/")
check_ok(f"Created ~/.hermes/{subdir_name}/")
fixed_count += 1
else:
check_warn(f"{_DHH}/{subdir_name}/ not found", "(will be created on first use)")
check_warn(f"~/.hermes/{subdir_name}/ not found", "(will be created on first use)")
# Check for SOUL.md persona file
soul_path = hermes_home / "SOUL.md"
@@ -326,11 +324,11 @@ def run_doctor(args):
# Check if it's just the template comments (no real content)
lines = [l for l in content.splitlines() if l.strip() and not l.strip().startswith(("<!--", "-->", "#"))]
if lines:
check_ok(f"{_DHH}/SOUL.md exists (persona configured)")
check_ok("~/.hermes/SOUL.md exists (persona configured)")
else:
check_info(f"{_DHH}/SOUL.md exists but is empty — edit it to customize personality")
check_info("~/.hermes/SOUL.md exists but is empty — edit it to customize personality")
else:
check_warn(f"{_DHH}/SOUL.md not found", "(create it to give Hermes a custom personality)")
check_warn("~/.hermes/SOUL.md not found", "(create it to give Hermes a custom personality)")
if should_fix:
soul_path.parent.mkdir(parents=True, exist_ok=True)
soul_path.write_text(
@@ -339,13 +337,13 @@ def run_doctor(args):
"You are Hermes, a helpful AI assistant.\n",
encoding="utf-8",
)
check_ok(f"Created {_DHH}/SOUL.md with basic template")
check_ok("Created ~/.hermes/SOUL.md with basic template")
fixed_count += 1
# Check memory directory
memories_dir = hermes_home / "memories"
if memories_dir.exists():
check_ok(f"{_DHH}/memories/ directory exists")
check_ok("~/.hermes/memories/ directory exists")
memory_file = memories_dir / "MEMORY.md"
user_file = memories_dir / "USER.md"
if memory_file.exists():
@@ -359,10 +357,10 @@ def run_doctor(args):
else:
check_info("USER.md not created yet (will be created when the agent first writes a memory)")
else:
check_warn(f"{_DHH}/memories/ not found", "(will be created on first use)")
check_warn("~/.hermes/memories/ not found", "(will be created on first use)")
if should_fix:
memories_dir.mkdir(parents=True, exist_ok=True)
check_ok(f"Created {_DHH}/memories/")
check_ok("Created ~/.hermes/memories/")
fixed_count += 1
# Check SQLite session store
@@ -374,11 +372,11 @@ def run_doctor(args):
cursor = conn.execute("SELECT COUNT(*) FROM sessions")
count = cursor.fetchone()[0]
conn.close()
check_ok(f"{_DHH}/state.db exists ({count} sessions)")
check_ok(f"~/.hermes/state.db exists ({count} sessions)")
except Exception as e:
check_warn(f"{_DHH}/state.db exists but has issues: {e}")
check_warn(f"~/.hermes/state.db exists but has issues: {e}")
else:
check_info(f"{_DHH}/state.db not created yet (will be created on first session)")
check_info("~/.hermes/state.db not created yet (will be created on first session)")
_check_gateway_service_linger(issues)
@@ -693,7 +691,7 @@ def run_doctor(args):
if github_token:
check_ok("GitHub token configured (authenticated API access)")
else:
check_warn("No GITHUB_TOKEN", f"(60 req/hr rate limit — set in {_DHH}/.env for better rates)")
check_warn("No GITHUB_TOKEN", "(60 req/hr rate limit — set in ~/.hermes/.env for better rates)")
# =========================================================================
# Honcho memory
@@ -730,53 +728,6 @@ def run_doctor(args):
except Exception as _e:
check_warn("Honcho check failed", str(_e))
# =========================================================================
# Profiles
# =========================================================================
try:
from hermes_cli.profiles import list_profiles, _get_wrapper_dir, profile_exists
import re as _re
named_profiles = [p for p in list_profiles() if not p.is_default]
if named_profiles:
print()
print(color("◆ Profiles", Colors.CYAN, Colors.BOLD))
check_ok(f"{len(named_profiles)} profile(s) found")
wrapper_dir = _get_wrapper_dir()
for p in named_profiles:
parts = []
if p.gateway_running:
parts.append("gateway running")
if p.model:
parts.append(p.model[:30])
if not (p.path / "config.yaml").exists():
parts.append("⚠ missing config")
if not (p.path / ".env").exists():
parts.append("no .env")
wrapper = wrapper_dir / p.name
if not wrapper.exists():
parts.append("no alias")
status = ", ".join(parts) if parts else "configured"
check_ok(f" {p.name}: {status}")
# Check for orphan wrappers
if wrapper_dir.is_dir():
for wrapper in wrapper_dir.iterdir():
if not wrapper.is_file():
continue
try:
content = wrapper.read_text()
if "hermes -p" in content:
_m = _re.search(r"hermes -p (\S+)", content)
if _m and not profile_exists(_m.group(1)):
check_warn(f"Orphan alias: {wrapper.name} → profile '{_m.group(1)}' no longer exists")
except Exception:
pass
except ImportError:
pass
except Exception as _e:
logger.debug("Profile health check failed: %s", _e)
# =========================================================================
# Summary
# =========================================================================
+1 -2
View File
@@ -15,7 +15,6 @@ from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
from hermes_cli.config import get_env_value, get_hermes_home, save_env_value, is_managed, managed_error
from hermes_constants import display_hermes_home
from hermes_cli.setup import (
print_header, print_info, print_success, print_warning, print_error,
prompt, prompt_choice, prompt_yes_no,
@@ -936,7 +935,7 @@ def launchd_install(force: bool = False):
print()
print("Next steps:")
print(" hermes gateway status # Check status")
print(f" tail -f {display_hermes_home()}/logs/gateway.log # View logs")
print(" tail -f ~/.hermes/logs/gateway.log # View logs")
def launchd_uninstall():
plist_path = get_launchd_plist_path()
+3 -417
View File
@@ -54,71 +54,6 @@ from typing import Optional
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
sys.path.insert(0, str(PROJECT_ROOT))
# ---------------------------------------------------------------------------
# Profile override — MUST happen before any hermes module import.
#
# Many modules cache HERMES_HOME at import time (module-level constants).
# We intercept --profile/-p from sys.argv here and set the env var so that
# every subsequent ``os.getenv("HERMES_HOME", ...)`` resolves correctly.
# The flag is stripped from sys.argv so argparse never sees it.
# Falls back to ~/.hermes/active_profile for sticky default.
# ---------------------------------------------------------------------------
def _apply_profile_override() -> None:
"""Pre-parse --profile/-p and set HERMES_HOME before module imports."""
argv = sys.argv[1:]
profile_name = None
consume = 0
# 1. Check for explicit -p / --profile flag
for i, arg in enumerate(argv):
if arg in ("--profile", "-p") and i + 1 < len(argv):
profile_name = argv[i + 1]
consume = 2
break
elif arg.startswith("--profile="):
profile_name = arg.split("=", 1)[1]
consume = 1
break
# 2. If no flag, check ~/.hermes/active_profile
if profile_name is None:
try:
active_path = Path.home() / ".hermes" / "active_profile"
if active_path.exists():
name = active_path.read_text().strip()
if name and name != "default":
profile_name = name
consume = 0 # don't strip anything from argv
except (UnicodeDecodeError, OSError):
pass # corrupted file, skip
# 3. If we found a profile, resolve and set HERMES_HOME
if profile_name is not None:
try:
from hermes_cli.profiles import resolve_profile_env
hermes_home = resolve_profile_env(profile_name)
except (ValueError, FileNotFoundError) as exc:
print(f"Error: {exc}", file=sys.stderr)
sys.exit(1)
except Exception as exc:
# A bug in profiles.py must NEVER prevent hermes from starting
print(f"Warning: profile override failed ({exc}), using default", file=sys.stderr)
return
os.environ["HERMES_HOME"] = hermes_home
# Strip the flag from argv so argparse doesn't choke
if consume > 0:
for i, arg in enumerate(argv):
if arg in ("--profile", "-p"):
start = i + 1 # +1 because argv is sys.argv[1:]
sys.argv = sys.argv[:start] + sys.argv[start + consume:]
break
elif arg.startswith("--profile="):
start = i + 1
sys.argv = sys.argv[:start] + sys.argv[start + 1:]
break
_apply_profile_override()
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
# User-managed env files should override stale shell exports on restart.
from hermes_cli.config import get_hermes_home
@@ -1045,7 +980,6 @@ def _model_flow_openrouter(config, current_model=""):
cfg["model"] = model
model["provider"] = "openrouter"
model["base_url"] = OPENROUTER_BASE_URL
model["api_mode"] = "chat_completions"
save_config(cfg)
deactivate_provider()
print(f"Default model set to: {selected} (via OpenRouter)")
@@ -1269,7 +1203,6 @@ def _model_flow_custom(config):
cfg["model"] = model
model["provider"] = "custom"
model["base_url"] = effective_url
model["api_mode"] = "chat_completions"
save_config(cfg)
deactivate_provider()
@@ -2051,7 +1984,6 @@ def _model_flow_kimi(config, current_model=""):
cfg["model"] = model
model["provider"] = provider_id
model["base_url"] = effective_base
model["api_mode"] = "chat_completions"
save_config(cfg)
deactivate_provider()
@@ -2158,7 +2090,6 @@ def _model_flow_api_key_provider(config, provider_id, current_model=""):
cfg["model"] = model
model["provider"] = provider_id
model["base_url"] = effective_base
model["api_mode"] = "chat_completions"
save_config(cfg)
deactivate_provider()
@@ -2190,8 +2121,7 @@ def _run_anthropic_oauth_flow(save_env_value):
):
use_anthropic_claude_code_credentials(save_fn=save_env_value)
print(" ✓ Claude Code credentials linked.")
from hermes_constants import display_hermes_home as _dhh_fn
print(f" Hermes will use Claude's credential store directly instead of copying a setup-token into {_dhh_fn()}/.env.")
print(" Hermes will use Claude's credential store directly instead of copying a setup-token into ~/.hermes/.env.")
return True
return False
@@ -2989,35 +2919,7 @@ def cmd_update(args):
print(" ✓ Skills are up to date")
except Exception as e:
logger.debug("Skills sync during update failed: %s", e)
# Sync bundled skills to all other profiles
try:
from hermes_cli.profiles import list_profiles, get_active_profile_name, seed_profile_skills
active = get_active_profile_name()
other_profiles = [p for p in list_profiles() if not p.is_default and p.name != active]
if other_profiles:
print()
print("→ Syncing bundled skills to other profiles...")
for p in other_profiles:
try:
r = seed_profile_skills(p.path, quiet=True)
if r:
copied = len(r.get("copied", []))
updated = len(r.get("updated", []))
modified = len(r.get("user_modified", []))
parts = []
if copied: parts.append(f"+{copied} new")
if updated: parts.append(f"{updated} updated")
if modified: parts.append(f"~{modified} user-modified")
status = ", ".join(parts) if parts else "up to date"
else:
status = "sync failed"
print(f" {p.name}: {status}")
except Exception as pe:
print(f" {p.name}: error ({pe})")
except Exception:
pass # profiles module not available or no profiles
# Check for config migrations
print()
print("→ Checking configuration for new options...")
@@ -3215,7 +3117,6 @@ def _coalesce_session_name_args(argv: list) -> list:
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout",
"status", "cron", "doctor", "config", "pairing", "skills", "tools",
"mcp", "sessions", "insights", "version", "update", "uninstall",
"profile",
}
_SESSION_FLAGS = {"-c", "--continue", "-r", "--resume"}
@@ -3239,253 +3140,6 @@ def _coalesce_session_name_args(argv: list) -> list:
return result
def cmd_profile(args):
"""Profile management — create, delete, list, switch, alias."""
from hermes_cli.profiles import (
list_profiles, create_profile, delete_profile, seed_profile_skills,
get_active_profile, set_active_profile, get_active_profile_name,
check_alias_collision, create_wrapper_script, remove_wrapper_script,
_is_wrapper_dir_in_path, _get_wrapper_dir,
)
from hermes_constants import display_hermes_home
action = getattr(args, "profile_action", None)
if action is None:
# Bare `hermes profile` — show current profile status
profile_name = get_active_profile_name()
dhh = display_hermes_home()
print(f"\nActive profile: {profile_name}")
print(f"Path: {dhh}")
profiles = list_profiles()
for p in profiles:
if p.name == profile_name or (profile_name == "default" and p.is_default):
if p.model:
print(f"Model: {p.model}" + (f" ({p.provider})" if p.provider else ""))
print(f"Gateway: {'running' if p.gateway_running else 'stopped'}")
print(f"Skills: {p.skill_count} installed")
if p.alias_path:
print(f"Alias: {p.name} → hermes -p {p.name}")
break
print()
return
if action == "list":
profiles = list_profiles()
active = get_active_profile_name()
if not profiles:
print("No profiles found.")
return
# Header
print(f"\n {'Profile':<16} {'Model':<28} {'Gateway':<12} {'Alias'}")
print(f" {'' * 15} {'' * 27} {'' * 11} {'' * 12}")
for p in profiles:
marker = "" if (p.name == active or (active == "default" and p.is_default)) else " "
name = p.name
model = (p.model or "")[:26]
gw = "running" if p.gateway_running else "stopped"
alias = p.name if p.alias_path else ""
if p.is_default:
alias = ""
print(f"{marker}{name:<15} {model:<28} {gw:<12} {alias}")
print()
elif action == "use":
name = args.profile_name
try:
set_active_profile(name)
if name == "default":
print(f"Switched to: default (~/.hermes)")
else:
print(f"Switched to: {name}")
except (ValueError, FileNotFoundError) as e:
print(f"Error: {e}")
sys.exit(1)
elif action == "create":
name = args.profile_name
clone = getattr(args, "clone", False)
clone_all = getattr(args, "clone_all", False)
no_alias = getattr(args, "no_alias", False)
try:
clone_from = getattr(args, "clone_from", None)
profile_dir = create_profile(
name=name,
clone_from=clone_from,
clone_all=clone_all,
clone_config=clone,
no_alias=no_alias,
)
print(f"\nProfile '{name}' created at {profile_dir}")
if clone or clone_all:
source_label = getattr(args, "clone_from", None) or get_active_profile_name()
if clone_all:
print(f"Full copy from {source_label}.")
else:
print(f"Cloned config, .env, SOUL.md from {source_label}.")
# Seed bundled skills (skip if --clone-all already copied them)
if not clone_all:
result = seed_profile_skills(profile_dir)
if result:
copied = len(result.get("copied", []))
print(f"{copied} bundled skills synced.")
else:
print("⚠ Skills could not be seeded. Run `{} update` to retry.".format(name))
# Create wrapper alias
if not no_alias:
collision = check_alias_collision(name)
if collision:
print(f"\n⚠ Cannot create alias '{name}'{collision}")
print(f" Choose a custom alias: hermes profile alias {name} --name <custom>")
print(f" Or access via flag: hermes -p {name} chat")
else:
wrapper_path = create_wrapper_script(name)
if wrapper_path:
print(f"Wrapper created: {wrapper_path}")
if not _is_wrapper_dir_in_path():
print(f"\n{_get_wrapper_dir()} is not in your PATH.")
print(f' Add to your shell config (~/.bashrc or ~/.zshrc):')
print(f' export PATH="$HOME/.local/bin:$PATH"')
# Next steps
print(f"\nNext steps:")
print(f" {name} setup Configure API keys and model")
print(f" {name} chat Start chatting")
print(f" {name} gateway start Start the messaging gateway")
if clone or clone_all:
from hermes_constants import get_hermes_home
profile_dir_display = f"~/.hermes/profiles/{name}"
print(f"\n Edit {profile_dir_display}/.env for different API keys")
print(f" Edit {profile_dir_display}/SOUL.md for different personality")
print()
except (ValueError, FileExistsError, FileNotFoundError) as e:
print(f"Error: {e}")
sys.exit(1)
elif action == "delete":
name = args.profile_name
yes = getattr(args, "yes", False)
try:
delete_profile(name, yes=yes)
except (ValueError, FileNotFoundError) as e:
print(f"Error: {e}")
sys.exit(1)
elif action == "show":
name = args.profile_name
from hermes_cli.profiles import get_profile_dir, profile_exists, _read_config_model, _check_gateway_running, _count_skills
if not profile_exists(name):
print(f"Error: Profile '{name}' does not exist.")
sys.exit(1)
profile_dir = get_profile_dir(name)
model, provider = _read_config_model(profile_dir)
gw = _check_gateway_running(profile_dir)
skills = _count_skills(profile_dir)
wrapper = _get_wrapper_dir() / name
print(f"\nProfile: {name}")
print(f"Path: {profile_dir}")
if model:
print(f"Model: {model}" + (f" ({provider})" if provider else ""))
print(f"Gateway: {'running' if gw else 'stopped'}")
print(f"Skills: {skills}")
print(f".env: {'exists' if (profile_dir / '.env').exists() else 'not configured'}")
print(f"SOUL.md: {'exists' if (profile_dir / 'SOUL.md').exists() else 'not configured'}")
if wrapper.exists():
print(f"Alias: {wrapper}")
print()
elif action == "alias":
name = args.profile_name
remove = getattr(args, "remove", False)
custom_name = getattr(args, "alias_name", None)
from hermes_cli.profiles import profile_exists
if not profile_exists(name):
print(f"Error: Profile '{name}' does not exist.")
sys.exit(1)
alias_name = custom_name or name
if remove:
if remove_wrapper_script(alias_name):
print(f"✓ Removed alias '{alias_name}'")
else:
print(f"No alias '{alias_name}' found to remove.")
else:
collision = check_alias_collision(alias_name)
if collision:
print(f"Error: {collision}")
sys.exit(1)
wrapper_path = create_wrapper_script(alias_name)
if wrapper_path:
# If custom name, write the profile name into the wrapper
if custom_name:
wrapper_path.write_text(f'#!/bin/sh\nexec hermes -p {name} "$@"\n')
print(f"✓ Alias created: {wrapper_path}")
if not _is_wrapper_dir_in_path():
print(f"{_get_wrapper_dir()} is not in your PATH.")
elif action == "rename":
from hermes_cli.profiles import rename_profile
try:
new_dir = rename_profile(args.old_name, args.new_name)
print(f"\nProfile renamed: {args.old_name}{args.new_name}")
print(f"Path: {new_dir}\n")
except (ValueError, FileExistsError, FileNotFoundError) as e:
print(f"Error: {e}")
sys.exit(1)
elif action == "export":
from hermes_cli.profiles import export_profile
name = args.profile_name
output = args.output or f"{name}.tar.gz"
try:
result_path = export_profile(name, output)
print(f"✓ Exported '{name}' to {result_path}")
except (ValueError, FileNotFoundError) as e:
print(f"Error: {e}")
sys.exit(1)
elif action == "import":
from hermes_cli.profiles import import_profile
try:
profile_dir = import_profile(args.archive, name=getattr(args, "import_name", None))
name = profile_dir.name
print(f"✓ Imported profile '{name}' at {profile_dir}")
# Offer to create alias
collision = check_alias_collision(name)
if not collision:
wrapper_path = create_wrapper_script(name)
if wrapper_path:
print(f" Wrapper created: {wrapper_path}")
print()
except (ValueError, FileExistsError, FileNotFoundError) as e:
print(f"Error: {e}")
sys.exit(1)
def cmd_completion(args):
"""Print shell completion script."""
from hermes_cli.profiles import generate_bash_completion, generate_zsh_completion
shell = getattr(args, "shell", "bash")
if shell == "zsh":
print(generate_zsh_completion())
else:
print(generate_bash_completion())
def main():
"""Main entry point for hermes CLI."""
parser = argparse.ArgumentParser(
@@ -4673,75 +4327,7 @@ For more help on a command:
sys.exit(1)
acp_parser.set_defaults(func=cmd_acp)
# =========================================================================
# profile command
# =========================================================================
profile_parser = subparsers.add_parser(
"profile",
help="Manage profiles — multiple isolated Hermes instances",
)
profile_subparsers = profile_parser.add_subparsers(dest="profile_action")
profile_list = profile_subparsers.add_parser("list", help="List all profiles")
profile_use = profile_subparsers.add_parser("use", help="Set sticky default profile")
profile_use.add_argument("profile_name", help="Profile name (or 'default')")
profile_create = profile_subparsers.add_parser("create", help="Create a new profile")
profile_create.add_argument("profile_name", help="Profile name (lowercase, alphanumeric)")
profile_create.add_argument("--clone", action="store_true",
help="Copy config.yaml, .env, SOUL.md from active profile")
profile_create.add_argument("--clone-all", action="store_true",
help="Full copy of active profile (all state)")
profile_create.add_argument("--clone-from", metavar="SOURCE",
help="Source profile to clone from (default: active)")
profile_create.add_argument("--no-alias", action="store_true",
help="Skip wrapper script creation")
profile_delete = profile_subparsers.add_parser("delete", help="Delete a profile")
profile_delete.add_argument("profile_name", help="Profile to delete")
profile_delete.add_argument("-y", "--yes", action="store_true",
help="Skip confirmation prompt")
profile_show = profile_subparsers.add_parser("show", help="Show profile details")
profile_show.add_argument("profile_name", help="Profile to show")
profile_alias = profile_subparsers.add_parser("alias", help="Manage wrapper scripts")
profile_alias.add_argument("profile_name", help="Profile name")
profile_alias.add_argument("--remove", action="store_true",
help="Remove the wrapper script")
profile_alias.add_argument("--name", dest="alias_name", metavar="NAME",
help="Custom alias name (default: profile name)")
profile_rename = profile_subparsers.add_parser("rename", help="Rename a profile")
profile_rename.add_argument("old_name", help="Current profile name")
profile_rename.add_argument("new_name", help="New profile name")
profile_export = profile_subparsers.add_parser("export", help="Export a profile to archive")
profile_export.add_argument("profile_name", help="Profile to export")
profile_export.add_argument("-o", "--output", default=None,
help="Output file (default: <name>.tar.gz)")
profile_import = profile_subparsers.add_parser("import", help="Import a profile from archive")
profile_import.add_argument("archive", help="Path to .tar.gz archive")
profile_import.add_argument("--name", dest="import_name", metavar="NAME",
help="Profile name (default: inferred from archive)")
profile_parser.set_defaults(func=cmd_profile)
# =========================================================================
# completion command
# =========================================================================
completion_parser = subparsers.add_parser(
"completion",
help="Print shell completion script (bash or zsh)",
)
completion_parser.add_argument(
"shell", nargs="?", default="bash", choices=["bash", "zsh"],
help="Shell type (default: bash)",
)
completion_parser.set_defaults(func=cmd_completion)
# =========================================================================
# Parse and execute
# =========================================================================
+2 -3
View File
@@ -24,7 +24,6 @@ from hermes_cli.config import (
get_hermes_home, # noqa: F401 — used by test mocks
)
from hermes_cli.colors import Colors, color
from hermes_constants import display_hermes_home
logger = logging.getLogger(__name__)
@@ -245,7 +244,7 @@ def cmd_mcp_add(args):
api_key = _prompt("API key / Bearer token", password=True)
if api_key:
save_env_value(env_key, api_key)
_success(f"Saved to {display_hermes_home()}/.env as {env_key}")
_success(f"Saved to ~/.hermes/.env as {env_key}")
# Set header with env var interpolation
if api_key or existing_key:
@@ -333,7 +332,7 @@ def cmd_mcp_add(args):
_save_mcp_server(name, server_config)
print()
_success(f"Saved '{name}' to {display_hermes_home()}/config.yaml ({tool_count}/{total} tools enabled)")
_success(f"Saved '{name}' to ~/.hermes/config.yaml ({tool_count}/{total} tools enabled)")
_info("Start a new session to use these tools.")
-906
View File
@@ -1,906 +0,0 @@
"""
Profile management for multiple isolated Hermes instances.
Each profile is a fully independent HERMES_HOME directory with its own
config.yaml, .env, memory, sessions, skills, gateway, cron, and logs.
Profiles live under ``~/.hermes/profiles/<name>/`` by default.
The "default" profile is ``~/.hermes`` itself backward compatible,
zero migration needed.
Usage::
hermes profile create coder # fresh profile + bundled skills
hermes profile create coder --clone # also copy config, .env, SOUL.md
hermes profile create coder --clone-all # full copy of source profile
coder chat # use via wrapper alias
hermes -p coder chat # or via flag
hermes profile use coder # set as sticky default
hermes profile delete coder # remove profile + alias + service
"""
import json
import os
import re
import shutil
import stat
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional
_PROFILE_ID_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$")
# Directories bootstrapped inside every new profile
_PROFILE_DIRS = [
"memories",
"sessions",
"skills",
"skins",
"logs",
"plans",
"workspace",
"cron",
]
# Files copied during --clone (if they exist in the source)
_CLONE_CONFIG_FILES = [
"config.yaml",
".env",
"SOUL.md",
]
# Runtime files stripped after --clone-all (shouldn't carry over)
_CLONE_ALL_STRIP = [
"gateway.pid",
"gateway_state.json",
"processes.json",
]
# Names that cannot be used as profile aliases
_RESERVED_NAMES = frozenset({
"hermes", "default", "test", "tmp", "root", "sudo",
})
# Hermes subcommands that cannot be used as profile names/aliases
_HERMES_SUBCOMMANDS = frozenset({
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout",
"status", "cron", "doctor", "config", "pairing", "skills", "tools",
"mcp", "sessions", "insights", "version", "update", "uninstall",
"profile", "plugins", "honcho", "acp",
})
# ---------------------------------------------------------------------------
# Path helpers
# ---------------------------------------------------------------------------
def _get_profiles_root() -> Path:
"""Return the directory where named profiles are stored.
Always ``~/.hermes/profiles/`` anchored to the user's home,
NOT to the current HERMES_HOME (which may itself be a profile).
This ensures ``coder profile list`` can see all profiles.
"""
return Path.home() / ".hermes" / "profiles"
def _get_default_hermes_home() -> Path:
"""Return the default (pre-profile) HERMES_HOME path."""
return Path.home() / ".hermes"
def _get_active_profile_path() -> Path:
"""Return the path to the sticky active_profile file."""
return _get_default_hermes_home() / "active_profile"
def _get_wrapper_dir() -> Path:
"""Return the directory for wrapper scripts."""
return Path.home() / ".local" / "bin"
# ---------------------------------------------------------------------------
# Validation
# ---------------------------------------------------------------------------
def validate_profile_name(name: str) -> None:
"""Raise ``ValueError`` if *name* is not a valid profile identifier."""
if name == "default":
return # special alias for ~/.hermes
if not _PROFILE_ID_RE.match(name):
raise ValueError(
f"Invalid profile name {name!r}. Must match "
f"[a-z0-9][a-z0-9_-]{{0,63}}"
)
def get_profile_dir(name: str) -> Path:
"""Resolve a profile name to its HERMES_HOME directory."""
if name == "default":
return _get_default_hermes_home()
return _get_profiles_root() / name
def profile_exists(name: str) -> bool:
"""Check whether a profile directory exists."""
if name == "default":
return True
return get_profile_dir(name).is_dir()
# ---------------------------------------------------------------------------
# Alias / wrapper script management
# ---------------------------------------------------------------------------
def check_alias_collision(name: str) -> Optional[str]:
"""Return a human-readable collision message, or None if the name is safe.
Checks: reserved names, hermes subcommands, existing binaries in PATH.
"""
if name in _RESERVED_NAMES:
return f"'{name}' is a reserved name"
if name in _HERMES_SUBCOMMANDS:
return f"'{name}' conflicts with a hermes subcommand"
# Check existing commands in PATH
wrapper_dir = _get_wrapper_dir()
try:
result = subprocess.run(
["which", name], capture_output=True, text=True, timeout=5,
)
if result.returncode == 0:
existing_path = result.stdout.strip()
# Allow overwriting our own wrappers
if existing_path == str(wrapper_dir / name):
try:
content = (wrapper_dir / name).read_text()
if "hermes -p" in content:
return None # it's our wrapper, safe to overwrite
except Exception:
pass
return f"'{name}' conflicts with an existing command ({existing_path})"
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
return None # safe
def _is_wrapper_dir_in_path() -> bool:
"""Check if ~/.local/bin is in PATH."""
wrapper_dir = str(_get_wrapper_dir())
return wrapper_dir in os.environ.get("PATH", "").split(os.pathsep)
def create_wrapper_script(name: str) -> Optional[Path]:
"""Create a shell wrapper script at ~/.local/bin/<name>.
Returns the path to the created wrapper, or None if creation failed.
"""
wrapper_dir = _get_wrapper_dir()
try:
wrapper_dir.mkdir(parents=True, exist_ok=True)
except OSError as e:
print(f"⚠ Could not create {wrapper_dir}: {e}")
return None
wrapper_path = wrapper_dir / name
try:
wrapper_path.write_text(f'#!/bin/sh\nexec hermes -p {name} "$@"\n')
wrapper_path.chmod(wrapper_path.stat().st_mode | stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)
return wrapper_path
except OSError as e:
print(f"⚠ Could not create wrapper at {wrapper_path}: {e}")
return None
def remove_wrapper_script(name: str) -> bool:
"""Remove the wrapper script for a profile. Returns True if removed."""
wrapper_path = _get_wrapper_dir() / name
if wrapper_path.exists():
try:
# Verify it's our wrapper before removing
content = wrapper_path.read_text()
if "hermes -p" in content:
wrapper_path.unlink()
return True
except Exception:
pass
return False
# ---------------------------------------------------------------------------
# ProfileInfo
# ---------------------------------------------------------------------------
@dataclass
class ProfileInfo:
"""Summary information about a profile."""
name: str
path: Path
is_default: bool
gateway_running: bool
model: Optional[str] = None
provider: Optional[str] = None
has_env: bool = False
skill_count: int = 0
alias_path: Optional[Path] = None
def _read_config_model(profile_dir: Path) -> tuple:
"""Read model/provider from a profile's config.yaml. Returns (model, provider)."""
config_path = profile_dir / "config.yaml"
if not config_path.exists():
return None, None
try:
import yaml
with open(config_path, "r") as f:
cfg = yaml.safe_load(f) or {}
model_cfg = cfg.get("model", {})
if isinstance(model_cfg, str):
return model_cfg, None
if isinstance(model_cfg, dict):
return model_cfg.get("model"), model_cfg.get("provider")
return None, None
except Exception:
return None, None
def _check_gateway_running(profile_dir: Path) -> bool:
"""Check if a gateway is running for a given profile directory."""
pid_file = profile_dir / "gateway.pid"
if not pid_file.exists():
return False
try:
raw = pid_file.read_text().strip()
if not raw:
return False
data = json.loads(raw) if raw.startswith("{") else {"pid": int(raw)}
pid = int(data["pid"])
os.kill(pid, 0) # existence check
return True
except (json.JSONDecodeError, KeyError, ValueError, TypeError,
ProcessLookupError, PermissionError, OSError):
return False
def _count_skills(profile_dir: Path) -> int:
"""Count installed skills in a profile."""
skills_dir = profile_dir / "skills"
if not skills_dir.is_dir():
return 0
count = 0
for md in skills_dir.rglob("SKILL.md"):
if "/.hub/" not in str(md) and "/.git/" not in str(md):
count += 1
return count
# ---------------------------------------------------------------------------
# CRUD operations
# ---------------------------------------------------------------------------
def list_profiles() -> List[ProfileInfo]:
"""Return info for all profiles, including the default."""
profiles = []
wrapper_dir = _get_wrapper_dir()
# Default profile
default_home = _get_default_hermes_home()
if default_home.is_dir():
model, provider = _read_config_model(default_home)
profiles.append(ProfileInfo(
name="default",
path=default_home,
is_default=True,
gateway_running=_check_gateway_running(default_home),
model=model,
provider=provider,
has_env=(default_home / ".env").exists(),
skill_count=_count_skills(default_home),
))
# Named profiles
profiles_root = _get_profiles_root()
if profiles_root.is_dir():
for entry in sorted(profiles_root.iterdir()):
if not entry.is_dir():
continue
name = entry.name
if not _PROFILE_ID_RE.match(name):
continue
model, provider = _read_config_model(entry)
alias_path = wrapper_dir / name
profiles.append(ProfileInfo(
name=name,
path=entry,
is_default=False,
gateway_running=_check_gateway_running(entry),
model=model,
provider=provider,
has_env=(entry / ".env").exists(),
skill_count=_count_skills(entry),
alias_path=alias_path if alias_path.exists() else None,
))
return profiles
def create_profile(
name: str,
clone_from: Optional[str] = None,
clone_all: bool = False,
clone_config: bool = False,
no_alias: bool = False,
) -> Path:
"""Create a new profile directory.
Parameters
----------
name:
Profile identifier (lowercase, alphanumeric, hyphens, underscores).
clone_from:
Source profile to clone from. If ``None`` and clone_config/clone_all
is True, defaults to the currently active profile.
clone_all:
If True, do a full copytree of the source (all state).
clone_config:
If True, copy only config files (config.yaml, .env, SOUL.md).
no_alias:
If True, skip wrapper script creation.
Returns
-------
Path
The newly created profile directory.
"""
validate_profile_name(name)
if name == "default":
raise ValueError(
"Cannot create a profile named 'default' — it is the built-in profile (~/.hermes)."
)
profile_dir = get_profile_dir(name)
if profile_dir.exists():
raise FileExistsError(f"Profile '{name}' already exists at {profile_dir}")
# Resolve clone source
source_dir = None
if clone_from is not None or clone_all or clone_config:
if clone_from is None:
# Default: clone from active profile
from hermes_constants import get_hermes_home
source_dir = get_hermes_home()
else:
validate_profile_name(clone_from)
source_dir = get_profile_dir(clone_from)
if not source_dir.is_dir():
raise FileNotFoundError(
f"Source profile '{clone_from or 'active'}' does not exist at {source_dir}"
)
if clone_all and source_dir:
# Full copy of source profile
shutil.copytree(source_dir, profile_dir)
# Strip runtime files
for stale in _CLONE_ALL_STRIP:
(profile_dir / stale).unlink(missing_ok=True)
else:
# Bootstrap directory structure
profile_dir.mkdir(parents=True, exist_ok=True)
for subdir in _PROFILE_DIRS:
(profile_dir / subdir).mkdir(parents=True, exist_ok=True)
# Clone config files from source
if source_dir is not None:
for filename in _CLONE_CONFIG_FILES:
src = source_dir / filename
if src.exists():
shutil.copy2(src, profile_dir / filename)
return profile_dir
def seed_profile_skills(profile_dir: Path, quiet: bool = False) -> Optional[dict]:
"""Seed bundled skills into a profile via subprocess.
Uses subprocess because sync_skills() caches HERMES_HOME at module level.
Returns the sync result dict, or None on failure.
"""
project_root = Path(__file__).parent.parent.resolve()
try:
result = subprocess.run(
[sys.executable, "-c",
"import json; from tools.skills_sync import sync_skills; "
"r = sync_skills(quiet=True); print(json.dumps(r))"],
env={**os.environ, "HERMES_HOME": str(profile_dir)},
cwd=str(project_root),
capture_output=True, text=True, timeout=60,
)
if result.returncode == 0 and result.stdout.strip():
return json.loads(result.stdout.strip())
if not quiet:
print(f"⚠ Skill seeding returned exit code {result.returncode}")
if result.stderr.strip():
print(f" {result.stderr.strip()[:200]}")
return None
except subprocess.TimeoutExpired:
if not quiet:
print("⚠ Skill seeding timed out (60s)")
return None
except Exception as e:
if not quiet:
print(f"⚠ Skill seeding failed: {e}")
return None
def delete_profile(name: str, yes: bool = False) -> Path:
"""Delete a profile, its wrapper script, and its gateway service.
Stops the gateway if running. Disables systemd/launchd service first
to prevent auto-restart.
Returns the path that was removed.
"""
validate_profile_name(name)
if name == "default":
raise ValueError(
"Cannot delete the default profile (~/.hermes).\n"
"To remove everything, use: hermes uninstall"
)
profile_dir = get_profile_dir(name)
if not profile_dir.is_dir():
raise FileNotFoundError(f"Profile '{name}' does not exist.")
# Show what will be deleted
model, provider = _read_config_model(profile_dir)
gw_running = _check_gateway_running(profile_dir)
skill_count = _count_skills(profile_dir)
print(f"\nProfile: {name}")
print(f"Path: {profile_dir}")
if model:
print(f"Model: {model}" + (f" ({provider})" if provider else ""))
if skill_count:
print(f"Skills: {skill_count}")
items = [
"All config, API keys, memories, sessions, skills, cron jobs",
]
# Check for service
from hermes_cli.gateway import _profile_suffix, get_service_name
wrapper_path = _get_wrapper_dir() / name
has_wrapper = wrapper_path.exists()
if has_wrapper:
items.append(f"Command alias ({wrapper_path})")
print(f"\nThis will permanently delete:")
for item in items:
print(f"{item}")
if gw_running:
print(f" ⚠ Gateway is running — it will be stopped.")
# Confirmation
if not yes:
print()
try:
confirm = input(f"Type '{name}' to confirm: ").strip()
except (KeyboardInterrupt, EOFError):
print("\nCancelled.")
return profile_dir
if confirm != name:
print("Cancelled.")
return profile_dir
# 1. Disable service (prevents auto-restart)
_cleanup_gateway_service(name, profile_dir)
# 2. Stop running gateway
if gw_running:
_stop_gateway_process(profile_dir)
# 3. Remove wrapper script
if has_wrapper:
if remove_wrapper_script(name):
print(f"✓ Removed {wrapper_path}")
# 4. Remove profile directory
try:
shutil.rmtree(profile_dir)
print(f"✓ Removed {profile_dir}")
except Exception as e:
print(f"⚠ Could not remove {profile_dir}: {e}")
# 5. Clear active_profile if it pointed to this profile
try:
active = get_active_profile()
if active == name:
set_active_profile("default")
print("✓ Active profile reset to default")
except Exception:
pass
print(f"\nProfile '{name}' deleted.")
return profile_dir
def _cleanup_gateway_service(name: str, profile_dir: Path) -> None:
"""Disable and remove systemd/launchd service for a profile."""
import platform as _platform
# Derive service name for this profile
# Temporarily set HERMES_HOME so _profile_suffix resolves correctly
old_home = os.environ.get("HERMES_HOME")
try:
os.environ["HERMES_HOME"] = str(profile_dir)
from hermes_cli.gateway import get_service_name, get_launchd_plist_path
if _platform.system() == "Linux":
svc_name = get_service_name()
svc_file = Path.home() / ".config" / "systemd" / "user" / f"{svc_name}.service"
if svc_file.exists():
subprocess.run(
["systemctl", "--user", "disable", svc_name],
capture_output=True, check=False, timeout=10,
)
subprocess.run(
["systemctl", "--user", "stop", svc_name],
capture_output=True, check=False, timeout=10,
)
svc_file.unlink(missing_ok=True)
subprocess.run(
["systemctl", "--user", "daemon-reload"],
capture_output=True, check=False, timeout=10,
)
print(f"✓ Service {svc_name} removed")
elif _platform.system() == "Darwin":
plist_path = get_launchd_plist_path()
if plist_path.exists():
subprocess.run(
["launchctl", "unload", str(plist_path)],
capture_output=True, check=False, timeout=10,
)
plist_path.unlink(missing_ok=True)
print(f"✓ Launchd service removed")
except Exception as e:
print(f"⚠ Service cleanup: {e}")
finally:
if old_home is not None:
os.environ["HERMES_HOME"] = old_home
elif "HERMES_HOME" in os.environ:
del os.environ["HERMES_HOME"]
def _stop_gateway_process(profile_dir: Path) -> None:
"""Stop a running gateway process via its PID file."""
import signal as _signal
import time as _time
pid_file = profile_dir / "gateway.pid"
if not pid_file.exists():
return
try:
raw = pid_file.read_text().strip()
data = json.loads(raw) if raw.startswith("{") else {"pid": int(raw)}
pid = int(data["pid"])
os.kill(pid, _signal.SIGTERM)
# Wait up to 10s for graceful shutdown
for _ in range(20):
_time.sleep(0.5)
try:
os.kill(pid, 0)
except ProcessLookupError:
print(f"✓ Gateway stopped (PID {pid})")
return
# Force kill
try:
os.kill(pid, _signal.SIGKILL)
except ProcessLookupError:
pass
print(f"✓ Gateway force-stopped (PID {pid})")
except (ProcessLookupError, PermissionError):
print("✓ Gateway already stopped")
except Exception as e:
print(f"⚠ Could not stop gateway: {e}")
# ---------------------------------------------------------------------------
# Active profile (sticky default)
# ---------------------------------------------------------------------------
def get_active_profile() -> str:
"""Read the sticky active profile name.
Returns ``"default"`` if no active_profile file exists or it's empty.
"""
path = _get_active_profile_path()
try:
name = path.read_text().strip()
if not name:
return "default"
return name
except (FileNotFoundError, UnicodeDecodeError, OSError):
return "default"
def set_active_profile(name: str) -> None:
"""Set the sticky active profile.
Writes to ``~/.hermes/active_profile``. Use ``"default"`` to clear.
"""
validate_profile_name(name)
if name != "default" and not profile_exists(name):
raise FileNotFoundError(
f"Profile '{name}' does not exist. "
f"Create it with: hermes profile create {name}"
)
path = _get_active_profile_path()
path.parent.mkdir(parents=True, exist_ok=True)
if name == "default":
# Remove the file to indicate default
path.unlink(missing_ok=True)
else:
# Atomic write
tmp = path.with_suffix(".tmp")
tmp.write_text(name + "\n")
tmp.replace(path)
def get_active_profile_name() -> str:
"""Infer the current profile name from HERMES_HOME.
Returns ``"default"`` if HERMES_HOME is not set or points to ``~/.hermes``.
Returns the profile name if HERMES_HOME points into ``~/.hermes/profiles/<name>``.
Returns ``"custom"`` if HERMES_HOME is set to an unrecognized path.
"""
from hermes_constants import get_hermes_home
hermes_home = get_hermes_home()
resolved = hermes_home.resolve()
default_resolved = _get_default_hermes_home().resolve()
if resolved == default_resolved:
return "default"
profiles_root = _get_profiles_root().resolve()
try:
rel = resolved.relative_to(profiles_root)
parts = rel.parts
if len(parts) == 1 and _PROFILE_ID_RE.match(parts[0]):
return parts[0]
except ValueError:
pass
return "custom"
# ---------------------------------------------------------------------------
# Export / Import
# ---------------------------------------------------------------------------
def export_profile(name: str, output_path: str) -> Path:
"""Export a profile to a tar.gz archive.
Returns the output file path.
"""
validate_profile_name(name)
profile_dir = get_profile_dir(name)
if not profile_dir.is_dir():
raise FileNotFoundError(f"Profile '{name}' does not exist.")
output = Path(output_path)
# shutil.make_archive wants the base name without extension
base = str(output).removesuffix(".tar.gz").removesuffix(".tgz")
result = shutil.make_archive(base, "gztar", str(profile_dir.parent), name)
return Path(result)
def import_profile(archive_path: str, name: Optional[str] = None) -> Path:
"""Import a profile from a tar.gz archive.
If *name* is not given, infers it from the archive's top-level directory.
Returns the imported profile directory.
"""
import tarfile
archive = Path(archive_path)
if not archive.exists():
raise FileNotFoundError(f"Archive not found: {archive}")
# Peek at the archive to find the top-level directory name
with tarfile.open(archive, "r:gz") as tf:
top_dirs = {m.name.split("/")[0] for m in tf.getmembers() if "/" in m.name}
if not top_dirs:
top_dirs = {m.name for m in tf.getmembers() if m.isdir()}
inferred_name = name or (top_dirs.pop() if len(top_dirs) == 1 else None)
if not inferred_name:
raise ValueError(
"Cannot determine profile name from archive. "
"Specify it explicitly: hermes profile import <archive> --name <name>"
)
validate_profile_name(inferred_name)
profile_dir = get_profile_dir(inferred_name)
if profile_dir.exists():
raise FileExistsError(f"Profile '{inferred_name}' already exists at {profile_dir}")
profiles_root = _get_profiles_root()
profiles_root.mkdir(parents=True, exist_ok=True)
shutil.unpack_archive(str(archive), str(profiles_root))
# If the archive extracted under a different name, rename
extracted = profiles_root / (top_dirs.pop() if top_dirs else inferred_name)
if extracted != profile_dir and extracted.exists():
extracted.rename(profile_dir)
return profile_dir
# ---------------------------------------------------------------------------
# Rename
# ---------------------------------------------------------------------------
def rename_profile(old_name: str, new_name: str) -> Path:
"""Rename a profile: directory, wrapper script, service, active_profile.
Returns the new profile directory.
"""
validate_profile_name(old_name)
validate_profile_name(new_name)
if old_name == "default":
raise ValueError("Cannot rename the default profile.")
if new_name == "default":
raise ValueError("Cannot rename to 'default' — it is reserved.")
old_dir = get_profile_dir(old_name)
new_dir = get_profile_dir(new_name)
if not old_dir.is_dir():
raise FileNotFoundError(f"Profile '{old_name}' does not exist.")
if new_dir.exists():
raise FileExistsError(f"Profile '{new_name}' already exists.")
# 1. Stop gateway if running
if _check_gateway_running(old_dir):
_cleanup_gateway_service(old_name, old_dir)
_stop_gateway_process(old_dir)
# 2. Rename directory
old_dir.rename(new_dir)
print(f"✓ Renamed {old_dir.name}{new_dir.name}")
# 3. Update wrapper script
remove_wrapper_script(old_name)
collision = check_alias_collision(new_name)
if not collision:
create_wrapper_script(new_name)
print(f"✓ Alias updated: {new_name}")
else:
print(f"⚠ Cannot create alias '{new_name}'{collision}")
# 4. Update active_profile if it pointed to old name
try:
if get_active_profile() == old_name:
set_active_profile(new_name)
print(f"✓ Active profile updated: {new_name}")
except Exception:
pass
return new_dir
# ---------------------------------------------------------------------------
# Tab completion
# ---------------------------------------------------------------------------
def generate_bash_completion() -> str:
"""Generate a bash completion script for hermes profile names."""
return '''# Hermes Agent profile completion
# Add to ~/.bashrc: eval "$(hermes completion bash)"
_hermes_profiles() {
local profiles_dir="$HOME/.hermes/profiles"
local profiles="default"
if [ -d "$profiles_dir" ]; then
profiles="$profiles $(ls "$profiles_dir" 2>/dev/null)"
fi
echo "$profiles"
}
_hermes_completion() {
local cur prev
cur="${COMP_WORDS[COMP_CWORD]}"
prev="${COMP_WORDS[COMP_CWORD-1]}"
# Complete profile names after -p / --profile
if [[ "$prev" == "-p" || "$prev" == "--profile" ]]; then
COMPREPLY=($(compgen -W "$(_hermes_profiles)" -- "$cur"))
return
fi
# Complete profile subcommands
if [[ "${COMP_WORDS[1]}" == "profile" ]]; then
case "$prev" in
profile)
COMPREPLY=($(compgen -W "list use create delete show alias rename export import" -- "$cur"))
return
;;
use|delete|show|alias|rename|export)
COMPREPLY=($(compgen -W "$(_hermes_profiles)" -- "$cur"))
return
;;
esac
fi
# Top-level subcommands
if [[ "$COMP_CWORD" == 1 ]]; then
local commands="chat model gateway setup status cron doctor config skills tools mcp sessions profile update version"
COMPREPLY=($(compgen -W "$commands" -- "$cur"))
fi
}
complete -F _hermes_completion hermes
'''
def generate_zsh_completion() -> str:
"""Generate a zsh completion script for hermes profile names."""
return '''#compdef hermes
# Hermes Agent profile completion
# Add to ~/.zshrc: eval "$(hermes completion zsh)"
_hermes() {
local -a profiles
profiles=(default)
if [[ -d "$HOME/.hermes/profiles" ]]; then
profiles+=("${(@f)$(ls $HOME/.hermes/profiles 2>/dev/null)}")
fi
_arguments \\
'-p[Profile name]:profile:($profiles)' \\
'--profile[Profile name]:profile:($profiles)' \\
'1:command:(chat model gateway setup status cron doctor config skills tools mcp sessions profile update version)' \\
'*::arg:->args'
case $words[1] in
profile)
_arguments '1:action:(list use create delete show alias rename export import)' \\
'2:profile:($profiles)'
;;
esac
}
_hermes "$@"
'''
# ---------------------------------------------------------------------------
# Profile env resolution (called from _apply_profile_override)
# ---------------------------------------------------------------------------
def resolve_profile_env(profile_name: str) -> str:
"""Resolve a profile name to a HERMES_HOME path string.
Called early in the CLI entry point, before any hermes modules
are imported, to set the HERMES_HOME environment variable.
"""
validate_profile_name(profile_name)
profile_dir = get_profile_dir(profile_name)
if profile_name != "default" and not profile_dir.is_dir():
raise FileNotFoundError(
f"Profile '{profile_name}' does not exist. "
f"Create it with: hermes profile create {profile_name}"
)
return str(profile_dir)
+3 -4
View File
@@ -289,7 +289,6 @@ from hermes_cli.config import (
get_env_value,
ensure_hermes_home,
)
from hermes_constants import display_hermes_home
from hermes_cli.colors import Colors, color
@@ -684,7 +683,7 @@ def _print_setup_summary(config: dict, hermes_home):
print_warning(
"Some tools are disabled. Run 'hermes setup tools' to configure them,"
)
print_warning(f"or edit {display_hermes_home()}/.env directly to add the missing API keys.")
print_warning("or edit ~/.hermes/.env directly to add the missing API keys.")
print()
# Done banner
@@ -707,7 +706,7 @@ def _print_setup_summary(config: dict, hermes_home):
print()
# Show file locations prominently
print(color(f"📁 All your files are in {display_hermes_home()}/:", Colors.CYAN, Colors.BOLD))
print(color("📁 All your files are in ~/.hermes/:", Colors.CYAN, Colors.BOLD))
print()
print(f" {color('Settings:', Colors.YELLOW)} {get_config_path()}")
print(f" {color('API Keys:', Colors.YELLOW)} {get_env_path()}")
@@ -2838,7 +2837,7 @@ def setup_gateway(config: dict):
save_env_value("WEBHOOK_ENABLED", "true")
print()
print_success("Webhooks enabled! Next steps:")
print_info(f" 1. Define webhook routes in {display_hermes_home()}/config.yaml")
print_info(" 1. Define webhook routes in ~/.hermes/config.yaml")
print_info(" 2. Point your service (GitHub, GitLab, etc.) at:")
print_info(" http://your-server:8644/webhooks/<route-name>")
print()
+3 -4
View File
@@ -21,7 +21,6 @@ from rich.table import Table
# Lazy imports to avoid circular dependencies and slow startup.
# tools.skills_hub and tools.skills_guard are imported inside functions.
from hermes_constants import display_hermes_home
_console = Console()
@@ -389,7 +388,7 @@ def do_install(identifier: str, category: str = "", force: bool = False,
"[bold bright_cyan]This is an official optional skill maintained by Nous Research.[/]\n\n"
"It ships with hermes-agent but is not activated by default.\n"
"Installing will copy it to your skills directory where the agent can use it.\n\n"
f"Files will be at: [cyan]{display_hermes_home()}/skills/{category + '/' if category else ''}{bundle.name}/[/]",
f"Files will be at: [cyan]~/.hermes/skills/{category + '/' if category else ''}{bundle.name}/[/]",
title="Official Skill",
border_style="bright_cyan",
))
@@ -399,7 +398,7 @@ def do_install(identifier: str, category: str = "", force: bool = False,
"External skills can contain instructions that influence agent behavior,\n"
"shell commands, and scripts. Even after automated scanning, you should\n"
"review the installed files before use.\n\n"
f"Files will be at: [cyan]{display_hermes_home()}/skills/{category + '/' if category else ''}{bundle.name}/[/]",
f"Files will be at: [cyan]~/.hermes/skills/{category + '/' if category else ''}{bundle.name}/[/]",
title="Disclaimer",
border_style="yellow",
))
@@ -745,7 +744,7 @@ def do_publish(skill_path: str, target: str = "github", repo: str = "",
auth = GitHubAuth()
if not auth.is_authenticated():
c.print("[bold red]Error:[/] GitHub authentication required.\n"
f"Set GITHUB_TOKEN in {display_hermes_home()}/.env or run 'gh auth login'.\n")
"Set GITHUB_TOKEN in ~/.hermes/.env or run 'gh auth login'.\n")
return
c.print(f"[bold]Publishing '{name}' to {repo}...[/]")
+2 -4
View File
@@ -326,8 +326,7 @@ def _run_post_setup(post_setup_key: str):
if result.returncode == 0:
_print_success(" Node.js dependencies installed")
else:
from hermes_constants import display_hermes_home
_print_warning(f" npm install failed - run manually: cd {display_hermes_home()}/hermes-agent && npm install")
_print_warning(" npm install failed - run manually: cd ~/.hermes/hermes-agent && npm install")
elif not node_modules.exists():
_print_warning(" Node.js not found - browser tools require: npm install (in hermes-agent directory)")
@@ -1265,8 +1264,7 @@ def tools_command(args=None, first_install: bool = False, config: dict = None):
platform_choices[idx] = f"Configure {pinfo['label']} ({new_count}/{total} enabled)"
print()
from hermes_constants import display_hermes_home
print(color(f" Tool configuration saved to {display_hermes_home()}/config.yaml", Colors.DIM))
print(color(" Tool configuration saved to ~/.hermes/config.yaml", Colors.DIM))
print(color(" Changes take effect on next 'hermes' or gateway restart.", Colors.DIM))
print()
+4 -8
View File
@@ -18,8 +18,6 @@ import time
from pathlib import Path
from typing import Dict, Optional
from hermes_constants import display_hermes_home
_SUBSCRIPTIONS_FILENAME = "webhook_subscriptions.json"
@@ -78,15 +76,13 @@ def _get_webhook_base_url() -> str:
return f"http://{display_host}:{port}"
def _setup_hint() -> str:
_dhh = display_hermes_home()
return f"""
_SETUP_HINT = """
Webhook platform is not enabled. To set it up:
1. Run the gateway setup wizard:
hermes gateway setup
2. Or manually add to {_dhh}/config.yaml:
2. Or manually add to ~/.hermes/config.yaml:
platforms:
webhook:
enabled: true
@@ -95,7 +91,7 @@ def _setup_hint() -> str:
port: 8644
secret: "your-global-hmac-secret"
3. Or set environment variables in {_dhh}/.env:
3. Or set environment variables in ~/.hermes/.env:
WEBHOOK_ENABLED=true
WEBHOOK_PORT=8644
WEBHOOK_SECRET=your-global-secret
@@ -108,7 +104,7 @@ def _require_webhook_enabled() -> bool:
"""Check webhook is enabled. Print setup guide and return False if not."""
if _is_webhook_enabled():
return True
print(_setup_hint())
print(_SETUP_HINT)
return False
-20
View File
@@ -38,26 +38,6 @@ def get_hermes_dir(new_subpath: str, old_name: str) -> Path:
return home / new_subpath
def display_hermes_home() -> str:
"""Return a user-friendly display string for the current HERMES_HOME.
Uses ``~/`` shorthand for readability::
default: ``~/.hermes``
profile: ``~/.hermes/profiles/coder``
custom: ``/opt/hermes-custom``
Use this in **user-facing** print/log messages instead of hardcoding
``~/.hermes``. For code that needs a real ``Path``, use
:func:`get_hermes_home` instead.
"""
home = get_hermes_home()
try:
return "~/" + str(home.relative_to(Path.home()))
except ValueError:
return str(home)
VALID_REASONING_EFFORTS = ("xhigh", "high", "medium", "low", "minimal")
+3 -4
View File
@@ -45,7 +45,7 @@ import fire
from datetime import datetime
from pathlib import Path
from hermes_constants import get_hermes_home, display_hermes_home
from hermes_constants import get_hermes_home
# Load .env from ~/.hermes/.env first, then project root as dev fallback.
# User-managed env files should override stale shell exports on restart.
@@ -6924,9 +6924,8 @@ class AIAgent:
print(f"{self.log_prefix} Auth method: {auth_method}")
print(f"{self.log_prefix} Token prefix: {key[:12]}..." if key and len(key) > 12 else f"{self.log_prefix} Token: (empty or short)")
print(f"{self.log_prefix} Troubleshooting:")
_dhh = display_hermes_home()
print(f"{self.log_prefix} • Check ANTHROPIC_TOKEN in {_dhh}/.env for Hermes-managed OAuth/setup tokens")
print(f"{self.log_prefix} • Check ANTHROPIC_API_KEY in {_dhh}/.env for API keys or legacy token values")
print(f"{self.log_prefix} • Check ANTHROPIC_TOKEN in ~/.hermes/.env for Hermes-managed OAuth/setup tokens")
print(f"{self.log_prefix} • Check ANTHROPIC_API_KEY in ~/.hermes/.env for API keys or legacy token values")
print(f"{self.log_prefix} • For API keys: verify at https://console.anthropic.com/settings/keys")
print(f"{self.log_prefix} • For Claude Code: run 'claude /login' to refresh, then retry")
print(f"{self.log_prefix} • Clear stale keys: hermes config set ANTHROPIC_TOKEN \"\"")
@@ -4,11 +4,6 @@ description: Gmail, Calendar, Drive, Contacts, Sheets, and Docs integration via
version: 1.0.0
author: Nous Research
license: MIT
required_credential_files:
- path: google_token.json
description: Google OAuth2 token (created by setup script)
- path: google_client_secret.json
description: Google OAuth2 client credentials (downloaded from Google Cloud Console)
metadata:
hermes:
tags: [Google, Gmail, Calendar, Drive, Sheets, Docs, Contacts, Email, OAuth]
+59 -108
View File
@@ -1,7 +1,7 @@
---
name: duckduckgo-search
description: Free web search via DuckDuckGo — text, news, images, videos. No API key needed. Prefer the `ddgs` CLI when installed; use the Python DDGS library only after verifying that `ddgs` is available in the current runtime.
version: 1.3.0
description: Free web search via DuckDuckGo — text, news, images, videos. No API key needed. Use the Python DDGS library or CLI to search, then web_extract for full content.
version: 1.2.0
author: gamedevCloudy
license: MIT
metadata:
@@ -9,96 +9,26 @@ metadata:
tags: [search, duckduckgo, web-search, free, fallback]
related_skills: [arxiv]
fallback_for_toolsets: [web]
prerequisites:
commands: [ddgs]
---
# DuckDuckGo Search
Free web search using DuckDuckGo. **No API key required.**
Preferred when `web_search` is unavailable or unsuitable (for example when `FIRECRAWL_API_KEY` is not set). Can also be used as a standalone search path when DuckDuckGo results are specifically desired.
Preferred when `web_search` tool is unavailable or unsuitable (no `FIRECRAWL_API_KEY` set). Can also be used as a standalone search tool.
## Detection Flow
Check what is actually available before choosing an approach:
## Setup
```bash
# Check CLI availability
command -v ddgs >/dev/null && echo "DDGS_CLI=installed" || echo "DDGS_CLI=missing"
```
Decision tree:
1. If `ddgs` CLI is installed, prefer `terminal` + `ddgs`
2. If `ddgs` CLI is missing, do not assume `execute_code` can import `ddgs`
3. If the user wants DuckDuckGo specifically, install `ddgs` first in the relevant environment
4. Otherwise fall back to built-in web/browser tools
Important runtime note:
- Terminal and `execute_code` are separate runtimes
- A successful shell install does not guarantee `execute_code` can import `ddgs`
- Never assume third-party Python packages are preinstalled inside `execute_code`
## Installation
Install `ddgs` only when DuckDuckGo search is specifically needed and the runtime does not already provide it.
```bash
# Python package + CLI entrypoint
# Install the ddgs package (one-time)
pip install ddgs
# Verify CLI
ddgs --help
```
If a workflow depends on Python imports, verify that same runtime can import `ddgs` before using `from ddgs import DDGS`.
## Python API (Primary)
## Method 1: CLI Search (Preferred)
Use the `ddgs` command via `terminal` when it exists. This is the preferred path because it avoids assuming the `execute_code` sandbox has the `ddgs` Python package installed.
```bash
# Text search
ddgs text -k "python async programming" -m 5
# News search
ddgs news -k "artificial intelligence" -m 5
# Image search
ddgs images -k "landscape photography" -m 10
# Video search
ddgs videos -k "python tutorial" -m 5
# With region filter
ddgs text -k "best restaurants" -m 5 -r us-en
# Recent results only (d=day, w=week, m=month, y=year)
ddgs text -k "latest AI news" -m 5 -t w
# JSON output for parsing
ddgs text -k "fastapi tutorial" -m 5 -o json
```
### CLI Flags
| Flag | Description | Example |
|------|-------------|---------|
| `-k` | Keywords (query) — **required** | `-k "search terms"` |
| `-m` | Max results | `-m 5` |
| `-r` | Region | `-r us-en` |
| `-t` | Time limit | `-t w` (week) |
| `-s` | Safe search | `-s off` |
| `-o` | Output format | `-o json` |
## Method 2: Python API (Only After Verification)
Use the `DDGS` class in `execute_code` or another Python runtime only after verifying that `ddgs` is installed there. Do not assume `execute_code` includes third-party packages by default.
Safe wording:
- "Use `execute_code` with `ddgs` after installing or verifying the package if needed"
Avoid saying:
- "`execute_code` includes `ddgs`"
- "DuckDuckGo search works by default in `execute_code`"
Use the `DDGS` class in `execute_code` for structured results with typed fields.
**Important:** `max_results` must always be passed as a **keyword argument** — positional usage raises an error on all methods.
@@ -146,7 +76,7 @@ from ddgs import DDGS
with DDGS() as ddgs:
for r in ddgs.images("semiconductor chip", max_results=5):
print(r["title"])
print(r["image"])
print(r["image"]) # direct image URL
print(r.get("thumbnail", ""))
print(r.get("source", ""))
print()
@@ -164,9 +94,9 @@ from ddgs import DDGS
with DDGS() as ddgs:
for r in ddgs.videos("FastAPI tutorial", max_results=5):
print(r["title"])
print(r.get("content", ""))
print(r.get("duration", ""))
print(r.get("provider", ""))
print(r.get("content", "")) # video URL
print(r.get("duration", "")) # e.g. "26:03"
print(r.get("provider", "")) # YouTube, etc.
print(r.get("published", ""))
print()
```
@@ -182,17 +112,50 @@ Returns: `title`, `content`, `description`, `duration`, `provider`, `published`,
| `images()` | Visuals, diagrams | title, image, thumbnail, url |
| `videos()` | Tutorials, demos | title, content, duration, provider |
## Workflow: Search then Extract
## CLI (Alternative)
DuckDuckGo returns titles, URLs, and snippets — not full page content. To get full page content, search first and then extract the most relevant URL with `web_extract`, browser tools, or curl.
CLI example:
Use the `ddgs` command via terminal when you don't need structured field access.
```bash
ddgs text -k "fastapi deployment guide" -m 3 -o json
# Text search
ddgs text -k "python async programming" -m 5
# News search
ddgs news -k "artificial intelligence" -m 5
# Image search
ddgs images -k "landscape photography" -m 10
# Video search
ddgs videos -k "python tutorial" -m 5
# With region filter
ddgs text -k "best restaurants" -m 5 -r us-en
# Recent results only (d=day, w=week, m=month, y=year)
ddgs text -k "latest AI news" -m 5 -t w
# JSON output for parsing
ddgs text -k "fastapi tutorial" -m 5 -o json
```
Python example, only after verifying `ddgs` is installed in that runtime:
### CLI Flags
| Flag | Description | Example |
|------|-------------|---------|
| `-k` | Keywords (query) — **required** | `-k "search terms"` |
| `-m` | Max results | `-m 5` |
| `-r` | Region | `-r us-en` |
| `-t` | Time limit | `-t w` (week) |
| `-s` | Safe search | `-s off` |
| `-o` | Output format | `-o json` |
## Workflow: Search then Extract
DuckDuckGo returns titles, URLs, and snippets — not full page content. To get full content, follow up with `web_extract`:
1. **Search** with ddgs to find relevant URLs
2. **Extract** content using the `web_extract` tool (if available) or curl
```python
from ddgs import DDGS
@@ -201,37 +164,25 @@ with DDGS() as ddgs:
results = list(ddgs.text("fastapi deployment guide", max_results=3))
for r in results:
print(r["title"], "->", r["href"])
```
Then extract the best URL with `web_extract` or another content-retrieval tool.
# Then use web_extract tool on the best URL
```
## Limitations
- **Rate limiting**: DuckDuckGo may throttle after many rapid requests. Add a short delay between searches if needed.
- **No content extraction**: `ddgs` returns snippets, not full page content. Use `web_extract`, browser tools, or curl for the full article/page.
- **No content extraction**: ddgs returns snippets, not full page content. Use `web_extract` or curl for that.
- **Results quality**: Generally good but less configurable than Firecrawl's search.
- **Availability**: DuckDuckGo may block requests from some cloud IPs. If searches return empty, try different keywords or wait a few seconds.
- **Field variability**: Return fields may vary between results or `ddgs` versions. Use `.get()` for optional fields to avoid `KeyError`.
- **Separate runtimes**: A successful `ddgs` install in terminal does not automatically mean `execute_code` can import it.
## Troubleshooting
| Problem | Likely Cause | What To Do |
|---------|--------------|------------|
| `ddgs: command not found` | CLI not installed in the shell environment | Install `ddgs`, or use built-in web/browser tools instead |
| `ModuleNotFoundError: No module named 'ddgs'` | Python runtime does not have the package installed | Do not use Python DDGS there until that runtime is prepared |
| Search returns nothing | Temporary rate limiting or poor query | Wait a few seconds, retry, or adjust the query |
| CLI works but `execute_code` import fails | Terminal and `execute_code` are different runtimes | Keep using CLI, or separately prepare the Python runtime |
- **Field variability**: Return fields may vary between results or ddgs versions. Use `.get()` for optional fields to avoid KeyError.
## Pitfalls
- **`max_results` is keyword-only**: `ddgs.text("query", 5)` raises an error. Use `ddgs.text("query", max_results=5)`.
- **Do not assume the CLI exists**: Check `command -v ddgs` before using it.
- **Do not assume `execute_code` can import `ddgs`**: `from ddgs import DDGS` may fail with `ModuleNotFoundError` unless that runtime was prepared separately.
- **Package name**: The package is `ddgs` (previously `duckduckgo-search`). Install with `pip install ddgs`.
- **Don't confuse `-k` and `-m`** (CLI): `-k` is for keywords, `-m` is for max results count.
- **Empty results**: If `ddgs` returns nothing, it may be rate-limited. Wait a few seconds and retry.
- **Package name**: The package is `ddgs` (was previously `duckduckgo-search`). Install with `pip install ddgs`.
- **Empty results**: If ddgs returns nothing, it may be rate-limited. Wait a few seconds and retry.
## Validated With
Validated examples against `ddgs==9.11.2` semantics. Skill guidance now treats CLI availability and Python import availability as separate concerns so the documented workflow matches actual runtime behavior.
Smoke-tested with `ddgs==9.11.2` on Python 3.13. All four methods (text, news, images, videos) confirmed working with keyword `max_results`.
-20
View File
@@ -1,20 +0,0 @@
"""Tests for acp_adapter.entry startup wiring."""
import acp
from acp_adapter import entry
def test_main_enables_unstable_protocol(monkeypatch):
calls = {}
async def fake_run_agent(agent, **kwargs):
calls["kwargs"] = kwargs
monkeypatch.setattr(entry, "_setup_logging", lambda: None)
monkeypatch.setattr(entry, "_load_env", lambda: None)
monkeypatch.setattr(acp, "run_agent", fake_run_agent)
entry.main()
assert calls["kwargs"]["use_unstable_protocol"] is True
-71
View File
@@ -8,7 +8,6 @@ from unittest.mock import MagicMock, AsyncMock, patch
import pytest
import acp
from acp.agent.router import build_agent_router
from acp.schema import (
AgentCapabilities,
AuthenticateResponse,
@@ -19,8 +18,6 @@ from acp.schema import (
NewSessionResponse,
PromptResponse,
ResumeSessionResponse,
SetSessionConfigOptionResponse,
SetSessionModeResponse,
SessionInfo,
TextContentBlock,
Usage,
@@ -171,74 +168,6 @@ class TestListAndFork:
assert fork_resp.session_id != new_resp.session_id
# ---------------------------------------------------------------------------
# session configuration / model routing
# ---------------------------------------------------------------------------
class TestSessionConfiguration:
@pytest.mark.asyncio
async def test_set_session_mode_returns_response(self, agent):
new_resp = await agent.new_session(cwd="/tmp")
resp = await agent.set_session_mode(mode_id="chat", session_id=new_resp.session_id)
state = agent.session_manager.get_session(new_resp.session_id)
assert isinstance(resp, SetSessionModeResponse)
assert getattr(state, "mode", None) == "chat"
@pytest.mark.asyncio
async def test_set_config_option_returns_response(self, agent):
new_resp = await agent.new_session(cwd="/tmp")
resp = await agent.set_config_option(
config_id="approval_mode",
session_id=new_resp.session_id,
value="auto",
)
state = agent.session_manager.get_session(new_resp.session_id)
assert isinstance(resp, SetSessionConfigOptionResponse)
assert getattr(state, "config_options", {}) == {"approval_mode": "auto"}
assert resp.config_options == []
@pytest.mark.asyncio
async def test_router_accepts_stable_session_config_methods(self, agent):
new_resp = await agent.new_session(cwd="/tmp")
router = build_agent_router(agent)
mode_result = await router(
"session/set_mode",
{"modeId": "chat", "sessionId": new_resp.session_id},
False,
)
config_result = await router(
"session/set_config_option",
{
"configId": "approval_mode",
"sessionId": new_resp.session_id,
"value": "auto",
},
False,
)
assert mode_result == {}
assert config_result == {"configOptions": []}
@pytest.mark.asyncio
async def test_router_accepts_unstable_model_switch_when_enabled(self, agent):
new_resp = await agent.new_session(cwd="/tmp")
router = build_agent_router(agent, use_unstable_protocol=True)
result = await router(
"session/set_model",
{"modelId": "gpt-5.4", "sessionId": new_resp.session_id},
False,
)
state = agent.session_manager.get_session(new_resp.session_id)
assert result == {}
assert state.model == "gpt-5.4"
# ---------------------------------------------------------------------------
# prompt
# ---------------------------------------------------------------------------
-157
View File
@@ -1,157 +0,0 @@
"""Tests for external skill directories (skills.external_dirs config)."""
import json
import os
from pathlib import Path
from unittest.mock import patch
import pytest
@pytest.fixture
def external_skills_dir(tmp_path):
"""Create a temp dir with a sample external skill."""
ext_dir = tmp_path / "external-skills"
skill_dir = ext_dir / "my-external-skill"
skill_dir.mkdir(parents=True)
(skill_dir / "SKILL.md").write_text(
"---\nname: my-external-skill\ndescription: A skill from an external directory\n---\n\n# My External Skill\n\nDo external things.\n"
)
return ext_dir
@pytest.fixture
def hermes_home(tmp_path):
"""Create a minimal HERMES_HOME with config."""
home = tmp_path / ".hermes"
home.mkdir()
(home / "skills").mkdir()
return home
class TestGetExternalSkillsDirs:
def test_empty_config(self, hermes_home):
(hermes_home / "config.yaml").write_text("skills:\n external_dirs: []\n")
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from agent.skill_utils import get_external_skills_dirs
result = get_external_skills_dirs()
assert result == []
def test_nonexistent_dir_skipped(self, hermes_home):
(hermes_home / "config.yaml").write_text(
"skills:\n external_dirs:\n - /nonexistent/path\n"
)
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from agent.skill_utils import get_external_skills_dirs
result = get_external_skills_dirs()
assert result == []
def test_valid_dir_returned(self, hermes_home, external_skills_dir):
(hermes_home / "config.yaml").write_text(
f"skills:\n external_dirs:\n - {external_skills_dir}\n"
)
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from agent.skill_utils import get_external_skills_dirs
result = get_external_skills_dirs()
assert len(result) == 1
assert result[0] == external_skills_dir.resolve()
def test_duplicate_dirs_deduplicated(self, hermes_home, external_skills_dir):
(hermes_home / "config.yaml").write_text(
f"skills:\n external_dirs:\n - {external_skills_dir}\n - {external_skills_dir}\n"
)
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from agent.skill_utils import get_external_skills_dirs
result = get_external_skills_dirs()
assert len(result) == 1
def test_local_skills_dir_excluded(self, hermes_home):
local_skills = hermes_home / "skills"
(hermes_home / "config.yaml").write_text(
f"skills:\n external_dirs:\n - {local_skills}\n"
)
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from agent.skill_utils import get_external_skills_dirs
result = get_external_skills_dirs()
assert result == []
def test_no_config_file(self, hermes_home):
# No config.yaml at all
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from agent.skill_utils import get_external_skills_dirs
result = get_external_skills_dirs()
assert result == []
def test_string_value_converted_to_list(self, hermes_home, external_skills_dir):
(hermes_home / "config.yaml").write_text(
f"skills:\n external_dirs: {external_skills_dir}\n"
)
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from agent.skill_utils import get_external_skills_dirs
result = get_external_skills_dirs()
assert len(result) == 1
class TestGetAllSkillsDirs:
def test_local_always_first(self, hermes_home, external_skills_dir):
(hermes_home / "config.yaml").write_text(
f"skills:\n external_dirs:\n - {external_skills_dir}\n"
)
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from agent.skill_utils import get_all_skills_dirs
result = get_all_skills_dirs()
assert result[0] == hermes_home / "skills"
assert result[1] == external_skills_dir.resolve()
class TestExternalSkillsInFindAll:
def test_external_skills_found(self, hermes_home, external_skills_dir):
(hermes_home / "config.yaml").write_text(
f"skills:\n external_dirs:\n - {external_skills_dir}\n"
)
local_skills = hermes_home / "skills"
with (
patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}),
patch("tools.skills_tool.SKILLS_DIR", local_skills),
):
from tools.skills_tool import _find_all_skills
skills = _find_all_skills()
names = [s["name"] for s in skills]
assert "my-external-skill" in names
def test_local_takes_precedence(self, hermes_home, external_skills_dir):
"""If the same skill name exists locally and externally, local wins."""
local_skills = hermes_home / "skills"
local_skill = local_skills / "my-external-skill"
local_skill.mkdir(parents=True)
(local_skill / "SKILL.md").write_text(
"---\nname: my-external-skill\ndescription: Local version\n---\n\nLocal.\n"
)
(hermes_home / "config.yaml").write_text(
f"skills:\n external_dirs:\n - {external_skills_dir}\n"
)
with (
patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}),
patch("tools.skills_tool.SKILLS_DIR", local_skills),
):
from tools.skills_tool import _find_all_skills
skills = _find_all_skills()
matching = [s for s in skills if s["name"] == "my-external-skill"]
assert len(matching) == 1
assert matching[0]["description"] == "Local version"
class TestExternalSkillView:
def test_skill_view_finds_external(self, hermes_home, external_skills_dir):
(hermes_home / "config.yaml").write_text(
f"skills:\n external_dirs:\n - {external_skills_dir}\n"
)
local_skills = hermes_home / "skills"
with (
patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}),
patch("tools.skills_tool.SKILLS_DIR", local_skills),
):
from tools.skills_tool import skill_view
result = json.loads(skill_view("my-external-skill"))
assert result["success"] is True
assert "external things" in result["content"]
+5 -8
View File
@@ -5,8 +5,6 @@ import importlib
import logging
import sys
import pytest
from agent.prompt_builder import (
_scan_context_content,
_truncate_content,
@@ -196,7 +194,7 @@ class TestParseSkillFile:
)
from unittest.mock import patch
with patch("agent.skill_utils.sys") as mock_sys:
with patch("tools.skills_tool.sys") as mock_sys:
mock_sys.platform = "linux"
is_compat, _, _ = _parse_skill_file(skill_file)
assert is_compat is False
@@ -236,6 +234,9 @@ class TestPromptBuilderImports:
# =========================================================================
import pytest
class TestBuildSkillsSystemPrompt:
@pytest.fixture(autouse=True)
def _clear_skills_cache(self):
@@ -295,7 +296,7 @@ class TestBuildSkillsSystemPrompt:
from unittest.mock import patch
with patch("agent.skill_utils.sys") as mock_sys:
with patch("tools.skills_tool.sys") as mock_sys:
mock_sys.platform = "linux"
result = build_skills_system_prompt()
@@ -573,10 +574,6 @@ class TestBuildContextFilesPrompt:
result = build_context_files_prompt(cwd=str(tmp_path))
assert "Lowercase claude rules" in result
@pytest.mark.skipif(
sys.platform == "darwin",
reason="APFS default volume is case-insensitive; CLAUDE.md and claude.md alias the same path",
)
def test_claude_md_uppercase_takes_priority(self, tmp_path):
(tmp_path / "CLAUDE.md").write_text("From uppercase.")
(tmp_path / "claude.md").write_text("From lowercase.")
+11 -1
View File
@@ -246,10 +246,20 @@ Generate some audio.
def test_preserves_remaining_remote_setup_warning(self, tmp_path, monkeypatch):
monkeypatch.setenv("TERMINAL_ENV", "ssh")
monkeypatch.delenv("TENOR_API_KEY", raising=False)
def fake_secret_callback(var_name, prompt, metadata=None):
os.environ[var_name] = "stored-in-test"
return {
"success": True,
"stored_as": var_name,
"validated": False,
"skipped": False,
}
monkeypatch.setattr(
skills_tool_module,
"_secret_capture_callback",
None,
fake_secret_callback,
raising=False,
)
+1 -85
View File
@@ -1,6 +1,5 @@
"""Tests for Mattermost platform adapter."""
import json
import os
import time
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
@@ -270,7 +269,6 @@ class TestMattermostWebSocketParsing:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._bot_user_id = "bot_user_id"
self.adapter._bot_username = "hermes-bot"
# Mock handle_message to capture the MessageEvent without processing
self.adapter.handle_message = AsyncMock()
@@ -295,8 +293,7 @@ class TestMattermostWebSocketParsing:
await self.adapter._handle_ws_event(event)
assert self.adapter.handle_message.called
msg_event = self.adapter.handle_message.call_args[0][0]
# @mention is stripped from the message text
assert msg_event.text == "Hello from Matrix!"
assert msg_event.text == "@bot_user_id Hello from Matrix!"
assert msg_event.message_id == "post_abc"
@pytest.mark.asyncio
@@ -413,87 +410,6 @@ class TestMattermostWebSocketParsing:
assert not self.adapter.handle_message.called
# ---------------------------------------------------------------------------
# Mention behavior (require_mention + free_response_channels)
# ---------------------------------------------------------------------------
class TestMattermostMentionBehavior:
def setup_method(self):
self.adapter = _make_adapter()
self.adapter._bot_user_id = "bot_user_id"
self.adapter._bot_username = "hermes-bot"
self.adapter.handle_message = AsyncMock()
def _make_event(self, message, channel_type="O", channel_id="chan_456"):
post_data = {
"id": "post_mention",
"user_id": "user_123",
"channel_id": channel_id,
"message": message,
}
return {
"event": "posted",
"data": {
"post": json.dumps(post_data),
"channel_type": channel_type,
"sender_name": "@alice",
},
}
@pytest.mark.asyncio
async def test_require_mention_true_skips_without_mention(self):
"""Default: messages without @mention in channels are skipped."""
with patch.dict(os.environ, {}, clear=False):
os.environ.pop("MATTERMOST_REQUIRE_MENTION", None)
os.environ.pop("MATTERMOST_FREE_RESPONSE_CHANNELS", None)
await self.adapter._handle_ws_event(self._make_event("hello"))
assert not self.adapter.handle_message.called
@pytest.mark.asyncio
async def test_require_mention_false_responds_to_all(self):
"""MATTERMOST_REQUIRE_MENTION=false: respond to all channel messages."""
with patch.dict(os.environ, {"MATTERMOST_REQUIRE_MENTION": "false"}):
await self.adapter._handle_ws_event(self._make_event("hello"))
assert self.adapter.handle_message.called
@pytest.mark.asyncio
async def test_free_response_channel_responds_without_mention(self):
"""Messages in free-response channels don't need @mention."""
with patch.dict(os.environ, {"MATTERMOST_FREE_RESPONSE_CHANNELS": "chan_456,chan_789"}):
os.environ.pop("MATTERMOST_REQUIRE_MENTION", None)
await self.adapter._handle_ws_event(self._make_event("hello", channel_id="chan_456"))
assert self.adapter.handle_message.called
@pytest.mark.asyncio
async def test_non_free_channel_still_requires_mention(self):
"""Channels NOT in free-response list still require @mention."""
with patch.dict(os.environ, {"MATTERMOST_FREE_RESPONSE_CHANNELS": "chan_789"}):
os.environ.pop("MATTERMOST_REQUIRE_MENTION", None)
await self.adapter._handle_ws_event(self._make_event("hello", channel_id="chan_456"))
assert not self.adapter.handle_message.called
@pytest.mark.asyncio
async def test_dm_always_responds(self):
"""DMs (channel_type=D) always respond regardless of mention settings."""
with patch.dict(os.environ, {}, clear=False):
os.environ.pop("MATTERMOST_REQUIRE_MENTION", None)
await self.adapter._handle_ws_event(self._make_event("hello", channel_type="D"))
assert self.adapter.handle_message.called
@pytest.mark.asyncio
async def test_mention_stripped_from_text(self):
"""@mention is stripped from message text."""
with patch.dict(os.environ, {}, clear=False):
os.environ.pop("MATTERMOST_REQUIRE_MENTION", None)
await self.adapter._handle_ws_event(
self._make_event("@hermes-bot what is 2+2")
)
assert self.adapter.handle_message.called
msg = self.adapter.handle_message.call_args[0][0]
assert "@hermes-bot" not in msg.text
assert "2+2" in msg.text
# ---------------------------------------------------------------------------
# File upload (send_image)
# ---------------------------------------------------------------------------
+30 -102
View File
@@ -1,42 +1,11 @@
"""Tests for Signal messenger platform adapter."""
import base64
import json
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from urllib.parse import quote
from gateway.config import Platform, PlatformConfig
# ---------------------------------------------------------------------------
# Shared Helpers
# ---------------------------------------------------------------------------
def _make_signal_adapter(monkeypatch, account="+15551234567", **extra):
"""Create a SignalAdapter with sensible test defaults."""
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", extra.pop("group_allowed", ""))
from gateway.platforms.signal import SignalAdapter
config = PlatformConfig()
config.enabled = True
config.extra = {
"http_url": "http://localhost:8080",
"account": account,
**extra,
}
return SignalAdapter(config)
def _stub_rpc(return_value):
"""Return an async mock for SignalAdapter._rpc that captures call params."""
captured = []
async def mock_rpc(method, params, rpc_id=None):
captured.append({"method": method, "params": dict(params)})
return return_value
return mock_rpc, captured
# ---------------------------------------------------------------------------
# Platform & Config
# ---------------------------------------------------------------------------
@@ -92,22 +61,48 @@ class TestSignalConfigLoading:
# ---------------------------------------------------------------------------
class TestSignalAdapterInit:
def _make_config(self, **extra):
config = PlatformConfig()
config.enabled = True
config.extra = {
"http_url": "http://localhost:8080",
"account": "+15551234567",
**extra,
}
return config
def test_init_parses_config(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch, group_allowed="group123,group456")
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "group123,group456")
from gateway.platforms.signal import SignalAdapter
adapter = SignalAdapter(self._make_config())
assert adapter.http_url == "http://localhost:8080"
assert adapter.account == "+15551234567"
assert "group123" in adapter.group_allow_from
def test_init_empty_allowlist(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "")
from gateway.platforms.signal import SignalAdapter
adapter = SignalAdapter(self._make_config())
assert len(adapter.group_allow_from) == 0
def test_init_strips_trailing_slash(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch, http_url="http://localhost:8080/")
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "")
from gateway.platforms.signal import SignalAdapter
adapter = SignalAdapter(self._make_config(http_url="http://localhost:8080/"))
assert adapter.http_url == "http://localhost:8080"
def test_self_message_filtering(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
monkeypatch.setenv("SIGNAL_GROUP_ALLOWED_USERS", "")
from gateway.platforms.signal import SignalAdapter
adapter = SignalAdapter(self._make_config())
assert adapter._account_normalized == "+15551234567"
@@ -194,73 +189,6 @@ class TestSignalHelpers:
assert check_signal_requirements() is False
# ---------------------------------------------------------------------------
# SSE URL Encoding (Bug Fix: phone numbers with + must be URL-encoded)
# ---------------------------------------------------------------------------
class TestSignalSSEUrlEncoding:
"""Verify that phone numbers with + are URL-encoded in the SSE endpoint."""
def test_sse_url_encodes_plus_in_account(self):
"""The + in E.164 phone numbers must be percent-encoded in the SSE query string."""
encoded = quote("+31612345678", safe="")
assert encoded == "%2B31612345678"
def test_sse_url_encoding_preserves_digits(self):
"""Digits and country codes should pass through URL encoding unchanged."""
assert quote("+15551234567", safe="") == "%2B15551234567"
# ---------------------------------------------------------------------------
# Attachment Fetch (Bug Fix: parameter must be "id" not "attachmentId")
# ---------------------------------------------------------------------------
class TestSignalAttachmentFetch:
"""Verify that _fetch_attachment uses the correct RPC parameter name."""
@pytest.mark.asyncio
async def test_fetch_attachment_uses_id_parameter(self, monkeypatch):
"""RPC getAttachment must use 'id', not 'attachmentId' (signal-cli requirement)."""
adapter = _make_signal_adapter(monkeypatch)
png_data = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100
b64_data = base64.b64encode(png_data).decode()
adapter._rpc, captured = _stub_rpc({"data": b64_data})
with patch("gateway.platforms.signal.cache_image_from_bytes", return_value="/tmp/test.png"):
await adapter._fetch_attachment("attachment-123")
call = captured[0]
assert call["method"] == "getAttachment"
assert call["params"]["id"] == "attachment-123"
assert "attachmentId" not in call["params"], "Must NOT use 'attachmentId' — causes NullPointerException in signal-cli"
assert call["params"]["account"] == "+15551234567"
@pytest.mark.asyncio
async def test_fetch_attachment_returns_none_on_empty(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
adapter._rpc, _ = _stub_rpc(None)
path, ext = await adapter._fetch_attachment("missing-id")
assert path is None
assert ext == ""
@pytest.mark.asyncio
async def test_fetch_attachment_handles_dict_response(self, monkeypatch):
adapter = _make_signal_adapter(monkeypatch)
pdf_data = b"%PDF-1.4" + b"\x00" * 100
b64_data = base64.b64encode(pdf_data).decode()
adapter._rpc, _ = _stub_rpc({"data": b64_data})
with patch("gateway.platforms.signal.cache_document_from_bytes", return_value="/tmp/test.pdf"):
path, ext = await adapter._fetch_attachment("doc-456")
assert path == "/tmp/test.pdf"
assert ext == ".pdf"
# ---------------------------------------------------------------------------
# Session Source
# ---------------------------------------------------------------------------
-622
View File
@@ -1,622 +0,0 @@
"""Comprehensive tests for hermes_cli.profiles module.
Tests cover: validation, directory resolution, CRUD operations, active profile
management, export/import, renaming, alias collision checks, profile isolation,
and shell completion generation.
"""
import json
import os
import tarfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
from hermes_cli.profiles import (
validate_profile_name,
get_profile_dir,
create_profile,
delete_profile,
list_profiles,
set_active_profile,
get_active_profile,
get_active_profile_name,
resolve_profile_env,
check_alias_collision,
rename_profile,
export_profile,
import_profile,
generate_bash_completion,
generate_zsh_completion,
_get_profiles_root,
_get_default_hermes_home,
)
# ---------------------------------------------------------------------------
# Shared fixture: redirect Path.home() and HERMES_HOME for profile tests
# ---------------------------------------------------------------------------
@pytest.fixture()
def profile_env(tmp_path, monkeypatch):
"""Set up an isolated environment for profile tests.
* Path.home() -> tmp_path (so _get_profiles_root() = tmp_path/.hermes/profiles)
* HERMES_HOME -> tmp_path/.hermes (so get_hermes_home() agrees)
* Creates the bare-minimum ~/.hermes directory.
"""
monkeypatch.setattr(Path, "home", lambda: tmp_path)
default_home = tmp_path / ".hermes"
default_home.mkdir(exist_ok=True)
monkeypatch.setenv("HERMES_HOME", str(default_home))
return tmp_path
# ===================================================================
# TestValidateProfileName
# ===================================================================
class TestValidateProfileName:
"""Tests for validate_profile_name()."""
@pytest.mark.parametrize("name", ["coder", "work-bot", "a1", "my_agent"])
def test_valid_names_accepted(self, name):
# Should not raise
validate_profile_name(name)
@pytest.mark.parametrize("name", ["UPPER", "has space", ".hidden", "-leading"])
def test_invalid_names_rejected(self, name):
with pytest.raises(ValueError):
validate_profile_name(name)
def test_too_long_rejected(self):
long_name = "a" * 65
with pytest.raises(ValueError):
validate_profile_name(long_name)
def test_max_length_accepted(self):
# 64 chars total: 1 leading + 63 remaining = 64, within [0,63] range
name = "a" * 64
validate_profile_name(name)
def test_default_accepted(self):
# 'default' is a special-case pass-through
validate_profile_name("default")
def test_empty_string_rejected(self):
with pytest.raises(ValueError):
validate_profile_name("")
# ===================================================================
# TestGetProfileDir
# ===================================================================
class TestGetProfileDir:
"""Tests for get_profile_dir()."""
def test_default_returns_hermes_home(self, profile_env):
tmp_path = profile_env
result = get_profile_dir("default")
assert result == tmp_path / ".hermes"
def test_named_profile_returns_profiles_subdir(self, profile_env):
tmp_path = profile_env
result = get_profile_dir("coder")
assert result == tmp_path / ".hermes" / "profiles" / "coder"
# ===================================================================
# TestCreateProfile
# ===================================================================
class TestCreateProfile:
"""Tests for create_profile()."""
def test_creates_directory_with_subdirs(self, profile_env):
profile_dir = create_profile("coder", no_alias=True)
assert profile_dir.is_dir()
for subdir in ["memories", "sessions", "skills", "skins", "logs",
"plans", "workspace", "cron"]:
assert (profile_dir / subdir).is_dir(), f"Missing subdir: {subdir}"
def test_duplicate_raises_file_exists(self, profile_env):
create_profile("coder", no_alias=True)
with pytest.raises(FileExistsError):
create_profile("coder", no_alias=True)
def test_default_raises_value_error(self, profile_env):
with pytest.raises(ValueError, match="default"):
create_profile("default", no_alias=True)
def test_invalid_name_raises_value_error(self, profile_env):
with pytest.raises(ValueError):
create_profile("INVALID!", no_alias=True)
def test_clone_config_copies_files(self, profile_env):
tmp_path = profile_env
default_home = tmp_path / ".hermes"
# Create source config files in default profile
(default_home / "config.yaml").write_text("model: test")
(default_home / ".env").write_text("KEY=val")
(default_home / "SOUL.md").write_text("Be helpful.")
profile_dir = create_profile("coder", clone_config=True, no_alias=True)
assert (profile_dir / "config.yaml").read_text() == "model: test"
assert (profile_dir / ".env").read_text() == "KEY=val"
assert (profile_dir / "SOUL.md").read_text() == "Be helpful."
def test_clone_all_copies_entire_tree(self, profile_env):
tmp_path = profile_env
default_home = tmp_path / ".hermes"
# Populate default with some content
(default_home / "memories").mkdir(exist_ok=True)
(default_home / "memories" / "note.md").write_text("remember this")
(default_home / "config.yaml").write_text("model: gpt-4")
# Runtime files that should be stripped
(default_home / "gateway.pid").write_text("12345")
(default_home / "gateway_state.json").write_text("{}")
(default_home / "processes.json").write_text("[]")
profile_dir = create_profile("coder", clone_all=True, no_alias=True)
# Content should be copied
assert (profile_dir / "memories" / "note.md").read_text() == "remember this"
assert (profile_dir / "config.yaml").read_text() == "model: gpt-4"
# Runtime files should be stripped
assert not (profile_dir / "gateway.pid").exists()
assert not (profile_dir / "gateway_state.json").exists()
assert not (profile_dir / "processes.json").exists()
def test_clone_config_missing_files_skipped(self, profile_env):
"""Clone config gracefully skips files that don't exist in source."""
profile_dir = create_profile("coder", clone_config=True, no_alias=True)
# No error; optional files just not copied
assert not (profile_dir / "config.yaml").exists()
assert not (profile_dir / ".env").exists()
assert not (profile_dir / "SOUL.md").exists()
# ===================================================================
# TestDeleteProfile
# ===================================================================
class TestDeleteProfile:
"""Tests for delete_profile()."""
def test_removes_directory(self, profile_env):
profile_dir = create_profile("coder", no_alias=True)
assert profile_dir.is_dir()
# Mock gateway import to avoid real systemd/launchd interaction
with patch("hermes_cli.profiles._cleanup_gateway_service"):
delete_profile("coder", yes=True)
assert not profile_dir.is_dir()
def test_default_raises_value_error(self, profile_env):
with pytest.raises(ValueError, match="default"):
delete_profile("default", yes=True)
def test_nonexistent_raises_file_not_found(self, profile_env):
with pytest.raises(FileNotFoundError):
delete_profile("nonexistent", yes=True)
# ===================================================================
# TestListProfiles
# ===================================================================
class TestListProfiles:
"""Tests for list_profiles()."""
def test_returns_default_when_no_named_profiles(self, profile_env):
profiles = list_profiles()
names = [p.name for p in profiles]
assert "default" in names
def test_includes_named_profiles(self, profile_env):
create_profile("alpha", no_alias=True)
create_profile("beta", no_alias=True)
profiles = list_profiles()
names = [p.name for p in profiles]
assert "alpha" in names
assert "beta" in names
def test_sorted_alphabetically(self, profile_env):
create_profile("zebra", no_alias=True)
create_profile("alpha", no_alias=True)
create_profile("middle", no_alias=True)
profiles = list_profiles()
named = [p.name for p in profiles if not p.is_default]
assert named == sorted(named)
def test_default_is_first(self, profile_env):
create_profile("alpha", no_alias=True)
profiles = list_profiles()
assert profiles[0].name == "default"
assert profiles[0].is_default is True
# ===================================================================
# TestActiveProfile
# ===================================================================
class TestActiveProfile:
"""Tests for set_active_profile() / get_active_profile()."""
def test_set_and_get_roundtrip(self, profile_env):
create_profile("coder", no_alias=True)
set_active_profile("coder")
assert get_active_profile() == "coder"
def test_no_file_returns_default(self, profile_env):
assert get_active_profile() == "default"
def test_empty_file_returns_default(self, profile_env):
tmp_path = profile_env
active_path = tmp_path / ".hermes" / "active_profile"
active_path.write_text("")
assert get_active_profile() == "default"
def test_set_to_default_removes_file(self, profile_env):
tmp_path = profile_env
create_profile("coder", no_alias=True)
set_active_profile("coder")
active_path = tmp_path / ".hermes" / "active_profile"
assert active_path.exists()
set_active_profile("default")
assert not active_path.exists()
def test_set_nonexistent_raises(self, profile_env):
with pytest.raises(FileNotFoundError):
set_active_profile("nonexistent")
# ===================================================================
# TestGetActiveProfileName
# ===================================================================
class TestGetActiveProfileName:
"""Tests for get_active_profile_name()."""
def test_default_hermes_home_returns_default(self, profile_env):
# HERMES_HOME points to tmp_path/.hermes which is the default
assert get_active_profile_name() == "default"
def test_profile_path_returns_profile_name(self, profile_env, monkeypatch):
tmp_path = profile_env
create_profile("coder", no_alias=True)
profile_dir = tmp_path / ".hermes" / "profiles" / "coder"
monkeypatch.setenv("HERMES_HOME", str(profile_dir))
assert get_active_profile_name() == "coder"
def test_custom_path_returns_custom(self, profile_env, monkeypatch):
tmp_path = profile_env
custom = tmp_path / "some" / "other" / "path"
custom.mkdir(parents=True)
monkeypatch.setenv("HERMES_HOME", str(custom))
assert get_active_profile_name() == "custom"
# ===================================================================
# TestResolveProfileEnv
# ===================================================================
class TestResolveProfileEnv:
"""Tests for resolve_profile_env()."""
def test_existing_profile_returns_path(self, profile_env):
tmp_path = profile_env
create_profile("coder", no_alias=True)
result = resolve_profile_env("coder")
assert result == str(tmp_path / ".hermes" / "profiles" / "coder")
def test_default_returns_default_home(self, profile_env):
tmp_path = profile_env
result = resolve_profile_env("default")
assert result == str(tmp_path / ".hermes")
def test_nonexistent_raises_file_not_found(self, profile_env):
with pytest.raises(FileNotFoundError):
resolve_profile_env("nonexistent")
def test_invalid_name_raises_value_error(self, profile_env):
with pytest.raises(ValueError):
resolve_profile_env("INVALID!")
# ===================================================================
# TestAliasCollision
# ===================================================================
class TestAliasCollision:
"""Tests for check_alias_collision()."""
def test_normal_name_returns_none(self, profile_env):
# Mock 'which' to return not-found
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=1, stdout="")
result = check_alias_collision("mybot")
assert result is None
def test_reserved_name_returns_message(self, profile_env):
result = check_alias_collision("hermes")
assert result is not None
assert "reserved" in result.lower()
def test_subcommand_returns_message(self, profile_env):
result = check_alias_collision("chat")
assert result is not None
assert "subcommand" in result.lower()
def test_default_is_reserved(self, profile_env):
result = check_alias_collision("default")
assert result is not None
assert "reserved" in result.lower()
# ===================================================================
# TestRenameProfile
# ===================================================================
class TestRenameProfile:
"""Tests for rename_profile()."""
def test_renames_directory(self, profile_env):
tmp_path = profile_env
create_profile("oldname", no_alias=True)
old_dir = tmp_path / ".hermes" / "profiles" / "oldname"
assert old_dir.is_dir()
# Mock alias collision to avoid subprocess calls
with patch("hermes_cli.profiles.check_alias_collision", return_value="skip"):
new_dir = rename_profile("oldname", "newname")
assert not old_dir.is_dir()
assert new_dir.is_dir()
assert new_dir == tmp_path / ".hermes" / "profiles" / "newname"
def test_default_raises_value_error(self, profile_env):
with pytest.raises(ValueError, match="default"):
rename_profile("default", "newname")
def test_rename_to_default_raises_value_error(self, profile_env):
create_profile("coder", no_alias=True)
with pytest.raises(ValueError, match="default"):
rename_profile("coder", "default")
def test_nonexistent_raises_file_not_found(self, profile_env):
with pytest.raises(FileNotFoundError):
rename_profile("nonexistent", "newname")
def test_target_exists_raises_file_exists(self, profile_env):
create_profile("alpha", no_alias=True)
create_profile("beta", no_alias=True)
with pytest.raises(FileExistsError):
rename_profile("alpha", "beta")
# ===================================================================
# TestExportImport
# ===================================================================
class TestExportImport:
"""Tests for export_profile() / import_profile()."""
def test_export_creates_tar_gz(self, profile_env, tmp_path):
create_profile("coder", no_alias=True)
# Put a marker file so we can verify content
profile_dir = get_profile_dir("coder")
(profile_dir / "marker.txt").write_text("hello")
output = tmp_path / "export" / "coder.tar.gz"
output.parent.mkdir(parents=True, exist_ok=True)
result = export_profile("coder", str(output))
assert Path(result).exists()
assert tarfile.is_tarfile(str(result))
def test_import_restores_from_archive(self, profile_env, tmp_path):
# Create and export a profile
create_profile("coder", no_alias=True)
profile_dir = get_profile_dir("coder")
(profile_dir / "marker.txt").write_text("hello")
archive_path = tmp_path / "export" / "coder.tar.gz"
archive_path.parent.mkdir(parents=True, exist_ok=True)
export_profile("coder", str(archive_path))
# Delete the profile, then import it back under a new name
import shutil
shutil.rmtree(profile_dir)
assert not profile_dir.is_dir()
imported = import_profile(str(archive_path), name="coder")
assert imported.is_dir()
assert (imported / "marker.txt").read_text() == "hello"
def test_import_to_existing_name_raises(self, profile_env, tmp_path):
create_profile("coder", no_alias=True)
profile_dir = get_profile_dir("coder")
archive_path = tmp_path / "export" / "coder.tar.gz"
archive_path.parent.mkdir(parents=True, exist_ok=True)
export_profile("coder", str(archive_path))
# Importing to same existing name should fail
with pytest.raises(FileExistsError):
import_profile(str(archive_path), name="coder")
def test_export_nonexistent_raises(self, profile_env, tmp_path):
with pytest.raises(FileNotFoundError):
export_profile("nonexistent", str(tmp_path / "out.tar.gz"))
# ===================================================================
# TestProfileIsolation
# ===================================================================
class TestProfileIsolation:
"""Verify that two profiles have completely separate paths."""
def test_separate_config_paths(self, profile_env):
create_profile("alpha", no_alias=True)
create_profile("beta", no_alias=True)
alpha_dir = get_profile_dir("alpha")
beta_dir = get_profile_dir("beta")
assert alpha_dir / "config.yaml" != beta_dir / "config.yaml"
assert str(alpha_dir) not in str(beta_dir)
def test_separate_state_db_paths(self, profile_env):
alpha_dir = get_profile_dir("alpha")
beta_dir = get_profile_dir("beta")
assert alpha_dir / "state.db" != beta_dir / "state.db"
def test_separate_skills_paths(self, profile_env):
create_profile("alpha", no_alias=True)
create_profile("beta", no_alias=True)
alpha_dir = get_profile_dir("alpha")
beta_dir = get_profile_dir("beta")
assert alpha_dir / "skills" != beta_dir / "skills"
# Verify both exist and are independent dirs
assert (alpha_dir / "skills").is_dir()
assert (beta_dir / "skills").is_dir()
# ===================================================================
# TestCompletion
# ===================================================================
class TestCompletion:
"""Tests for bash/zsh completion generators."""
def test_bash_completion_contains_complete(self):
script = generate_bash_completion()
assert len(script) > 0
assert "complete" in script
def test_zsh_completion_contains_compdef(self):
script = generate_zsh_completion()
assert len(script) > 0
assert "compdef" in script
def test_bash_completion_has_hermes_profiles_function(self):
script = generate_bash_completion()
assert "_hermes_profiles" in script
def test_zsh_completion_has_hermes_function(self):
script = generate_zsh_completion()
assert "_hermes" in script
# ===================================================================
# TestGetProfilesRoot / TestGetDefaultHermesHome (internal helpers)
# ===================================================================
class TestInternalHelpers:
"""Tests for _get_profiles_root() and _get_default_hermes_home()."""
def test_profiles_root_under_home(self, profile_env):
tmp_path = profile_env
root = _get_profiles_root()
assert root == tmp_path / ".hermes" / "profiles"
def test_default_hermes_home(self, profile_env):
tmp_path = profile_env
home = _get_default_hermes_home()
assert home == tmp_path / ".hermes"
# ===================================================================
# Edge cases and additional coverage
# ===================================================================
class TestEdgeCases:
"""Additional edge-case tests."""
def test_create_profile_returns_correct_path(self, profile_env):
tmp_path = profile_env
result = create_profile("mybot", no_alias=True)
expected = tmp_path / ".hermes" / "profiles" / "mybot"
assert result == expected
def test_list_profiles_default_info_fields(self, profile_env):
profiles = list_profiles()
default = [p for p in profiles if p.name == "default"][0]
assert default.is_default is True
assert default.gateway_running is False
assert default.skill_count == 0
def test_gateway_running_check_with_pid_file(self, profile_env):
"""Verify _check_gateway_running reads pid file and probes os.kill."""
from hermes_cli.profiles import _check_gateway_running
tmp_path = profile_env
default_home = tmp_path / ".hermes"
# No pid file -> not running
assert _check_gateway_running(default_home) is False
# Write a PID file with a JSON payload
pid_file = default_home / "gateway.pid"
pid_file.write_text(json.dumps({"pid": 99999}))
# os.kill(99999, 0) should raise ProcessLookupError -> not running
assert _check_gateway_running(default_home) is False
# Mock os.kill to simulate a running process
with patch("os.kill", return_value=None):
assert _check_gateway_running(default_home) is True
def test_gateway_running_check_plain_pid(self, profile_env):
"""Pid file containing just a number (legacy format)."""
from hermes_cli.profiles import _check_gateway_running
tmp_path = profile_env
default_home = tmp_path / ".hermes"
pid_file = default_home / "gateway.pid"
pid_file.write_text("99999")
with patch("os.kill", return_value=None):
assert _check_gateway_running(default_home) is True
def test_profile_name_boundary_single_char(self):
"""Single alphanumeric character is valid."""
validate_profile_name("a")
validate_profile_name("1")
def test_profile_name_boundary_all_hyphens(self):
"""Name starting with hyphen is invalid."""
with pytest.raises(ValueError):
validate_profile_name("-abc")
def test_profile_name_underscore_start(self):
"""Name starting with underscore is invalid (must start with [a-z0-9])."""
with pytest.raises(ValueError):
validate_profile_name("_abc")
def test_clone_from_named_profile(self, profile_env):
"""Clone config from a named (non-default) profile."""
tmp_path = profile_env
# Create source profile with config
source_dir = create_profile("source", no_alias=True)
(source_dir / "config.yaml").write_text("model: cloned")
(source_dir / ".env").write_text("SECRET=yes")
target_dir = create_profile(
"target", clone_from="source", clone_config=True, no_alias=True,
)
assert (target_dir / "config.yaml").read_text() == "model: cloned"
assert (target_dir / ".env").read_text() == "SECRET=yes"
def test_delete_clears_active_profile(self, profile_env):
"""Deleting the active profile resets active to default."""
tmp_path = profile_env
create_profile("coder", no_alias=True)
set_active_profile("coder")
assert get_active_profile() == "coder"
with patch("hermes_cli.profiles._cleanup_gateway_service"):
delete_profile("coder", yes=True)
assert get_active_profile() == "default"
-158
View File
@@ -1,158 +0,0 @@
"""Tests for credential file passthrough registry (tools/credential_files.py)."""
import os
from pathlib import Path
import pytest
from tools.credential_files import (
clear_credential_files,
get_credential_file_mounts,
register_credential_file,
register_credential_files,
reset_config_cache,
)
@pytest.fixture(autouse=True)
def _clean_registry():
"""Reset registry between tests."""
clear_credential_files()
reset_config_cache()
yield
clear_credential_files()
reset_config_cache()
class TestRegisterCredentialFile:
def test_registers_existing_file(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "token.json").write_text('{"token": "abc"}')
result = register_credential_file("token.json")
assert result is True
mounts = get_credential_file_mounts()
assert len(mounts) == 1
assert mounts[0]["host_path"] == str(tmp_path / "token.json")
assert mounts[0]["container_path"] == "/root/.hermes/token.json"
def test_skips_missing_file(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
result = register_credential_file("nonexistent.json")
assert result is False
assert get_credential_file_mounts() == []
def test_custom_container_base(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "cred.json").write_text("{}")
register_credential_file("cred.json", container_base="/home/user/.hermes")
mounts = get_credential_file_mounts()
assert mounts[0]["container_path"] == "/home/user/.hermes/cred.json"
def test_deduplicates_by_container_path(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "token.json").write_text("{}")
register_credential_file("token.json")
register_credential_file("token.json")
mounts = get_credential_file_mounts()
assert len(mounts) == 1
class TestRegisterCredentialFiles:
def test_string_entries(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "a.json").write_text("{}")
(tmp_path / "b.json").write_text("{}")
missing = register_credential_files(["a.json", "b.json"])
assert missing == []
assert len(get_credential_file_mounts()) == 2
def test_dict_entries(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "token.json").write_text("{}")
missing = register_credential_files([
{"path": "token.json", "description": "OAuth token"},
])
assert missing == []
assert len(get_credential_file_mounts()) == 1
def test_returns_missing_files(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "exists.json").write_text("{}")
missing = register_credential_files([
"exists.json",
"missing.json",
{"path": "also_missing.json"},
])
assert missing == ["missing.json", "also_missing.json"]
assert len(get_credential_file_mounts()) == 1
def test_empty_list(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
assert register_credential_files([]) == []
class TestConfigCredentialFiles:
def test_loads_from_config(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "oauth.json").write_text("{}")
(tmp_path / "config.yaml").write_text(
"terminal:\n credential_files:\n - oauth.json\n"
)
mounts = get_credential_file_mounts()
assert len(mounts) == 1
assert mounts[0]["host_path"] == str(tmp_path / "oauth.json")
def test_config_skips_missing_files(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "config.yaml").write_text(
"terminal:\n credential_files:\n - nonexistent.json\n"
)
mounts = get_credential_file_mounts()
assert mounts == []
def test_combines_skill_and_config(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
(tmp_path / "skill_token.json").write_text("{}")
(tmp_path / "config_token.json").write_text("{}")
(tmp_path / "config.yaml").write_text(
"terminal:\n credential_files:\n - config_token.json\n"
)
register_credential_file("skill_token.json")
mounts = get_credential_file_mounts()
assert len(mounts) == 2
paths = {m["container_path"] for m in mounts}
assert "/root/.hermes/skill_token.json" in paths
assert "/root/.hermes/config_token.json" in paths
class TestGetMountsRechecksExistence:
def test_removed_file_excluded_from_mounts(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
token = tmp_path / "token.json"
token.write_text("{}")
register_credential_file("token.json")
assert len(get_credential_file_mounts()) == 1
# Delete the file after registration
token.unlink()
assert get_credential_file_mounts() == []
+1 -6
View File
@@ -96,7 +96,6 @@ class TestGetProviderFallbackPriority:
monkeypatch.setenv("GROQ_API_KEY", "gsk-test")
monkeypatch.setenv("VOICE_TOOLS_OPENAI_KEY", "sk-test")
with patch("tools.transcription_tools._HAS_FASTER_WHISPER", False), \
patch("tools.transcription_tools._has_local_command", return_value=False), \
patch("tools.transcription_tools._HAS_OPENAI", True):
from tools.transcription_tools import _get_provider
assert _get_provider({}) == "groq"
@@ -131,10 +130,9 @@ class TestExplicitProviderRespected:
def test_explicit_local_no_fallback_to_openai(self, monkeypatch):
"""GH-1774: provider=local must not silently fall back to openai
even when an OpenAI API key is set."""
monkeypatch.setenv("OPENAI_API_KEY", "***")
monkeypatch.setenv("OPENAI_API_KEY", "sk-real-key-here")
monkeypatch.delenv("GROQ_API_KEY", raising=False)
with patch("tools.transcription_tools._HAS_FASTER_WHISPER", False), \
patch("tools.transcription_tools._has_local_command", return_value=False), \
patch("tools.transcription_tools._HAS_OPENAI", True):
from tools.transcription_tools import _get_provider
result = _get_provider({"provider": "local"})
@@ -143,7 +141,6 @@ class TestExplicitProviderRespected:
def test_explicit_local_no_fallback_to_groq(self, monkeypatch):
monkeypatch.setenv("GROQ_API_KEY", "gsk-test")
with patch("tools.transcription_tools._HAS_FASTER_WHISPER", False), \
patch("tools.transcription_tools._has_local_command", return_value=False), \
patch("tools.transcription_tools._HAS_OPENAI", True):
from tools.transcription_tools import _get_provider
result = _get_provider({"provider": "local"})
@@ -184,7 +181,6 @@ class TestExplicitProviderRespected:
monkeypatch.setenv("OPENAI_API_KEY", "sk-real-key")
monkeypatch.delenv("GROQ_API_KEY", raising=False)
with patch("tools.transcription_tools._HAS_FASTER_WHISPER", False), \
patch("tools.transcription_tools._has_local_command", return_value=False), \
patch("tools.transcription_tools._HAS_OPENAI", True):
from tools.transcription_tools import _get_provider
# Empty dict = no explicit provider, uses DEFAULT_PROVIDER auto-detect
@@ -195,7 +191,6 @@ class TestExplicitProviderRespected:
monkeypatch.setenv("GROQ_API_KEY", "gsk-test")
monkeypatch.setenv("OPENAI_API_KEY", "sk-real-key")
with patch("tools.transcription_tools._HAS_FASTER_WHISPER", False), \
patch("tools.transcription_tools._has_local_command", return_value=False), \
patch("tools.transcription_tools._HAS_OPENAI", True):
from tools.transcription_tools import _get_provider
result = _get_provider({})
-163
View File
@@ -1,163 +0,0 @@
"""Credential file passthrough registry for remote terminal backends.
Skills that declare ``required_credential_files`` in their frontmatter need
those files available inside sandboxed execution environments (Modal, Docker).
By default remote backends create bare containers with no host files.
This module provides a session-scoped registry so skill-declared credential
files (and user-configured overrides) are mounted into remote sandboxes.
Two sources feed the registry:
1. **Skill declarations** when a skill is loaded via ``skill_view``, its
``required_credential_files`` entries are registered here if the files
exist on the host.
2. **User config** ``terminal.credential_files`` in config.yaml lets users
explicitly list additional files to mount.
Remote backends (``tools/environments/modal.py``, ``docker.py``) call
:func:`get_credential_file_mounts` at sandbox creation time.
Each registered entry is a dict::
{
"host_path": "/home/user/.hermes/google_token.json",
"container_path": "/root/.hermes/google_token.json",
}
"""
from __future__ import annotations
import logging
import os
from pathlib import Path
from typing import Dict, List
logger = logging.getLogger(__name__)
# Session-scoped list of credential files to mount.
# Key: container_path (deduplicated), Value: host_path
_registered_files: Dict[str, str] = {}
# Cache for config-based file list (loaded once per process).
_config_files: List[Dict[str, str]] | None = None
def _resolve_hermes_home() -> Path:
return Path(os.environ.get("HERMES_HOME", Path.home() / ".hermes"))
def register_credential_file(
relative_path: str,
container_base: str = "/root/.hermes",
) -> bool:
"""Register a credential file for mounting into remote sandboxes.
*relative_path* is relative to ``HERMES_HOME`` (e.g. ``google_token.json``).
Returns True if the file exists on the host and was registered.
"""
hermes_home = _resolve_hermes_home()
host_path = hermes_home / relative_path
if not host_path.is_file():
logger.debug("credential_files: skipping %s (not found)", host_path)
return False
container_path = f"{container_base.rstrip('/')}/{relative_path}"
_registered_files[container_path] = str(host_path)
logger.debug("credential_files: registered %s -> %s", host_path, container_path)
return True
def register_credential_files(
entries: list,
container_base: str = "/root/.hermes",
) -> List[str]:
"""Register multiple credential files from skill frontmatter entries.
Each entry is either a string (relative path) or a dict with a ``path``
key. Returns the list of relative paths that were NOT found on the host
(i.e. missing files).
"""
missing = []
for entry in entries:
if isinstance(entry, str):
rel_path = entry.strip()
elif isinstance(entry, dict):
rel_path = (entry.get("path") or "").strip()
else:
continue
if not rel_path:
continue
if not register_credential_file(rel_path, container_base):
missing.append(rel_path)
return missing
def _load_config_files() -> List[Dict[str, str]]:
"""Load ``terminal.credential_files`` from config.yaml (cached)."""
global _config_files
if _config_files is not None:
return _config_files
result: List[Dict[str, str]] = []
try:
hermes_home = _resolve_hermes_home()
config_path = hermes_home / "config.yaml"
if config_path.exists():
import yaml
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
cred_files = cfg.get("terminal", {}).get("credential_files")
if isinstance(cred_files, list):
for item in cred_files:
if isinstance(item, str) and item.strip():
host_path = hermes_home / item.strip()
if host_path.is_file():
container_path = f"/root/.hermes/{item.strip()}"
result.append({
"host_path": str(host_path),
"container_path": container_path,
})
except Exception as e:
logger.debug("Could not read terminal.credential_files from config: %s", e)
_config_files = result
return _config_files
def get_credential_file_mounts() -> List[Dict[str, str]]:
"""Return all credential files that should be mounted into remote sandboxes.
Each item has ``host_path`` and ``container_path`` keys.
Combines skill-registered files and user config.
"""
mounts: Dict[str, str] = {}
# Skill-registered files
for container_path, host_path in _registered_files.items():
# Re-check existence (file may have been deleted since registration)
if Path(host_path).is_file():
mounts[container_path] = host_path
# Config-based files
for entry in _load_config_files():
cp = entry["container_path"]
if cp not in mounts and Path(entry["host_path"]).is_file():
mounts[cp] = entry["host_path"]
return [
{"host_path": hp, "container_path": cp}
for cp, hp in mounts.items()
]
def clear_credential_files() -> None:
"""Reset the skill-scoped registry (e.g. on session reset)."""
_registered_files.clear()
def reset_config_cache() -> None:
"""Force re-read of config on next access (for testing)."""
global _config_files
_config_files = None
+2 -29
View File
@@ -312,24 +312,6 @@ class DockerEnvironment(BaseEnvironment):
elif workspace_explicitly_mounted:
logger.debug("Skipping docker cwd mount: /workspace already mounted by user config")
# Mount credential files (OAuth tokens, etc.) declared by skills.
# Read-only so the container can authenticate but not modify host creds.
try:
from tools.credential_files import get_credential_file_mounts
for mount_entry in get_credential_file_mounts():
volume_args.extend([
"-v",
f"{mount_entry['host_path']}:{mount_entry['container_path']}:ro",
])
logger.info(
"Docker: mounting credential %s -> %s",
mount_entry["host_path"],
mount_entry["container_path"],
)
except Exception as e:
logger.debug("Docker: could not load credential file mounts: %s", e)
logger.info(f"Docker volume_args: {volume_args}")
all_run_args = list(_SECURITY_ARGS) + writable_args + resource_args + volume_args
logger.info(f"Docker run_args: {all_run_args}")
@@ -424,17 +406,8 @@ class DockerEnvironment(BaseEnvironment):
if effective_stdin is not None:
cmd.append("-i")
cmd.extend(["-w", work_dir])
# Combine explicit docker_forward_env with skill-declared env_passthrough
# vars so skills that declare required_environment_variables (e.g. Notion)
# have their keys forwarded into the container automatically.
forward_keys = set(self._forward_env)
try:
from tools.env_passthrough import get_all_passthrough
forward_keys |= get_all_passthrough()
except Exception:
pass
hermes_env = _load_hermes_env_vars() if forward_keys else {}
for key in sorted(forward_keys):
hermes_env = _load_hermes_env_vars() if self._forward_env else {}
for key in self._forward_env:
value = os.getenv(key)
if value is None:
value = hermes_env.get(key)
+2 -91
View File
@@ -137,28 +137,6 @@ class ModalEnvironment(BaseEnvironment):
],
)
# Mount credential files (OAuth tokens, etc.) declared by skills.
# These are read-only copies so the sandbox can authenticate with
# external services but can't modify the host's credentials.
cred_mounts = []
try:
from tools.credential_files import get_credential_file_mounts
for mount_entry in get_credential_file_mounts():
cred_mounts.append(
_modal.Mount.from_local_file(
mount_entry["host_path"],
remote_path=mount_entry["container_path"],
)
)
logger.info(
"Modal: mounting credential %s -> %s",
mount_entry["host_path"],
mount_entry["container_path"],
)
except Exception as e:
logger.debug("Modal: could not load credential file mounts: %s", e)
# Start the async worker thread and create sandbox on it
# so all gRPC channels are bound to the worker's event loop.
self._worker.start()
@@ -167,90 +145,23 @@ class ModalEnvironment(BaseEnvironment):
app = await _modal.App.lookup.aio(
"hermes-agent", create_if_missing=True
)
create_kwargs = dict(sandbox_kwargs)
if cred_mounts:
existing_mounts = list(create_kwargs.pop("mounts", []))
existing_mounts.extend(cred_mounts)
create_kwargs["mounts"] = existing_mounts
sandbox = await _modal.Sandbox.create.aio(
"sleep", "infinity",
image=effective_image,
app=app,
timeout=int(create_kwargs.pop("timeout", 3600)),
**create_kwargs,
timeout=int(sandbox_kwargs.pop("timeout", 3600)),
**sandbox_kwargs,
)
return app, sandbox
self._app, self._sandbox = self._worker.run_coroutine(
_create_sandbox(), timeout=300
)
# Track synced credential files to avoid redundant pushes.
# Key: container_path, Value: (mtime, size) of last synced version.
self._synced_creds: Dict[str, tuple] = {}
logger.info("Modal: sandbox created (task=%s)", self._task_id)
def _sync_credential_files(self) -> None:
"""Push credential files into the running sandbox.
Mounts are set at sandbox creation, but credentials may be created
later (e.g. OAuth setup mid-session). This writes the current file
content into the sandbox via exec(), so new/updated credentials are
available without recreating the sandbox.
"""
try:
from tools.credential_files import get_credential_file_mounts
mounts = get_credential_file_mounts()
if not mounts:
return
for entry in mounts:
host_path = entry["host_path"]
container_path = entry["container_path"]
hp = Path(host_path)
try:
stat = hp.stat()
file_key = (stat.st_mtime, stat.st_size)
except OSError:
continue
# Skip if already synced with same mtime+size
if self._synced_creds.get(container_path) == file_key:
continue
try:
content = hp.read_text(encoding="utf-8")
except Exception:
continue
# Write via base64 to avoid shell escaping issues with JSON
import base64
b64 = base64.b64encode(content.encode("utf-8")).decode("ascii")
container_dir = str(Path(container_path).parent)
cmd = (
f"mkdir -p {shlex.quote(container_dir)} && "
f"echo {shlex.quote(b64)} | base64 -d > {shlex.quote(container_path)}"
)
_cp = container_path # capture for closure
async def _write():
proc = await self._sandbox.exec.aio("bash", "-c", cmd)
await proc.wait.aio()
self._worker.run_coroutine(_write(), timeout=15)
self._synced_creds[container_path] = file_key
logger.debug("Modal: synced credential %s -> %s", host_path, container_path)
except Exception as e:
logger.debug("Modal: credential file sync failed: %s", e)
def execute(self, command: str, cwd: str = "", *,
timeout: int | None = None,
stdin_data: str | None = None) -> dict:
# Sync credential files before each command so mid-session
# OAuth setups are picked up without requiring a restart.
self._sync_credential_files()
if stdin_data is not None:
marker = f"HERMES_EOF_{uuid.uuid4().hex[:8]}"
while marker in stdin_data:
+67 -131
View File
@@ -494,7 +494,7 @@ def _is_skill_disabled(name: str, platform: str = None) -> bool:
def _find_all_skills(*, skip_disabled: bool = False) -> List[Dict[str, Any]]:
"""Recursively find all skills in ~/.hermes/skills/ and external dirs.
"""Recursively find all skills in ~/.hermes/skills/.
Args:
skip_disabled: If True, return ALL skills regardless of disabled
@@ -504,69 +504,60 @@ def _find_all_skills(*, skip_disabled: bool = False) -> List[Dict[str, Any]]:
Returns:
List of skill metadata dicts (name, description, category).
"""
from agent.skill_utils import get_external_skills_dirs
skills = []
seen_names: set = set()
if not SKILLS_DIR.exists():
return skills
# Load disabled set once (not per-skill)
disabled = set() if skip_disabled else _get_disabled_skill_names()
# Scan local dir first, then external dirs (local takes precedence)
dirs_to_scan = []
if SKILLS_DIR.exists():
dirs_to_scan.append(SKILLS_DIR)
dirs_to_scan.extend(get_external_skills_dirs())
for scan_dir in dirs_to_scan:
for skill_md in scan_dir.rglob("SKILL.md"):
if any(part in _EXCLUDED_SKILL_DIRS for part in skill_md.parts):
for skill_md in SKILLS_DIR.rglob("SKILL.md"):
if any(part in _EXCLUDED_SKILL_DIRS for part in skill_md.parts):
continue
skill_dir = skill_md.parent
try:
content = skill_md.read_text(encoding="utf-8")[:4000]
frontmatter, body = _parse_frontmatter(content)
if not skill_matches_platform(frontmatter):
continue
skill_dir = skill_md.parent
try:
content = skill_md.read_text(encoding="utf-8")[:4000]
frontmatter, body = _parse_frontmatter(content)
if not skill_matches_platform(frontmatter):
continue
name = frontmatter.get("name", skill_dir.name)[:MAX_NAME_LENGTH]
if name in seen_names:
continue
if name in disabled:
continue
description = frontmatter.get("description", "")
if not description:
for line in body.strip().split("\n"):
line = line.strip()
if line and not line.startswith("#"):
description = line
break
if len(description) > MAX_DESCRIPTION_LENGTH:
description = description[:MAX_DESCRIPTION_LENGTH - 3] + "..."
category = _get_category_from_path(skill_md)
seen_names.add(name)
skills.append({
"name": name,
"description": description,
"category": category,
})
except (UnicodeDecodeError, PermissionError) as e:
logger.debug("Failed to read skill file %s: %s", skill_md, e)
continue
except Exception as e:
logger.debug(
"Skipping skill at %s: failed to parse: %s", skill_md, e, exc_info=True
)
name = frontmatter.get("name", skill_dir.name)[:MAX_NAME_LENGTH]
if name in disabled:
continue
description = frontmatter.get("description", "")
if not description:
for line in body.strip().split("\n"):
line = line.strip()
if line and not line.startswith("#"):
description = line
break
if len(description) > MAX_DESCRIPTION_LENGTH:
description = description[:MAX_DESCRIPTION_LENGTH - 3] + "..."
category = _get_category_from_path(skill_md)
skills.append({
"name": name,
"description": description,
"category": category,
})
except (UnicodeDecodeError, PermissionError) as e:
logger.debug("Failed to read skill file %s: %s", skill_md, e)
continue
except Exception as e:
logger.debug(
"Skipping skill at %s: failed to parse: %s", skill_md, e, exc_info=True
)
continue
return skills
@@ -765,15 +756,7 @@ def skill_view(name: str, file_path: str = None, task_id: str = None) -> str:
JSON string with skill content or error message
"""
try:
from agent.skill_utils import get_external_skills_dirs
# Build list of all skill directories to search
all_dirs = []
if SKILLS_DIR.exists():
all_dirs.append(SKILLS_DIR)
all_dirs.extend(get_external_skills_dirs())
if not all_dirs:
if not SKILLS_DIR.exists():
return json.dumps(
{
"success": False,
@@ -785,37 +768,27 @@ def skill_view(name: str, file_path: str = None, task_id: str = None) -> str:
skill_dir = None
skill_md = None
# Search all dirs: local first, then external (first match wins)
for search_dir in all_dirs:
# Try direct path first (e.g., "mlops/axolotl")
direct_path = search_dir / name
if direct_path.is_dir() and (direct_path / "SKILL.md").exists():
skill_dir = direct_path
skill_md = direct_path / "SKILL.md"
break
elif direct_path.with_suffix(".md").exists():
skill_md = direct_path.with_suffix(".md")
break
# Try direct path first (e.g., "mlops/axolotl")
direct_path = SKILLS_DIR / name
if direct_path.is_dir() and (direct_path / "SKILL.md").exists():
skill_dir = direct_path
skill_md = direct_path / "SKILL.md"
elif direct_path.with_suffix(".md").exists():
skill_md = direct_path.with_suffix(".md")
# Search by directory name across all dirs
# Search by directory name
if not skill_md:
for search_dir in all_dirs:
for found_skill_md in search_dir.rglob("SKILL.md"):
if found_skill_md.parent.name == name:
skill_dir = found_skill_md.parent
skill_md = found_skill_md
break
if skill_md:
for found_skill_md in SKILLS_DIR.rglob("SKILL.md"):
if found_skill_md.parent.name == name:
skill_dir = found_skill_md.parent
skill_md = found_skill_md
break
# Legacy: flat .md files
if not skill_md:
for search_dir in all_dirs:
for found_md in search_dir.rglob(f"{name}.md"):
if found_md.name != "SKILL.md":
skill_md = found_md
break
if skill_md:
for found_md in SKILLS_DIR.rglob(f"{name}.md"):
if found_md.name != "SKILL.md":
skill_md = found_md
break
if not skill_md or not skill_md.exists():
@@ -842,21 +815,12 @@ def skill_view(name: str, file_path: str = None, task_id: str = None) -> str:
ensure_ascii=False,
)
# Security: warn if skill is loaded from outside trusted directories
# (local skills dir + configured external_dirs are all trusted)
_outside_skills_dir = True
_trusted_dirs = [SKILLS_DIR.resolve()]
# Security: warn if skill is loaded from outside the trusted skills directory
try:
_trusted_dirs.extend(d.resolve() for d in all_dirs[1:])
except Exception:
pass
for _td in _trusted_dirs:
try:
skill_md.resolve().relative_to(_td)
_outside_skills_dir = False
break
except ValueError:
continue
skill_md.resolve().relative_to(SKILLS_DIR.resolve())
_outside_skills_dir = False
except ValueError:
_outside_skills_dir = True
# Security: detect common prompt injection patterns
_INJECTION_PATTERNS = [
@@ -1094,11 +1058,7 @@ def skill_view(name: str, file_path: str = None, task_id: str = None) -> str:
if script_files:
linked_files["scripts"] = script_files
try:
rel_path = str(skill_md.relative_to(SKILLS_DIR))
except ValueError:
# External skill — use path relative to the skill's own parent dir
rel_path = str(skill_md.relative_to(skill_md.parent.parent)) if skill_md.parent.parent else skill_md.name
rel_path = str(skill_md.relative_to(SKILLS_DIR))
skill_name = frontmatter.get(
"name", skill_md.stem if not skill_dir else skill_dir.name
)
@@ -1146,27 +1106,6 @@ def skill_view(name: str, file_path: str = None, task_id: str = None) -> str:
exc_info=True,
)
# Register credential files for mounting into remote sandboxes
# (Modal, Docker). Files that exist on the host are registered;
# missing ones are added to the setup_needed indicators.
required_cred_files_raw = frontmatter.get("required_credential_files", [])
if not isinstance(required_cred_files_raw, list):
required_cred_files_raw = []
missing_cred_files: list = []
if required_cred_files_raw:
try:
from tools.credential_files import register_credential_files
missing_cred_files = register_credential_files(required_cred_files_raw)
if missing_cred_files:
setup_needed = True
except Exception:
logger.debug(
"Could not register credential files for skill %s",
skill_name,
exc_info=True,
)
result = {
"success": True,
"name": skill_name,
@@ -1182,7 +1121,6 @@ def skill_view(name: str, file_path: str = None, task_id: str = None) -> str:
"required_environment_variables": required_env_vars,
"required_commands": [],
"missing_required_environment_variables": remaining_missing_required_envs,
"missing_credential_files": missing_cred_files,
"missing_required_commands": [],
"setup_needed": setup_needed,
"setup_skipped": capture_result["setup_skipped"],
@@ -1201,8 +1139,6 @@ def skill_view(name: str, file_path: str = None, task_id: str = None) -> str:
if setup_needed:
missing_items = [
f"env ${env_name}" for env_name in remaining_missing_required_envs
] + [
f"file {path}" for path in missing_cred_files
]
setup_note = _build_setup_note(
SkillReadinessStatus.SETUP_NEEDED,
+3 -4
View File
@@ -48,7 +48,6 @@ logger = logging.getLogger(__name__)
# long-running subprocesses immediately instead of blocking until timeout.
# ---------------------------------------------------------------------------
from tools.interrupt import is_interrupted, _interrupt_event # noqa: F401 — re-exported
from hermes_constants import display_hermes_home
# =============================================================================
@@ -158,7 +157,7 @@ def _handle_sudo_failure(output: str, env_type: str) -> str:
for failure in sudo_failures:
if failure in output:
return output + f"\n\n💡 Tip: To enable sudo over messaging, add SUDO_PASSWORD to {display_hermes_home()}/.env on the agent machine."
return output + "\n\n💡 Tip: To enable sudo over messaging, add SUDO_PASSWORD to ~/.hermes/.env on the agent machine."
return output
@@ -444,7 +443,7 @@ def _parse_env_var(name: str, default: str, converter=int, type_label: str = "in
except (ValueError, json.JSONDecodeError):
raise ValueError(
f"Invalid value for {name}: {raw!r} (expected {type_label}). "
f"Check {display_hermes_home()}/.env or environment variables."
f"Check ~/.hermes/.env or environment variables."
)
@@ -1284,7 +1283,7 @@ if __name__ == "__main__":
print(f" TERMINAL_MODAL_IMAGE: {os.getenv('TERMINAL_MODAL_IMAGE', default_img)}")
print(f" TERMINAL_DAYTONA_IMAGE: {os.getenv('TERMINAL_DAYTONA_IMAGE', default_img)}")
print(f" TERMINAL_CWD: {os.getenv('TERMINAL_CWD', os.getcwd())}")
print(f" TERMINAL_SANDBOX_DIR: {os.getenv('TERMINAL_SANDBOX_DIR', f'{display_hermes_home()}/sandboxes')}")
print(f" TERMINAL_SANDBOX_DIR: {os.getenv('TERMINAL_SANDBOX_DIR', '~/.hermes/sandboxes')}")
print(f" TERMINAL_TIMEOUT: {os.getenv('TERMINAL_TIMEOUT', '60')}")
print(f" TERMINAL_LIFETIME_SECONDS: {os.getenv('TERMINAL_LIFETIME_SECONDS', '300')}")
+2 -2
View File
@@ -33,7 +33,7 @@ import subprocess
import tempfile
import threading
from pathlib import Path
from hermes_constants import get_hermes_home, display_hermes_home
from hermes_constants import get_hermes_home
from typing import Callable, Dict, Any, Optional
logger = logging.getLogger(__name__)
@@ -832,7 +832,7 @@ TTS_SCHEMA = {
},
"output_path": {
"type": "string",
"description": f"Optional custom file path to save the audio. Defaults to {display_hermes_home()}/cache/audio/<timestamp>.mp3"
"description": "Optional custom file path to save the audio. Defaults to ~/.hermes/cache/audio/<timestamp>.mp3"
}
},
"required": ["text"]
@@ -90,7 +90,6 @@ pytest tests/ -v
- **Comments**: Only when explaining non-obvious intent, trade-offs, or API quirks
- **Error handling**: Catch specific exceptions. Use `logger.warning()`/`logger.error()` with `exc_info=True` for unexpected errors
- **Cross-platform**: Never assume Unix (see below)
- **Profile-safe paths**: Never hardcode `~/.hermes` — use `get_hermes_home()` from `hermes_constants` for code paths and `display_hermes_home()` for user-facing messages. See [AGENTS.md](https://github.com/NousResearch/hermes-agent/blob/main/AGENTS.md#profiles-multi-instance-support) for full rules.
## Cross-Platform Compatibility
@@ -168,38 +168,11 @@ required_environment_variables:
The user can skip setup and keep loading the skill. Hermes never exposes the raw secret value to the model. Gateway and messaging sessions show local setup guidance instead of collecting secrets in-band.
:::tip Sandbox Passthrough
When your skill is loaded, any declared `required_environment_variables` that are set are **automatically passed through** to `execute_code` and `terminal` sandboxes — including remote backends like Docker and Modal. Your skill's scripts can access `$TENOR_API_KEY` (or `os.environ["TENOR_API_KEY"]` in Python) without the user needing to configure anything extra. See [Environment Variable Passthrough](/docs/user-guide/security#environment-variable-passthrough) for details.
When your skill is loaded, any declared `required_environment_variables` that are set are **automatically passed through** to `execute_code` and `terminal` sandboxes. Your skill's scripts can access `$TENOR_API_KEY` (or `os.environ["TENOR_API_KEY"]` in Python) without the user needing to configure anything extra. See [Environment Variable Passthrough](/docs/user-guide/security#environment-variable-passthrough) for details.
:::
Legacy `prerequisites.env_vars` remains supported as a backward-compatible alias.
### Credential File Requirements (OAuth tokens, etc.)
Skills that use OAuth or file-based credentials can declare files that need to be mounted into remote sandboxes. This is for credentials stored as **files** (not env vars) — typically OAuth token files produced by a setup script.
```yaml
required_credential_files:
- path: google_token.json
description: Google OAuth2 token (created by setup script)
- path: google_client_secret.json
description: Google OAuth2 client credentials
```
Each entry supports:
- `path` (required) — file path relative to `~/.hermes/`
- `description` (optional) — explains what the file is and how it's created
When loaded, Hermes checks if these files exist. Missing files trigger `setup_needed`. Existing files are automatically:
- **Mounted into Docker** containers as read-only bind mounts
- **Synced into Modal** sandboxes (at creation + before each command, so mid-session OAuth works)
- Available on **local** backend without any special handling
:::tip When to use which
Use `required_environment_variables` for simple API keys and tokens (strings stored in `~/.hermes/.env`). Use `required_credential_files` for OAuth token files, client secrets, service account JSON, certificates, or any credential that's a file on disk.
:::
See the `skills/productivity/google-workspace/SKILL.md` for a complete example using both.
## Skill Guidelines
### No External Dependencies
@@ -105,7 +105,7 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
|----------|-------------|
| `TERMINAL_ENV` | Backend: `local`, `docker`, `ssh`, `singularity`, `modal`, `daytona` |
| `TERMINAL_DOCKER_IMAGE` | Docker image (default: `python:3.11`) |
| `TERMINAL_DOCKER_FORWARD_ENV` | JSON array of env var names to explicitly forward into Docker terminal sessions. Note: skill-declared `required_environment_variables` are forwarded automatically — you only need this for vars not declared by any skill. |
| `TERMINAL_DOCKER_FORWARD_ENV` | JSON array of env var names to explicitly forward into Docker terminal sessions |
| `TERMINAL_DOCKER_VOLUMES` | Additional Docker volume mounts (comma-separated `host:container` pairs) |
| `TERMINAL_DOCKER_MOUNT_CWD_TO_WORKSPACE` | Advanced opt-in: mount the launch cwd into Docker `/workspace` (`true`/`false`, default: `false`) |
| `TERMINAL_SINGULARITY_IMAGE` | Singularity image or `.sif` path |
@@ -200,8 +200,6 @@ For native Anthropic auth, Hermes prefers Claude Code's own credential files whe
| `MATTERMOST_TOKEN` | Bot token or personal access token for Mattermost |
| `MATTERMOST_ALLOWED_USERS` | Comma-separated Mattermost user IDs allowed to message the bot |
| `MATTERMOST_HOME_CHANNEL` | Channel ID for proactive message delivery (cron, notifications) |
| `MATTERMOST_REQUIRE_MENTION` | Require `@mention` in channels (default: `true`). Set to `false` to respond to all messages. |
| `MATTERMOST_FREE_RESPONSE_CHANNELS` | Comma-separated channel IDs where bot responds without `@mention` |
| `MATTERMOST_REPLY_MODE` | Reply style: `thread` (threaded replies) or `off` (flat messages, default) |
| `MATRIX_HOMESERVER` | Matrix homeserver URL (e.g. `https://matrix.org`) |
| `MATRIX_ACCESS_TOKEN` | Matrix access token for bot authentication |
-38
View File
@@ -489,44 +489,6 @@ If an MCP server crashes mid-request, Hermes will report a timeout. Check the se
---
## Profiles
### How do profiles differ from just setting HERMES_HOME?
Profiles are a managed layer on top of `HERMES_HOME`. You *could* manually set `HERMES_HOME=/some/path` before every command, but profiles handle all the plumbing for you: creating the directory structure, generating shell aliases (`hermes-work`), tracking the active profile in `~/.hermes/active_profile`, and syncing skill updates across all profiles automatically. They also integrate with tab completion so you don't have to remember paths.
### Can two profiles share the same bot token?
No. Each messaging platform (Telegram, Discord, etc.) requires exclusive access to a bot token. If two profiles try to use the same token simultaneously, the second gateway will fail to connect. Create a separate bot per profile — for Telegram, talk to [@BotFather](https://t.me/BotFather) to make additional bots.
### Do profiles share memory or sessions?
No. Each profile has its own memory store, session database, and skills directory. They are completely isolated. If you want to start a new profile with existing memories and sessions, use `hermes profile create newname --clone-all` to copy everything from the current profile.
### What happens when I run `hermes update`?
`hermes update` pulls the latest code and reinstalls dependencies **once** (not per-profile). It then syncs updated skills to all profiles automatically. You only need to run `hermes update` once — it covers every profile on the machine.
### Can I move a profile to a different machine?
Yes. Export the profile to a portable archive and import it on the other machine:
```bash
# On the source machine
hermes profile export work ./work-backup.tar.gz
# Copy the file to the target machine, then:
hermes profile import ./work-backup.tar.gz work
```
The imported profile will have all config, memories, sessions, and skills from the export. You may need to update paths or re-authenticate with providers if the new machine has a different setup.
### How many profiles can I run?
There is no hard limit. Each profile is just a directory under `~/.hermes/profiles/`. The practical limit depends on your disk space and how many concurrent gateways your system can handle (each gateway is a lightweight Python process). Running dozens of profiles is fine; each idle profile uses no resources.
---
## Still Stuck?
If your issue isn't covered here:
-280
View File
@@ -1,280 +0,0 @@
---
sidebar_position: 7
---
# Profile Commands Reference
This page covers all commands related to [Hermes profiles](../user-guide/profiles.md). For general CLI commands, see [CLI Commands Reference](./cli-commands.md).
## `hermes profile`
```bash
hermes profile <subcommand>
```
Top-level command for managing profiles. Running `hermes profile` without a subcommand shows help.
| Subcommand | Description |
|------------|-------------|
| `list` | List all profiles. |
| `use` | Set the active (default) profile. |
| `create` | Create a new profile. |
| `delete` | Delete a profile. |
| `show` | Show details about a profile. |
| `alias` | Regenerate the shell alias for a profile. |
| `rename` | Rename a profile. |
| `export` | Export a profile to a tar.gz archive. |
| `import` | Import a profile from a tar.gz archive. |
## `hermes profile list`
```bash
hermes profile list
```
Lists all profiles. The currently active profile is marked with `*`.
**Example:**
```bash
$ hermes profile list
default
* work
dev
personal
```
No options.
## `hermes profile use`
```bash
hermes profile use <name>
```
Sets `<name>` as the active profile. All subsequent `hermes` commands (without `-p`) will use this profile.
| Argument | Description |
|----------|-------------|
| `<name>` | Profile name to activate. Use `default` to return to the base profile. |
**Example:**
```bash
hermes profile use work
hermes profile use default
```
## `hermes profile create`
```bash
hermes profile create <name> [options]
```
Creates a new profile.
| Argument / Option | Description |
|-------------------|-------------|
| `<name>` | Name for the new profile. Must be a valid directory name (alphanumeric, hyphens, underscores). |
| `--clone` | Copy `config.yaml`, `.env`, and `SOUL.md` from the current profile. |
| `--clone-all` | Copy everything (config, memories, skills, sessions, state) from the current profile. |
| `--from <profile>` | Clone from a specific profile instead of the current one. Used with `--clone` or `--clone-all`. |
**Examples:**
```bash
# Blank profile — needs full setup
hermes profile create mybot
# Clone config only from current profile
hermes profile create work --clone
# Clone everything from current profile
hermes profile create backup --clone-all
# Clone config from a specific profile
hermes profile create work2 --clone --from work
```
## `hermes profile delete`
```bash
hermes profile delete <name> [options]
```
Deletes a profile and removes its shell alias.
| Argument / Option | Description |
|-------------------|-------------|
| `<name>` | Profile to delete. |
| `--yes`, `-y` | Skip confirmation prompt. |
**Example:**
```bash
hermes profile delete mybot
hermes profile delete mybot --yes
```
:::warning
This permanently deletes the profile's entire directory including all config, memories, sessions, and skills. Cannot delete the currently active profile.
:::
## `hermes profile show`
```bash
hermes profile show [name]
```
Displays details about a profile including its home directory, configured model, active platforms, and disk usage.
| Argument | Description |
|----------|-------------|
| `[name]` | Profile to inspect. Defaults to the current active profile if omitted. |
**Example:**
```bash
$ hermes profile show work
Profile: work
Home: ~/.hermes/profiles/work
Model: anthropic/claude-sonnet-4
Platforms: telegram, discord
Skills: 12 installed
Disk: 48 MB
```
## `hermes profile alias`
```bash
hermes profile alias <name>
```
Regenerates the shell alias script at `~/.local/bin/hermes-<name>`. Useful if the alias was accidentally deleted or if you need to update it after moving your Hermes installation.
| Argument | Description |
|----------|-------------|
| `<name>` | Profile to create/update the alias for. |
**Example:**
```bash
hermes profile alias work
# Creates/updates ~/.local/bin/hermes-work
```
## `hermes profile rename`
```bash
hermes profile rename <old-name> <new-name>
```
Renames a profile. Updates the directory and shell alias.
| Argument | Description |
|----------|-------------|
| `<old-name>` | Current profile name. |
| `<new-name>` | New profile name. |
**Example:**
```bash
hermes profile rename mybot assistant
# ~/.hermes/profiles/mybot → ~/.hermes/profiles/assistant
# ~/.local/bin/hermes-mybot → ~/.local/bin/hermes-assistant
```
## `hermes profile export`
```bash
hermes profile export <name> <output-path>
```
Exports a profile as a compressed tar.gz archive.
| Argument | Description |
|----------|-------------|
| `<name>` | Profile to export. |
| `<output-path>` | Path for the output archive (e.g., `./work-backup.tar.gz`). |
**Example:**
```bash
hermes profile export work ./work-2026-03-29.tar.gz
```
## `hermes profile import`
```bash
hermes profile import <archive-path> [name]
```
Imports a profile from a tar.gz archive.
| Argument | Description |
|----------|-------------|
| `<archive-path>` | Path to the tar.gz archive to import. |
| `[name]` | Name for the imported profile. Defaults to the original profile name from the archive. |
**Example:**
```bash
hermes profile import ./work-2026-03-29.tar.gz work-restored
```
## `hermes -p` / `hermes --profile`
```bash
hermes -p <name> <command> [options]
hermes --profile <name> <command> [options]
```
Global flag to run any Hermes command under a specific profile without changing the sticky default. This overrides the active profile for the duration of the command.
| Option | Description |
|--------|-------------|
| `-p <name>`, `--profile <name>` | Profile to use for this command. |
**Examples:**
```bash
hermes -p work chat -q "Check the server status"
hermes --profile dev gateway start
hermes -p personal skills list
hermes -p work config edit
```
## `hermes completion`
```bash
hermes completion <shell>
```
Generates shell completion scripts. Includes completions for profile names and profile subcommands.
| Argument | Description |
|----------|-------------|
| `<shell>` | Shell to generate completions for: `bash`, `zsh`, or `fish`. |
**Examples:**
```bash
# Install completions
hermes completion bash >> ~/.bashrc
hermes completion zsh >> ~/.zshrc
hermes completion fish > ~/.config/fish/completions/hermes.fish
# Reload shell
source ~/.bashrc
```
After installation, tab completion works for:
- `hermes profile <TAB>` — subcommands (list, use, create, etc.)
- `hermes profile use <TAB>` — profile names
- `hermes -p <TAB>` — profile names
## See also
- [Profiles User Guide](../user-guide/profiles.md)
- [CLI Commands Reference](./cli-commands.md)
- [FAQ — Profiles section](./faq.md#profiles)
+2 -1
View File
@@ -253,7 +253,8 @@ Skills for academic research, paper discovery, literature review, domain reconna
| `arxiv` | Search and retrieve academic papers from arXiv using their free REST API. No API key needed. Search by keyword, author, category, or ID. Combine with web_extract or the ocr-and-documents skill to read full paper content. | `research/arxiv` |
| `blogwatcher` | Monitor blogs and RSS/Atom feeds for updates using the blogwatcher CLI. Add blogs, scan for new articles, and track what you've read. | `research/blogwatcher` |
| `domain-intel` | Passive domain reconnaissance using Python stdlib. Subdomain discovery, SSL certificate inspection, WHOIS lookups, DNS records, domain availability checks, and bulk multi-domain analysis. No API keys required. | `research/domain-intel` |
| `duckduckgo-search` | Free web search via DuckDuckGo — text, news, images, videos. No API key needed. Prefer the `ddgs` CLI when installed; use the Python DDGS library only after verifying that `ddgs` is available in the current runtime. | `research/duckduckgo-search` |
| `duckduckgo-search` | Free web search via DuckDuckGo — text, news, images, videos. No API key needed. Use the Python DDGS library or CLI to search, then web_extract for full content. | `research/duckduckgo-search` |
| `parallel-cli` | Optional vendor skill for Parallel CLI — agent-native web search, extraction, deep research, enrichment, FindAll, and monitoring. | `research/parallel-cli` |
| `ml-paper-writing` | Write publication-ready ML/AI papers for NeurIPS, ICML, ICLR, ACL, AAAI, COLM. Use when drafting papers from research repos, structuring arguments, verifying citations, or preparing camera-ready submissions. Includes LaTeX templates, reviewer guidelines, and citation verificatio… | `research/ml-paper-writing` |
| `polymarket` | Query Polymarket prediction market data — search markets, get prices, orderbooks, and price history. Read-only via public REST APIs, no API key needed. | `research/polymarket` |
+1 -44
View File
@@ -8,9 +8,7 @@ description: "On-demand knowledge documents — progressive disclosure, agent-ma
Skills are on-demand knowledge documents the agent can load when needed. They follow a **progressive disclosure** pattern to minimize token usage and are compatible with the [agentskills.io](https://agentskills.io/specification) open standard.
All skills live in **`~/.hermes/skills/`** — the primary directory and source of truth. On fresh install, bundled skills are copied from the repo. Hub-installed and agent-created skills also go here. The agent can modify or delete any skill.
You can also point Hermes at **external skill directories** — additional folders scanned alongside the local one. See [External Skill Directories](#external-skill-directories) below.
All skills live in **`~/.hermes/skills/`** — a single directory that serves as the source of truth. On fresh install, bundled skills are copied from the repo. Hub-installed and agent-created skills also go here. The agent can modify or delete any skill.
See also:
@@ -166,47 +164,6 @@ Once set, declared env vars are **automatically passed through** to `execute_cod
└── .bundled_manifest # Tracks seeded bundled skills
```
## External Skill Directories
If you maintain skills outside of Hermes — for example, a shared `~/.agents/skills/` directory used by multiple AI tools — you can tell Hermes to scan those directories too.
Add `external_dirs` under the `skills` section in `~/.hermes/config.yaml`:
```yaml
skills:
external_dirs:
- ~/.agents/skills
- /home/shared/team-skills
- ${SKILLS_REPO}/skills
```
Paths support `~` expansion and `${VAR}` environment variable substitution.
### How it works
- **Read-only**: External dirs are only scanned for skill discovery. When the agent creates or edits a skill, it always writes to `~/.hermes/skills/`.
- **Local precedence**: If the same skill name exists in both the local dir and an external dir, the local version wins.
- **Full integration**: External skills appear in the system prompt index, `skills_list`, `skill_view`, and as `/skill-name` slash commands — no different from local skills.
- **Non-existent paths are silently skipped**: If a configured directory doesn't exist, Hermes ignores it without errors. Useful for optional shared directories that may not be present on every machine.
### Example
```text
~/.hermes/skills/ # Local (primary, read-write)
├── devops/deploy-k8s/
│ └── SKILL.md
└── mlops/axolotl/
└── SKILL.md
~/.agents/skills/ # External (read-only, shared)
├── my-custom-workflow/
│ └── SKILL.md
└── team-conventions/
└── SKILL.md
```
All four skills appear in your skill index. If you create a new skill called `my-custom-workflow` locally, it shadows the external version.
## Agent-Managed Skills (skill_manage tool)
The agent can create, update, and delete its own skills via the `skill_manage` tool. This is the agent's **procedural memory** — when it figures out a non-trivial workflow, it saves the approach as a skill for future reuse.
@@ -149,12 +149,6 @@ MATTERMOST_ALLOWED_USERS=3uo8dkh1p7g1mfk49ear5fzs5c
# Optional: reply mode (thread or off, default: off)
# MATTERMOST_REPLY_MODE=thread
# Optional: respond without @mention (default: true = require mention)
# MATTERMOST_REQUIRE_MENTION=false
# Optional: channels where bot responds without @mention (comma-separated channel IDs)
# MATTERMOST_FREE_RESPONSE_CHANNELS=channel_id_1,channel_id_2
```
Optional behavior settings in `~/.hermes/config.yaml`:
@@ -212,19 +206,6 @@ Set it in your `~/.hermes/.env`:
MATTERMOST_REPLY_MODE=thread
```
## Mention Behavior
By default, the bot only responds in channels when `@mentioned`. You can change this:
| Variable | Default | Description |
|----------|---------|-------------|
| `MATTERMOST_REQUIRE_MENTION` | `true` | Set to `false` to respond to all messages in channels (DMs always work). |
| `MATTERMOST_FREE_RESPONSE_CHANNELS` | _(none)_ | Comma-separated channel IDs where the bot responds without `@mention`, even when require_mention is true. |
To find a channel ID in Mattermost: open the channel, click the channel name header, and look for the ID in the URL or channel details.
When the bot is `@mentioned`, the mention is automatically stripped from the message before processing.
## Troubleshooting
### Bot is not responding to messages
-244
View File
@@ -1,244 +0,0 @@
---
sidebar_position: 2
---
# Profiles: Running Multiple Agents
Run multiple independent Hermes agents on the same machine — each with its own config, memory, sessions, and gateway.
## What are profiles?
A profile is a fully isolated Hermes environment. Each profile gets its own `HERMES_HOME` directory containing its own `config.yaml`, `.env`, `SOUL.md`, memories, sessions, skills, and state database. Profiles let you run separate agents for different purposes — a personal assistant, a work bot, a dev agent — without any cross-contamination.
Each profile also gets a shell alias (e.g., `hermes-work`) so you can launch it directly without flags.
## Quick start
```bash
# Create a profile called "work"
hermes profile create work
# Switch to it as the default
hermes profile use work
# Launch — now everything uses the "work" environment
hermes
```
That's it. From now on, `hermes` uses the "work" profile until you switch back.
## Creating a profile
### Blank profile
```bash
hermes profile create mybot
```
Creates a fresh, empty profile. You'll need to run `hermes setup` (or `hermes-mybot setup`) to configure it from scratch — provider, model, gateway tokens, etc.
### Clone config only (`--clone`)
```bash
hermes profile create work --clone
```
Copies your current profile's `config.yaml`, `.env`, and `SOUL.md` into the new profile. This gives you the same provider/model setup without copying memories, sessions, or skills. Useful when you want a second agent with the same API keys but different personality or gateway tokens.
### Clone everything (`--clone-all`)
```bash
hermes profile create backup --clone-all
```
Copies **everything** — config, memories, sessions, skills, state database, the lot. This is a full snapshot of your current profile. Useful for creating a backup or forking an agent that already has learned context.
## Using profiles
### Shell aliases
Every profile gets an alias installed to `~/.local/bin/`:
```bash
hermes-work # Runs hermes with the "work" profile
hermes-mybot # Runs hermes with the "mybot" profile
hermes-backup # Runs hermes with the "backup" profile
```
These aliases work with all subcommands:
```bash
hermes-work chat -q "Check my calendar"
hermes-work gateway start
hermes-work skills list
```
### Sticky default (`hermes profile use`)
```bash
hermes profile use work
```
Sets "work" as the active profile. Now plain `hermes` uses the work profile — no alias or flag needed. The active profile is stored in `~/.hermes/active_profile`.
Switch back to the default profile:
```bash
hermes profile use default
```
### One-off with `-p` flag
```bash
hermes -p work chat -q "Summarize my inbox"
hermes -p mybot gateway status
```
The `-p` / `--profile` flag overrides the sticky default for a single command without changing it.
## Running gateways
Each profile runs its own independent gateway. This means you can have multiple bots online simultaneously — for example, a personal Telegram bot and a team Discord bot:
```bash
hermes-personal gateway start # Starts personal bot's gateway
hermes-work gateway start # Starts work bot's gateway
```
Each gateway uses the tokens and platform config from its own profile's `config.yaml` and `.env`. There are no port or token conflicts because each profile is fully isolated.
:::warning
Each bot token (Telegram, Discord, etc.) can only be used by **one** profile at a time. If two profiles try to use the same token, the second gateway will fail to connect. Use a separate bot token per profile.
:::
## Configuring profiles
Each profile has its own independent configuration files:
```
~/.hermes/profiles/work/
├── config.yaml # Model, provider, gateway settings
├── .env # API keys, bot tokens
├── SOUL.md # Personality / system prompt
├── skills/ # Installed skills
├── memories/ # Agent memories
├── state.db # Sessions, conversation history
└── logs/ # Gateway and agent logs
```
Edit a profile's config directly:
```bash
hermes-work config edit # Opens work profile's config.yaml
hermes -p work setup # Run setup wizard for work profile
```
Or edit the files manually:
```bash
nano ~/.hermes/profiles/work/config.yaml
nano ~/.hermes/profiles/work/.env
nano ~/.hermes/profiles/work/SOUL.md
```
The default profile lives at `~/.hermes/` (not in the `profiles/` subdirectory).
## Updating
```bash
hermes update
```
`hermes update` pulls the latest code and reinstalls dependencies once. It then syncs the updated skills to **all** profiles automatically. You don't need to run update separately for each profile — one update covers everything.
## Managing profiles
### List profiles
```bash
hermes profile list
```
Shows all profiles with their status. The active profile is marked with an asterisk:
```
default
* work
mybot
backup
```
### Show profile details
```bash
hermes profile show work
```
Displays the profile's home directory, config path, active model, configured platforms, and other details.
### Rename a profile
```bash
hermes profile rename mybot assistant
```
Renames the profile directory and updates the shell alias from `hermes-mybot` to `hermes-assistant`.
### Export a profile
```bash
hermes profile export work ./work-backup.tar.gz
```
Packages the entire profile into a portable archive. Useful for backups or transferring to another machine.
### Import a profile
```bash
hermes profile import ./work-backup.tar.gz work-restored
```
Imports a previously exported profile archive as a new profile.
## Deleting a profile
```bash
hermes profile delete mybot
```
Removes the profile directory and its shell alias. You'll be prompted to confirm. This permanently deletes all config, memories, sessions, and skills for that profile.
:::warning
Deletion is irreversible. Export the profile first if you might need it later: `hermes profile export mybot ./mybot-backup.tar.gz`
:::
You cannot delete the currently active profile. Switch to a different one first:
```bash
hermes profile use default
hermes profile delete mybot
```
## Tab completion
Enable shell completions for profile names and subcommands:
```bash
# Generate completions for your shell
hermes completion bash >> ~/.bashrc
hermes completion zsh >> ~/.zshrc
hermes completion fish > ~/.config/fish/completions/hermes.fish
# Reload your shell
source ~/.bashrc # or ~/.zshrc
```
After setup, `hermes profile <TAB>` autocompletes subcommands and `hermes -p <TAB>` autocompletes profile names.
## How it works
Under the hood, each profile is just a separate `HERMES_HOME` directory. When you run `hermes -p work` or `hermes-work`, Hermes sets `HERMES_HOME=~/.hermes/profiles/work` before starting. Everything — config loading, memory access, session storage, gateway operation — reads from and writes to that directory.
The sticky default (`hermes profile use`) writes the profile name to `~/.hermes/active_profile`. On startup, if no `-p` flag is given, Hermes checks this file and sets `HERMES_HOME` accordingly.
Profile aliases in `~/.local/bin/` are thin wrapper scripts that set `HERMES_HOME` and exec the real `hermes` binary. This means profiles work with all existing Hermes commands, flags, and features without any special handling.
+1 -37
View File
@@ -278,11 +278,7 @@ required_environment_variables:
help: Get a key from https://developers.google.com/tenor
```
After loading this skill, `TENOR_API_KEY` passes through to `execute_code`, `terminal` (local), **and remote backends (Docker, Modal)** — no manual configuration needed.
:::info Docker & Modal
Prior to v0.5.1, Docker's `forward_env` was a separate system from the skill passthrough. They are now merged — skill-declared env vars are automatically forwarded into Docker containers and Modal sandboxes without needing to add them to `docker_forward_env` manually.
:::
After loading this skill, `TENOR_API_KEY` passes through to both `execute_code` and `terminal` subprocesses — no manual configuration needed.
**2. Config-based passthrough (manual)**
@@ -295,49 +291,17 @@ terminal:
- ANOTHER_TOKEN
```
### Credential File Passthrough (OAuth tokens, etc.) {#credential-file-passthrough}
Some skills need **files** (not just env vars) in the sandbox — for example, Google Workspace stores OAuth tokens as `google_token.json` in `~/.hermes/`. Skills declare these in frontmatter:
```yaml
required_credential_files:
- path: google_token.json
description: Google OAuth2 token (created by setup script)
- path: google_client_secret.json
description: Google OAuth2 client credentials
```
When loaded, Hermes checks if these files exist in `~/.hermes/` and registers them for mounting:
- **Docker**: Read-only bind mounts (`-v host:container:ro`)
- **Modal**: Mounted at sandbox creation + synced before each command (handles mid-session OAuth setup)
- **Local**: No action needed (files already accessible)
You can also list credential files manually in `config.yaml`:
```yaml
terminal:
credential_files:
- google_token.json
- my_custom_oauth_token.json
```
Paths are relative to `~/.hermes/`. Files are mounted to `/root/.hermes/` inside the container.
### What Each Sandbox Filters
| Sandbox | Default Filter | Passthrough Override |
|---------|---------------|---------------------|
| **execute_code** | Blocks vars containing `KEY`, `TOKEN`, `SECRET`, `PASSWORD`, `CREDENTIAL`, `PASSWD`, `AUTH` in name; only allows safe-prefix vars through | ✅ Passthrough vars bypass both checks |
| **terminal** (local) | Blocks explicit Hermes infrastructure vars (provider keys, gateway tokens, tool API keys) | ✅ Passthrough vars bypass the blocklist |
| **terminal** (Docker) | No host env vars by default | ✅ Passthrough vars + `docker_forward_env` forwarded via `-e` |
| **terminal** (Modal) | No host env/files by default | ✅ Credential files mounted; env passthrough via sync |
| **MCP** | Blocks everything except safe system vars + explicitly configured `env` | ❌ Not affected by passthrough (use MCP `env` config instead) |
### Security Considerations
- The passthrough only affects vars you or your skills explicitly declare — the default security posture is unchanged for arbitrary LLM-generated code
- Credential files are mounted **read-only** into Docker containers
- Skills Guard scans skill content for suspicious env access patterns before installation
- Missing/unset vars are never registered (you can't leak what doesn't exist)
- Hermes infrastructure secrets (provider API keys, gateway tokens) should never be added to `env_passthrough` — they have dedicated mechanisms
-3
View File
@@ -37,8 +37,6 @@ const sidebars: SidebarsConfig = {
'user-guide/configuration',
'user-guide/sessions',
'user-guide/security',
'user-guide/docker',
'user-guide/profiles',
{
type: 'category',
label: 'Messaging Gateway',
@@ -154,7 +152,6 @@ const sidebars: SidebarsConfig = {
'reference/mcp-config-reference',
'reference/skills-catalog',
'reference/optional-skills-catalog',
'reference/profile-commands',
'reference/environment-variables',
'reference/faq',
],