Compare commits

..

15 Commits

Author SHA1 Message Date
teknium1
fea3a5bdcf feat: unify hermes tools and hermes setup tools into single flow
Both 'hermes tools' and 'hermes setup tools' now use the same unified
flow in tools_config.py:

1. Select platform (CLI, Telegram, Discord, etc.)
2. Toggle all 18 toolsets on/off in checklist
3. Newly enabled tools that need API keys → provider-aware config
   (e.g., TTS shows Edge/OpenAI/ElevenLabs picker)
4. Already-configured tools that stay enabled → silent, no prompts
5. Menu option: 'Reconfigure an existing tool' for updating
   providers or API keys on tools that are already set up

Key changes:
- Move TOOL_CATEGORIES, provider config, and post-setup hooks from
  setup.py to tools_config.py
- Replace flat _check_and_prompt_requirements() with provider-aware
  _configure_toolset() that uses TOOL_CATEGORIES
- Add _reconfigure_tool() flow for updating existing configs
- setup.py's setup_tools() now delegates to tools_command()
- tools_command() menu adds 'Reconfigure' option alongside platforms
- Only prompt for API keys on tools that are NEWLY toggled on AND
  don't already have keys configured

No breaking changes. All 2013 tests pass.
2026-03-06 18:11:35 -08:00
teknium1
93dd869eab fix: remove ANSI codes and em dashes from menu labels
simple_term_menu miscalculates string widths when labels contain
ANSI escape codes (from color()) or em dashes, causing duplicated
and garbled lines on arrow key navigation.

Replace color() status indicators with plain text [configured]/[active]
and em dashes with regular dashes in all prompt_choice/prompt_checklist
labels.
2026-03-06 17:55:44 -08:00
teknium1
50ee4aa672 feat: modular setup wizard with section subcommands and tool-first UX
Restructure the monolithic hermes setup wizard into independently-runnable
sections with a category-first tool configuration experience.

Changes:
- Break setup into 5 sections: model, terminal, gateway, tools, agent
- Each section is a standalone function, runnable individually via
  'hermes setup model', 'hermes setup terminal', etc.
- Returning users get a menu: Quick Setup / Full Setup / individual sections
- First-time users get a guided walkthrough of all sections

Tool Configuration UX overhaul:
- Replace flat API key checklist with category-first approach
- Show tool types (TTS, Web Search, Image Gen, etc.) as top-level items
- Within each category, let users pick a provider:
  - TTS: Microsoft Edge (Free), OpenAI, ElevenLabs
  - Web: Firecrawl Cloud, Firecrawl Self-Hosted
  - Image Gen: FAL.ai
  - Browser: Browserbase
  - Smart Home: Home Assistant
  - RL Training: Tinker/Atropos
  - GitHub: Personal Access Token
- Shows configured status on each tool and provider
- Only prompts for API keys after provider selection

Also:
- Add section argument to setup argparse parser in main.py
- Update summary to show new section commands
- Add self-hosted Firecrawl and Home Assistant to tool setup
- All 2013 tests pass
2026-03-06 17:46:31 -08:00
teknium1
f75b1d21b4 fix: execute_code and delegate_task now respect disabled toolsets
When a user disables the web toolset via 'hermes tools', the execute_code
schema description still hardcoded web_search/web_extract as available,
causing the model to keep trying to use them. Similarly, delegate_task
always defaulted to ['terminal', 'file', 'web'] for subagents regardless
of the parent's config.

Changes:
- execute_code schema is now built dynamically via build_execute_code_schema()
  based on which sandbox tools are actually enabled
- model_tools.py rebuilds the execute_code schema at definition time using
  the intersection of sandbox-allowed and session-enabled tools
- delegate_task now inherits the parent agent's enabled_toolsets instead of
  hardcoding DEFAULT_TOOLSETS when no explicit toolsets are specified
- delegate_task description updated to say 'inherits your enabled toolsets'

Reported by kotyKD on Discord.
2026-03-06 17:36:14 -08:00
teknium1
94053d75a6 fix: custom endpoint no longer leaks OPENROUTER_API_KEY (#560)
API key selection is now base_url-aware: when the resolved base_url
targets OpenRouter, OPENROUTER_API_KEY takes priority (preserving the
#289 fix). When hitting any other endpoint (Z.ai, vLLM, custom, etc.),
OPENAI_API_KEY takes priority so the OpenRouter key doesn't leak.

Applied in both the runtime provider resolver (the real code path) and
the CLI initial default (for consistency).

Fixes #560.
2026-03-06 17:16:14 -08:00
teknium1
2a68099675 fix(tests): isolate tests from user ~/.hermes/ config and SOUL.md
_make_cli() now patches CLI_CONFIG with clean defaults so
test_cli_init tests don't depend on the developer's local config.yaml.
test_empty_dir_returns_empty now mocks Path.home() so it doesn't pick
up a global SOUL.md.

Credit to teyrebaz33 for identifying and fixing these in PR #557.
Fixes #555.
2026-03-06 17:10:35 -08:00
teknium1
6cd3bc6640 Merge PR #563: fix: prevent data loss in skills sync on copy/update failure
Authored by 0xbyt4. Two bugs fixed:
1. Failed copytree no longer poisons the manifest (skill gets retried)
2. Failed update no longer destroys user's copy (backup + restore)
2026-03-06 17:01:30 -08:00
0xbyt4
211b55815e fix: prevent data loss in skills sync on copy/update failure
Two bugs in sync_skills():

1. Failed copytree poisons manifest: when shutil.copytree fails (disk
   full, permission error), the skill is still recorded in the manifest.
   On the next sync, the skill appears as "in manifest but not on disk"
   which is interpreted as "user deliberately deleted it" — the skill
   is never retried.  Fix: only write to manifest on successful copy.

2. Failed update destroys user copy: rmtree deletes the existing skill
   directory before copytree runs. If copytree then fails, the user's
   skill is gone with no way to recover.  Fix: move to .bak before
   copying, restore from backup if copytree fails.

Both bugs are proven by new regression tests that fail on the old code
and pass on the fix.
2026-03-07 03:58:32 +03:00
teknium1
8ae4a6f824 fix: improve handling of empty responses after tool calls
- Added fallback mechanism to utilize previous content when the model generates an empty response after tool calls, reducing unnecessary API retries.
- Enhanced logging to indicate when prior content is used as a final response.
- Updated logic to ensure that genuine empty responses are retried appropriately, maintaining user experience.
2026-03-06 16:54:31 -08:00
teknium1
b98301677a docs: add /insights to all help menus and documentation
- website/docs/reference/cli-commands.md: Added 'hermes insights' terminal
  command section with --days and --source flags, plus /insights slash command
  in the Conversation section
- website/docs/user-guide/cli.md: Added /insights to slash commands table
- website/docs/user-guide/messaging/index.md: Added /insights to gateway
  chat commands table
- website/docs/user-guide/sessions.md: Added cross-reference to hermes
  insights from the sessions stats section
2026-03-06 16:48:58 -08:00
teknium1
f2fdde5ba4 fix: show user-modified skills count in hermes update output 2026-03-06 16:14:43 -08:00
teknium1
4f56e31dc7 fix: track origin hashes in skills manifest to preserve user modifications
Upgrade skills_sync manifest to v2 format (name:origin_hash). The origin
hash records the MD5 of the bundled skill at the time it was last synced.

On update, the user's copy is compared against the origin hash:
- User copy == origin hash → unmodified → safe to update from bundled
- User copy != origin hash → user customized → skip (preserve changes)

v1 manifests (plain names) are auto-migrated: the user's current hash
becomes the baseline, so future syncs can detect modifications.

Output now shows user-modified skills:
  ~ whisper (user-modified, skipping)

27 tests covering all scenarios including v1→v2 migration, user
modification detection, update after migration, and origin hash tracking.
2009 tests pass.
2026-03-06 16:13:58 -08:00
Teknium
6d3804770c Merge pull request #552 from NousResearch/feat/insights
feat: /insights command — usage analytics, cost estimation & activity patterns
2026-03-06 16:00:28 -08:00
teknium1
ab0f4126cf fix: restore all removed bundled skills + fix skills sync system
- Restored 21 skills removed in commits 757d012 and 740dd92:
  accelerate, audiocraft, code-review, faiss, flash-attention, gguf,
  grpo-rl-training, guidance, llava, nemo-curator, obliteratus, peft,
  pytorch-fsdp, pytorch-lightning, simpo, slime, stable-diffusion,
  tensorrt-llm, torchtitan, trl-fine-tuning, whisper

- Rewrote sync_skills() with proper update semantics:
  * New skills (not in manifest): copied to user dir
  * Existing skills (in manifest + on disk): updated via hash comparison
  * User-deleted skills (in manifest, not on disk): respected, not re-added
  * Stale manifest entries (removed from bundled): cleaned from manifest

- Added sync_skills() to CLI startup (cmd_chat) and gateway startup
  (start_gateway) — previously only ran during 'hermes update'

- Updated cmd_update output to show new/updated/cleaned counts

- Rewrote tests: 20 tests covering manifest CRUD, dir hashing, fresh
  install, user deletion respect, update detection, stale cleanup, and
  name collision handling

75 bundled skills total. 2002 tests pass.
2026-03-06 15:57:30 -08:00
teknium1
68fbae5692 docs: add Custom & Self-Hosted LLM Providers guide
Comprehensive guide for using Hermes Agent with alternative LLM backends:
- Ollama (local models, zero config)
- vLLM (high-performance GPU inference)
- SGLang (RadixAttention, prefix caching)
- llama.cpp / llama-server (CPU & Metal inference)
- LiteLLM Proxy (multi-provider gateway)
- ClawRouter (cost-optimized routing with complexity scoring)
- 10+ other compatible providers table (Together, Groq, DeepSeek, etc.)
- Choosing the Right Setup decision table
- General custom endpoint setup instructions

All of these work via the existing OPENAI_BASE_URL + OPENAI_API_KEY
custom endpoint support — no code changes needed.
2026-03-06 14:16:06 -08:00
14 changed files with 1789 additions and 1062 deletions

8
cli.py
View File

@@ -870,7 +870,13 @@ class HermesCLI:
or os.getenv("OPENAI_BASE_URL")
or os.getenv("OPENROUTER_BASE_URL", CLI_CONFIG["model"]["base_url"])
)
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY") or os.getenv("OPENAI_API_KEY")
# Match key to resolved base_url: OpenRouter URL → prefer OPENROUTER_API_KEY,
# custom endpoint → prefer OPENAI_API_KEY (issue #560).
# Note: _ensure_runtime_credentials() re-resolves this before first use.
if "openrouter.ai" in self.base_url:
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY") or os.getenv("OPENAI_API_KEY")
else:
self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
self._nous_key_expires_at: Optional[str] = None
self._nous_key_source: Optional[str] = None
# Max turns priority: CLI arg > config file > env var > default

View File

@@ -864,6 +864,8 @@ def _update_via_zip(args):
print(f" + {len(result['copied'])} new: {', '.join(result['copied'])}")
if result.get("updated"):
print(f"{len(result['updated'])} updated: {', '.join(result['updated'])}")
if result.get("user_modified"):
print(f" ~ {len(result['user_modified'])} user-modified (kept)")
if result.get("cleaned"):
print(f" {len(result['cleaned'])} removed from manifest")
if not result["copied"] and not result.get("updated"):
@@ -982,6 +984,8 @@ def cmd_update(args):
print(f" + {len(result['copied'])} new: {', '.join(result['copied'])}")
if result.get("updated"):
print(f"{len(result['updated'])} updated: {', '.join(result['updated'])}")
if result.get("user_modified"):
print(f" ~ {len(result['user_modified'])} user-modified (kept)")
if result.get("cleaned"):
print(f" {len(result['cleaned'])} removed from manifest")
if not result["copied"] and not result.get("updated"):
@@ -1215,7 +1219,15 @@ For more help on a command:
setup_parser = subparsers.add_parser(
"setup",
help="Interactive setup wizard",
description="Configure Hermes Agent with an interactive wizard"
description="Configure Hermes Agent with an interactive wizard. "
"Run a specific section: hermes setup model|terminal|gateway|tools|agent"
)
setup_parser.add_argument(
"section",
nargs="?",
choices=["model", "terminal", "gateway", "tools", "agent"],
default=None,
help="Run a specific setup section instead of the full wizard"
)
setup_parser.add_argument(
"--non-interactive",

View File

@@ -72,12 +72,25 @@ def _resolve_openrouter_runtime(
or OPENROUTER_BASE_URL
).rstrip("/")
api_key = (
explicit_api_key
or os.getenv("OPENROUTER_API_KEY")
or os.getenv("OPENAI_API_KEY")
or ""
)
# Choose API key based on whether the resolved base_url targets OpenRouter.
# When hitting OpenRouter, prefer OPENROUTER_API_KEY (issue #289).
# When hitting a custom endpoint, prefer OPENAI_API_KEY so the OpenRouter
# key doesn't leak to an unrelated provider (issue #560).
_is_openrouter_url = "openrouter.ai" in base_url
if _is_openrouter_url:
api_key = (
explicit_api_key
or os.getenv("OPENROUTER_API_KEY")
or os.getenv("OPENAI_API_KEY")
or ""
)
else:
api_key = (
explicit_api_key
or os.getenv("OPENAI_API_KEY")
or os.getenv("OPENROUTER_API_KEY")
or ""
)
source = "explicit" if (explicit_api_key or explicit_base_url) else "env/config"

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,10 @@
"""
Interactive tool configuration for Hermes Agent.
Unified tool configuration for Hermes Agent.
`hermes tools` and `hermes setup tools` both enter this module.
Select a platform → toggle toolsets on/off → for newly enabled tools
that need API keys, run through provider-aware configuration.
`hermes tools` — select a platform, then toggle toolsets on/off via checklist.
Saves per-platform tool configuration to ~/.hermes/config.yaml under
the `platform_toolsets` key.
"""
@@ -12,9 +15,63 @@ from typing import Dict, List, Set
import os
from hermes_cli.config import load_config, save_config, get_env_value, save_env_value
from hermes_cli.config import (
load_config, save_config, get_env_value, save_env_value,
get_hermes_home,
)
from hermes_cli.colors import Colors, color
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
# ─── UI Helpers (shared with setup.py) ────────────────────────────────────────
def _print_info(text: str):
print(color(f" {text}", Colors.DIM))
def _print_success(text: str):
print(color(f"{text}", Colors.GREEN))
def _print_warning(text: str):
print(color(f"{text}", Colors.YELLOW))
def _print_error(text: str):
print(color(f"{text}", Colors.RED))
def _prompt(question: str, default: str = None, password: bool = False) -> str:
if default:
display = f"{question} [{default}]: "
else:
display = f"{question}: "
try:
if password:
import getpass
value = getpass.getpass(color(display, Colors.YELLOW))
else:
value = input(color(display, Colors.YELLOW))
return value.strip() or default or ""
except (KeyboardInterrupt, EOFError):
print()
return default or ""
def _prompt_yes_no(question: str, default: bool = True) -> bool:
default_str = "Y/n" if default else "y/N"
while True:
try:
value = input(color(f"{question} [{default_str}]: ", Colors.YELLOW)).strip().lower()
except (KeyboardInterrupt, EOFError):
print()
return default
if not value:
return default
if value in ('y', 'yes'):
return True
if value in ('n', 'no'):
return False
# ─── Toolset Registry ─────────────────────────────────────────────────────────
# Toolsets shown in the configurator, grouped for display.
# Each entry: (toolset_name, label, description)
# These map to keys in toolsets.py TOOLSETS dict.
@@ -49,6 +106,181 @@ PLATFORMS = {
}
# ─── Tool Categories (provider-aware configuration) ──────────────────────────
# Maps toolset keys to their provider options. When a toolset is newly enabled,
# we use this to show provider selection and prompt for the right API keys.
# Toolsets not in this map either need no config or use the simple fallback.
TOOL_CATEGORIES = {
"tts": {
"name": "Text-to-Speech",
"icon": "🔊",
"providers": [
{
"name": "Microsoft Edge TTS",
"tag": "Free - no API key needed",
"env_vars": [],
"tts_provider": "edge",
},
{
"name": "OpenAI TTS",
"tag": "Premium - high quality voices",
"env_vars": [
{"key": "VOICE_TOOLS_OPENAI_KEY", "prompt": "OpenAI API key", "url": "https://platform.openai.com/api-keys"},
],
"tts_provider": "openai",
},
{
"name": "ElevenLabs",
"tag": "Premium - most natural voices",
"env_vars": [
{"key": "ELEVENLABS_API_KEY", "prompt": "ElevenLabs API key", "url": "https://elevenlabs.io/app/settings/api-keys"},
],
"tts_provider": "elevenlabs",
},
],
},
"web": {
"name": "Web Search & Extract",
"icon": "🔍",
"providers": [
{
"name": "Firecrawl Cloud",
"tag": "Recommended - hosted service",
"env_vars": [
{"key": "FIRECRAWL_API_KEY", "prompt": "Firecrawl API key", "url": "https://firecrawl.dev"},
],
},
{
"name": "Firecrawl Self-Hosted",
"tag": "Free - run your own instance",
"env_vars": [
{"key": "FIRECRAWL_API_URL", "prompt": "Your Firecrawl instance URL (e.g., http://localhost:3002)"},
],
},
],
},
"image_gen": {
"name": "Image Generation",
"icon": "🎨",
"providers": [
{
"name": "FAL.ai",
"tag": "FLUX 2 Pro with auto-upscaling",
"env_vars": [
{"key": "FAL_KEY", "prompt": "FAL API key", "url": "https://fal.ai/dashboard/keys"},
],
},
],
},
"browser": {
"name": "Browser Automation",
"icon": "🌐",
"providers": [
{
"name": "Browserbase",
"tag": "Cloud browser with stealth mode",
"env_vars": [
{"key": "BROWSERBASE_API_KEY", "prompt": "Browserbase API key", "url": "https://browserbase.com"},
{"key": "BROWSERBASE_PROJECT_ID", "prompt": "Browserbase project ID"},
],
"post_setup": "browserbase",
},
],
},
"homeassistant": {
"name": "Smart Home",
"icon": "🏠",
"providers": [
{
"name": "Home Assistant",
"tag": "REST API integration",
"env_vars": [
{"key": "HASS_TOKEN", "prompt": "Home Assistant Long-Lived Access Token"},
{"key": "HASS_URL", "prompt": "Home Assistant URL", "default": "http://homeassistant.local:8123"},
],
},
],
},
"rl": {
"name": "RL Training",
"icon": "🧪",
"requires_python": (3, 11),
"providers": [
{
"name": "Tinker / Atropos",
"tag": "RL training platform",
"env_vars": [
{"key": "TINKER_API_KEY", "prompt": "Tinker API key", "url": "https://tinker-console.thinkingmachines.ai/keys"},
{"key": "WANDB_API_KEY", "prompt": "WandB API key", "url": "https://wandb.ai/authorize"},
],
"post_setup": "rl_training",
},
],
},
}
# Simple env-var requirements for toolsets NOT in TOOL_CATEGORIES.
# Used as a fallback for tools like vision/moa that just need an API key.
TOOLSET_ENV_REQUIREMENTS = {
"vision": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
"moa": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
}
# ─── Post-Setup Hooks ─────────────────────────────────────────────────────────
def _run_post_setup(post_setup_key: str):
"""Run post-setup hooks for tools that need extra installation steps."""
import shutil
if post_setup_key == "browserbase":
node_modules = PROJECT_ROOT / "node_modules" / "agent-browser"
if not node_modules.exists() and shutil.which("npm"):
_print_info(" Installing Node.js dependencies for browser tools...")
import subprocess
result = subprocess.run(
["npm", "install", "--silent"],
capture_output=True, text=True, cwd=str(PROJECT_ROOT)
)
if result.returncode == 0:
_print_success(" Node.js dependencies installed")
else:
_print_warning(" npm install failed - run manually: cd ~/.hermes/hermes-agent && npm install")
elif not node_modules.exists():
_print_warning(" Node.js not found - browser tools require: npm install (in hermes-agent directory)")
elif post_setup_key == "rl_training":
try:
__import__("tinker_atropos")
except ImportError:
tinker_dir = PROJECT_ROOT / "tinker-atropos"
if tinker_dir.exists() and (tinker_dir / "pyproject.toml").exists():
_print_info(" Installing tinker-atropos submodule...")
import subprocess
uv_bin = shutil.which("uv")
if uv_bin:
result = subprocess.run(
[uv_bin, "pip", "install", "-e", str(tinker_dir)],
capture_output=True, text=True
)
else:
result = subprocess.run(
[sys.executable, "-m", "pip", "install", "-e", str(tinker_dir)],
capture_output=True, text=True
)
if result.returncode == 0:
_print_success(" tinker-atropos installed")
else:
_print_warning(" tinker-atropos install failed - run manually:")
_print_info(' uv pip install -e "./tinker-atropos"')
else:
_print_warning(" tinker-atropos submodule not found - run:")
_print_info(" git submodule update --init --recursive")
_print_info(' uv pip install -e "./tinker-atropos"')
# ─── Platform / Toolset Helpers ───────────────────────────────────────────────
def _get_enabled_platforms() -> List[str]:
"""Return platform keys that are configured (have tokens or are CLI)."""
enabled = ["cli"]
@@ -97,6 +329,28 @@ def _save_platform_tools(config: dict, platform: str, enabled_toolset_keys: Set[
save_config(config)
def _toolset_has_keys(ts_key: str) -> bool:
"""Check if a toolset's required API keys are configured."""
# Check TOOL_CATEGORIES first (provider-aware)
cat = TOOL_CATEGORIES.get(ts_key)
if cat:
for provider in cat["providers"]:
env_vars = provider.get("env_vars", [])
if not env_vars:
return True # Free provider (e.g., Edge TTS)
if all(get_env_value(v["key"]) for v in env_vars):
return True
return False
# Fallback to simple requirements
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
return True
return all(get_env_value(var) for var, _ in requirements)
# ─── Menu Helpers ─────────────────────────────────────────────────────────────
def _prompt_choice(question: str, choices: list, default: int = 0) -> int:
"""Single-select menu (arrow keys)."""
print(color(question, Colors.YELLOW))
@@ -114,7 +368,7 @@ def _prompt_choice(question: str, choices: list, default: int = 0) -> int:
)
idx = menu.show()
if idx is None:
sys.exit(0)
return default
print()
return idx
except (ImportError, NotImplementedError):
@@ -132,15 +386,7 @@ def _prompt_choice(question: str, choices: list, default: int = 0) -> int:
return idx
except (ValueError, KeyboardInterrupt, EOFError):
print()
sys.exit(0)
def _toolset_has_keys(ts_key: str) -> bool:
"""Check if a toolset's required API keys are configured."""
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
return True
return all(get_env_value(var) for var, _ in requirements)
return default
def _prompt_toolset_checklist(platform_label: str, enabled: Set[str]) -> Set[str]:
@@ -150,8 +396,8 @@ def _prompt_toolset_checklist(platform_label: str, enabled: Set[str]) -> Set[str
labels = []
for ts_key, ts_label, ts_desc in CONFIGURABLE_TOOLSETS:
suffix = ""
if not _toolset_has_keys(ts_key) and TOOLSET_ENV_REQUIREMENTS.get(ts_key):
suffix = " no API key"
if not _toolset_has_keys(ts_key) and (TOOL_CATEGORIES.get(ts_key) or TOOLSET_ENV_REQUIREMENTS.get(ts_key)):
suffix = " [no API key]"
labels.append(f"{ts_label} ({ts_desc}){suffix}")
pre_selected_indices = [
@@ -302,77 +548,294 @@ def _prompt_toolset_checklist(platform_label: str, enabled: Set[str]) -> Set[str
return {CONFIGURABLE_TOOLSETS[i][0] for i in selected}
# Map toolset keys to the env vars they require and where to get them
TOOLSET_ENV_REQUIREMENTS = {
"web": [("FIRECRAWL_API_KEY", "https://firecrawl.dev/")],
"browser": [("BROWSERBASE_API_KEY", "https://browserbase.com/"),
("BROWSERBASE_PROJECT_ID", None)],
"vision": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
"image_gen": [("FAL_KEY", "https://fal.ai/")],
"moa": [("OPENROUTER_API_KEY", "https://openrouter.ai/keys")],
"tts": [], # Edge TTS is free, no key needed
"rl": [("TINKER_API_KEY", "https://tinker-console.thinkingmachines.ai/keys"),
("WANDB_API_KEY", "https://wandb.ai/authorize")],
"homeassistant": [("HASS_TOKEN", "Home Assistant > Profile > Long-Lived Access Tokens"),
("HASS_URL", None)],
}
# ─── Provider-Aware Configuration ────────────────────────────────────────────
def _configure_toolset(ts_key: str, config: dict):
"""Configure a toolset - provider selection + API keys.
Uses TOOL_CATEGORIES for provider-aware config, falls back to simple
env var prompts for toolsets not in TOOL_CATEGORIES.
"""
cat = TOOL_CATEGORIES.get(ts_key)
if cat:
_configure_tool_category(ts_key, cat, config)
else:
# Simple fallback for vision, moa, etc.
_configure_simple_requirements(ts_key)
def _check_and_prompt_requirements(newly_enabled: Set[str]):
"""Check if newly enabled toolsets have missing API keys and offer to set them up."""
for ts_key in sorted(newly_enabled):
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
continue
def _configure_tool_category(ts_key: str, cat: dict, config: dict):
"""Configure a tool category with provider selection."""
icon = cat.get("icon", "")
name = cat["name"]
providers = cat["providers"]
missing = [(var, url) for var, url in requirements if not get_env_value(var)]
if not missing:
continue
ts_label = next((l for k, l, _ in CONFIGURABLE_TOOLSETS if k == ts_key), ts_key)
print()
print(color(f"{ts_label} requires configuration:", Colors.YELLOW))
for var, url in missing:
if url:
print(color(f" {var}", Colors.CYAN) + color(f" ({url})", Colors.DIM))
else:
print(color(f" {var}", Colors.CYAN))
print()
try:
response = input(color(" Set up now? [Y/n] ", Colors.YELLOW)).strip().lower()
except (KeyboardInterrupt, EOFError):
# Check Python version requirement
if cat.get("requires_python"):
req = cat["requires_python"]
if sys.version_info < req:
print()
continue
_print_error(f" {name} requires Python {req[0]}.{req[1]}+ (current: {sys.version_info.major}.{sys.version_info.minor})")
_print_info(" Upgrade Python and reinstall to enable this tool.")
return
if response in ("", "y", "yes"):
for var, url in missing:
if url:
print(color(f" Get key at: {url}", Colors.DIM))
try:
import getpass
value = getpass.getpass(color(f" {var}: ", Colors.YELLOW))
except (KeyboardInterrupt, EOFError):
print()
break
if value.strip():
save_env_value(var, value.strip())
print(color(f" ✓ Saved", Colors.GREEN))
if len(providers) == 1:
# Single provider - configure directly
provider = providers[0]
print()
print(color(f" --- {icon} {name} ({provider['name']}) ---", Colors.CYAN))
if provider.get("tag"):
_print_info(f" {provider['tag']}")
_configure_provider(provider, config)
else:
# Multiple providers - let user choose
print()
print(color(f" --- {icon} {name} - Choose a provider ---", Colors.CYAN))
print()
# Plain text labels only (no ANSI codes in menu items)
provider_choices = []
for p in providers:
tag = f" ({p['tag']})" if p.get("tag") else ""
configured = ""
env_vars = p.get("env_vars", [])
if not env_vars or all(get_env_value(v["key"]) for v in env_vars):
if p.get("tts_provider") and config.get("tts", {}).get("provider") == p["tts_provider"]:
configured = " [active]"
elif not env_vars:
configured = " [active]" if config.get("tts", {}).get("provider", "edge") == p.get("tts_provider", "") else ""
else:
print(color(f" Skipped", Colors.DIM))
configured = " [configured]"
provider_choices.append(f"{p['name']}{tag}{configured}")
# Detect current provider as default
default_idx = 0
for i, p in enumerate(providers):
if p.get("tts_provider") and config.get("tts", {}).get("provider") == p["tts_provider"]:
default_idx = i
break
env_vars = p.get("env_vars", [])
if env_vars and all(get_env_value(v["key"]) for v in env_vars):
default_idx = i
break
provider_idx = _prompt_choice(" Select provider:", provider_choices, default_idx)
_configure_provider(providers[provider_idx], config)
def _configure_provider(provider: dict, config: dict):
"""Configure a single provider - prompt for API keys and set config."""
env_vars = provider.get("env_vars", [])
# Set TTS provider in config if applicable
if provider.get("tts_provider"):
config.setdefault("tts", {})["provider"] = provider["tts_provider"]
if not env_vars:
_print_success(f" {provider['name']} - no configuration needed!")
return
# Prompt for each required env var
all_configured = True
for var in env_vars:
existing = get_env_value(var["key"])
if existing:
_print_success(f" {var['key']}: already configured")
# Don't ask to update - this is a new enable flow.
# Reconfigure is handled separately.
else:
print(color(" Skipped — configure later with 'hermes setup'", Colors.DIM))
url = var.get("url", "")
if url:
_print_info(f" Get yours at: {url}")
default_val = var.get("default", "")
if default_val:
value = _prompt(f" {var.get('prompt', var['key'])}", default_val)
else:
value = _prompt(f" {var.get('prompt', var['key'])}", password=True)
if value:
save_env_value(var["key"], value)
_print_success(f" Saved")
else:
_print_warning(f" Skipped")
all_configured = False
# Run post-setup hooks if needed
if provider.get("post_setup") and all_configured:
_run_post_setup(provider["post_setup"])
if all_configured:
_print_success(f" {provider['name']} configured!")
def tools_command(args):
"""Entry point for `hermes tools`."""
def _configure_simple_requirements(ts_key: str):
"""Simple fallback for toolsets that just need env vars (no provider selection)."""
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
return
missing = [(var, url) for var, url in requirements if not get_env_value(var)]
if not missing:
return
ts_label = next((l for k, l, _ in CONFIGURABLE_TOOLSETS if k == ts_key), ts_key)
print()
print(color(f" {ts_label} requires configuration:", Colors.YELLOW))
for var, url in missing:
if url:
_print_info(f" Get key at: {url}")
value = _prompt(f" {var}", password=True)
if value and value.strip():
save_env_value(var, value.strip())
_print_success(f" Saved")
else:
_print_warning(f" Skipped")
def _reconfigure_tool(config: dict):
"""Let user reconfigure an existing tool's provider or API key."""
# Build list of configurable tools that are currently set up
configurable = []
for ts_key, ts_label, _ in CONFIGURABLE_TOOLSETS:
cat = TOOL_CATEGORIES.get(ts_key)
reqs = TOOLSET_ENV_REQUIREMENTS.get(ts_key)
if cat or reqs:
if _toolset_has_keys(ts_key):
configurable.append((ts_key, ts_label))
if not configurable:
_print_info("No configured tools to reconfigure.")
return
choices = [label for _, label in configurable]
choices.append("Cancel")
idx = _prompt_choice(" Which tool would you like to reconfigure?", choices, len(choices) - 1)
if idx >= len(configurable):
return # Cancel
ts_key, ts_label = configurable[idx]
cat = TOOL_CATEGORIES.get(ts_key)
if cat:
_configure_tool_category_for_reconfig(ts_key, cat, config)
else:
_reconfigure_simple_requirements(ts_key)
save_config(config)
def _configure_tool_category_for_reconfig(ts_key: str, cat: dict, config: dict):
"""Reconfigure a tool category - provider selection + API key update."""
icon = cat.get("icon", "")
name = cat["name"]
providers = cat["providers"]
if len(providers) == 1:
provider = providers[0]
print()
print(color(f" --- {icon} {name} ({provider['name']}) ---", Colors.CYAN))
_reconfigure_provider(provider, config)
else:
print()
print(color(f" --- {icon} {name} - Choose a provider ---", Colors.CYAN))
print()
provider_choices = []
for p in providers:
tag = f" ({p['tag']})" if p.get("tag") else ""
configured = ""
env_vars = p.get("env_vars", [])
if not env_vars or all(get_env_value(v["key"]) for v in env_vars):
if p.get("tts_provider") and config.get("tts", {}).get("provider") == p["tts_provider"]:
configured = " [active]"
elif not env_vars:
configured = ""
else:
configured = " [configured]"
provider_choices.append(f"{p['name']}{tag}{configured}")
default_idx = 0
for i, p in enumerate(providers):
if p.get("tts_provider") and config.get("tts", {}).get("provider") == p["tts_provider"]:
default_idx = i
break
env_vars = p.get("env_vars", [])
if env_vars and all(get_env_value(v["key"]) for v in env_vars):
default_idx = i
break
provider_idx = _prompt_choice(" Select provider:", provider_choices, default_idx)
_reconfigure_provider(providers[provider_idx], config)
def _reconfigure_provider(provider: dict, config: dict):
"""Reconfigure a provider - update API keys."""
env_vars = provider.get("env_vars", [])
if provider.get("tts_provider"):
config.setdefault("tts", {})["provider"] = provider["tts_provider"]
_print_success(f" TTS provider set to: {provider['tts_provider']}")
if not env_vars:
_print_success(f" {provider['name']} - no configuration needed!")
return
for var in env_vars:
existing = get_env_value(var["key"])
if existing:
_print_info(f" {var['key']}: configured ({existing[:8]}...)")
url = var.get("url", "")
if url:
_print_info(f" Get yours at: {url}")
default_val = var.get("default", "")
value = _prompt(f" {var.get('prompt', var['key'])} (Enter to keep current)", password=not default_val)
if value and value.strip():
save_env_value(var["key"], value.strip())
_print_success(f" Updated")
else:
_print_info(f" Kept current")
def _reconfigure_simple_requirements(ts_key: str):
"""Reconfigure simple env var requirements."""
requirements = TOOLSET_ENV_REQUIREMENTS.get(ts_key, [])
if not requirements:
return
ts_label = next((l for k, l, _ in CONFIGURABLE_TOOLSETS if k == ts_key), ts_key)
print()
print(color(f" {ts_label}:", Colors.CYAN))
for var, url in requirements:
existing = get_env_value(var)
if existing:
_print_info(f" {var}: configured ({existing[:8]}...)")
if url:
_print_info(f" Get key at: {url}")
value = _prompt(f" {var} (Enter to keep current)", password=True)
if value and value.strip():
save_env_value(var, value.strip())
_print_success(f" Updated")
else:
_print_info(f" Kept current")
# ─── Main Entry Point ─────────────────────────────────────────────────────────
def tools_command(args=None):
"""Entry point for `hermes tools` and `hermes setup tools`."""
config = load_config()
enabled_platforms = _get_enabled_platforms()
print()
print(color("⚕ Hermes Tool Configuration", Colors.CYAN, Colors.BOLD))
print(color(" Enable or disable tools per platform.", Colors.DIM))
print(color(" Tools that need API keys will be configured when enabled.", Colors.DIM))
print()
# Build platform choices
@@ -380,22 +843,28 @@ def tools_command(args):
platform_keys = []
for pkey in enabled_platforms:
pinfo = PLATFORMS[pkey]
# Count currently enabled toolsets
current = _get_platform_tools(config, pkey)
count = len(current)
total = len(CONFIGURABLE_TOOLSETS)
platform_choices.append(f"Configure {pinfo['label']} ({count}/{total} enabled)")
platform_keys.append(pkey)
platform_choices.append("Done — save and exit")
platform_choices.append("Reconfigure an existing tool's provider or API key")
platform_choices.append("Done")
while True:
idx = _prompt_choice("Select a platform to configure:", platform_choices, default=0)
idx = _prompt_choice("Select an option:", platform_choices, default=0)
# "Done" selected
if idx == len(platform_keys):
if idx == len(platform_keys) + 1:
break
# "Reconfigure" selected
if idx == len(platform_keys):
_reconfigure_tool(config)
print()
continue
pkey = platform_keys[idx]
pinfo = PLATFORMS[pkey]
@@ -418,11 +887,15 @@ def tools_command(args):
label = next((l for k, l, _ in CONFIGURABLE_TOOLSETS if k == ts), ts)
print(color(f" - {label}", Colors.RED))
# Prompt for missing API keys on newly enabled toolsets
# Configure newly enabled toolsets that need API keys
if added:
_check_and_prompt_requirements(added)
for ts_key in sorted(added):
if TOOL_CATEGORIES.get(ts_key) or TOOLSET_ENV_REQUIREMENTS.get(ts_key):
if not _toolset_has_keys(ts_key):
_configure_toolset(ts_key, config)
_save_platform_tools(config, pkey, new_enabled)
save_config(config)
print(color(f" ✓ Saved {pinfo['label']} configuration", Colors.GREEN))
else:
print(color(f" No changes to {pinfo['label']}", Colors.DIM))

View File

@@ -225,6 +225,18 @@ def get_tool_definitions(
# Ask the registry for schemas (only returns tools whose check_fn passes)
filtered_tools = registry.get_definitions(tools_to_include, quiet=quiet_mode)
# Rebuild execute_code schema to only list sandbox tools that are actually
# enabled. Without this, the model sees "web_search is available in
# execute_code" even when the user disabled the web toolset (#560-discord).
if "execute_code" in tools_to_include:
from tools.code_execution_tool import SANDBOX_ALLOWED_TOOLS, build_execute_code_schema
sandbox_enabled = SANDBOX_ALLOWED_TOOLS & tools_to_include
dynamic_schema = build_execute_code_schema(sandbox_enabled)
for i, td in enumerate(filtered_tools):
if td.get("function", {}).get("name") == "execute_code":
filtered_tools[i] = {"type": "function", "function": dynamic_schema}
break
if not quiet_mode:
if filtered_tools:
tool_names = [t["function"]["name"] for t in filtered_tools]

View File

@@ -3707,13 +3707,33 @@ class AIAgent:
# Check if response only has think block with no actual content after it
if not self._has_content_after_think_block(final_response):
# Track retries for empty-after-think responses
# If the previous turn already delivered real content alongside
# tool calls (e.g. "You're welcome!" + memory save), the model
# has nothing more to say. Use the earlier content immediately
# instead of wasting API calls on retries that won't help.
fallback = getattr(self, '_last_content_with_tools', None)
if fallback:
logger.debug("Empty follow-up after tool calls — using prior turn content as final response")
self._last_content_with_tools = None
self._empty_content_retries = 0
for i in range(len(messages) - 1, -1, -1):
msg = messages[i]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tool_names = []
for tc in msg["tool_calls"]:
fn = tc.get("function", {})
tool_names.append(fn.get("name", "unknown"))
msg["content"] = f"Calling the {', '.join(tool_names)} tool{'s' if len(tool_names) > 1 else ''}..."
break
final_response = self._strip_think_blocks(fallback).strip()
break
# No fallback available — this is a genuine empty response.
# Retry in case the model just had a bad generation.
if not hasattr(self, '_empty_content_retries'):
self._empty_content_retries = 0
self._empty_content_retries += 1
# Show the reasoning/thinking content so the user can see
# what the model was thinking even though content is empty
reasoning_text = self._extract_reasoning(assistant_message)
print(f"{self.log_prefix}⚠️ Response only contains think block with no content after it")
if reasoning_text:

View File

@@ -172,7 +172,11 @@ class TestBuildSkillsSystemPrompt:
class TestBuildContextFilesPrompt:
def test_empty_dir_returns_empty(self, tmp_path):
result = build_context_files_prompt(cwd=str(tmp_path))
from unittest.mock import patch
fake_home = tmp_path / "fake_home"
fake_home.mkdir()
with patch("pathlib.Path.home", return_value=fake_home):
result = build_context_files_prompt(cwd=str(tmp_path))
assert result == ""
def test_loads_agents_md(self, tmp_path):

View File

@@ -12,8 +12,21 @@ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
def _make_cli(**kwargs):
"""Create a HermesCLI instance with minimal mocking."""
import cli as _cli_mod
from cli import HermesCLI
with patch("cli.get_tool_definitions", return_value=[]):
_clean_config = {
"model": {
"default": "anthropic/claude-opus-4.6",
"base_url": "https://openrouter.ai/api/v1",
"provider": "auto",
},
"display": {"compact": False, "tool_progress": "all"},
"agent": {},
"terminal": {"env_type": "local"},
}
with patch("cli.get_tool_definitions", return_value=[]), \
patch.dict("os.environ", {"LLM_MODEL": ""}, clear=False), \
patch.dict(_cli_mod.__dict__, {"CLI_CONFIG": _clean_config}):
return HermesCLI(**kwargs)

View File

@@ -121,6 +121,43 @@ def test_openai_key_used_when_no_openrouter_key(monkeypatch):
assert resolved["api_key"] == "sk-openai-fallback"
def test_custom_endpoint_prefers_openai_key(monkeypatch):
"""Custom endpoint should use OPENAI_API_KEY, not OPENROUTER_API_KEY.
Regression test for #560: when base_url is a non-OpenRouter endpoint,
OPENROUTER_API_KEY was being sent as the auth header instead of OPENAI_API_KEY.
"""
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setenv("OPENAI_BASE_URL", "https://api.z.ai/api/coding/paas/v4")
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.setenv("OPENAI_API_KEY", "sk-zai-correct-key")
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-wrong-key-for-zai")
resolved = rp.resolve_runtime_provider(requested="custom")
assert resolved["base_url"] == "https://api.z.ai/api/coding/paas/v4"
assert resolved["api_key"] == "sk-zai-correct-key"
def test_custom_endpoint_auto_provider_prefers_openai_key(monkeypatch):
"""Auto provider with non-OpenRouter base_url should prefer OPENAI_API_KEY.
Same as #560 but via 'hermes model' flow which sets provider to 'auto'.
"""
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openrouter")
monkeypatch.setattr(rp, "_get_model_config", lambda: {})
monkeypatch.setenv("OPENAI_BASE_URL", "https://my-vllm-server.example.com/v1")
monkeypatch.delenv("OPENROUTER_BASE_URL", raising=False)
monkeypatch.setenv("OPENAI_API_KEY", "sk-vllm-key")
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-should-not-leak")
resolved = rp.resolve_runtime_provider(requested="auto")
assert resolved["base_url"] == "https://my-vllm-server.example.com/v1"
assert resolved["api_key"] == "sk-vllm-key"
def test_resolve_requested_provider_precedence(monkeypatch):
monkeypatch.setenv("HERMES_INFERENCE_PROVIDER", "nous")
monkeypatch.setattr(rp, "_get_model_config", lambda: {"provider": "openai-codex"})

View File

@@ -17,42 +17,62 @@ from tools.skills_sync import (
class TestReadWriteManifest:
def test_read_missing_manifest(self, tmp_path):
with patch.object(
__import__("tools.skills_sync", fromlist=["MANIFEST_FILE"]),
"MANIFEST_FILE",
with patch(
"tools.skills_sync.MANIFEST_FILE",
tmp_path / "nonexistent",
):
result = _read_manifest()
assert result == set()
assert result == {}
def test_write_and_read_roundtrip(self, tmp_path):
def test_write_and_read_roundtrip_v2(self, tmp_path):
manifest_file = tmp_path / ".bundled_manifest"
names = {"skill-a", "skill-b", "skill-c"}
entries = {"skill-a": "abc123", "skill-b": "def456", "skill-c": "789012"}
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
_write_manifest(names)
_write_manifest(entries)
result = _read_manifest()
assert result == names
assert result == entries
def test_write_manifest_sorted(self, tmp_path):
manifest_file = tmp_path / ".bundled_manifest"
names = {"zebra", "alpha", "middle"}
entries = {"zebra": "hash1", "alpha": "hash2", "middle": "hash3"}
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
_write_manifest(names)
_write_manifest(entries)
lines = manifest_file.read_text().strip().splitlines()
assert lines == ["alpha", "middle", "zebra"]
names = [line.split(":")[0] for line in lines]
assert names == ["alpha", "middle", "zebra"]
def test_read_manifest_ignores_blank_lines(self, tmp_path):
def test_read_v1_manifest_migration(self, tmp_path):
"""v1 format (plain names, no hashes) should be read with empty hashes."""
manifest_file = tmp_path / ".bundled_manifest"
manifest_file.write_text("skill-a\n\n \nskill-b\n")
manifest_file.write_text("skill-a\nskill-b\n")
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = _read_manifest()
assert result == {"skill-a", "skill-b"}
assert result == {"skill-a": "", "skill-b": ""}
def test_read_manifest_ignores_blank_lines(self, tmp_path):
manifest_file = tmp_path / ".bundled_manifest"
manifest_file.write_text("skill-a:hash1\n\n \nskill-b:hash2\n")
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = _read_manifest()
assert result == {"skill-a": "hash1", "skill-b": "hash2"}
def test_read_manifest_mixed_v1_v2(self, tmp_path):
"""Manifest with both v1 and v2 lines (shouldn't happen but handle gracefully)."""
manifest_file = tmp_path / ".bundled_manifest"
manifest_file.write_text("old-skill\nnew-skill:abc123\n")
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
result = _read_manifest()
assert result == {"old-skill": "", "new-skill": "abc123"}
class TestDirHash:
@@ -87,13 +107,10 @@ class TestDirHash:
class TestDiscoverBundledSkills:
def test_finds_skills_with_skill_md(self, tmp_path):
# Create two skills
(tmp_path / "category" / "skill-a").mkdir(parents=True)
(tmp_path / "category" / "skill-a" / "SKILL.md").write_text("# Skill A")
(tmp_path / "skill-b").mkdir()
(tmp_path / "skill-b" / "SKILL.md").write_text("# Skill B")
# A directory without SKILL.md — should NOT be found
(tmp_path / "not-a-skill").mkdir()
(tmp_path / "not-a-skill" / "README.md").write_text("Not a skill")
@@ -140,105 +157,198 @@ class TestSyncSkills:
(bundled / "old-skill" / "SKILL.md").write_text("# Old")
return bundled
def _patches(self, bundled, skills_dir, manifest_file):
"""Return context manager stack for patching sync globals."""
from contextlib import ExitStack
stack = ExitStack()
stack.enter_context(patch("tools.skills_sync._get_bundled_dir", return_value=bundled))
stack.enter_context(patch("tools.skills_sync.SKILLS_DIR", skills_dir))
stack.enter_context(patch("tools.skills_sync.MANIFEST_FILE", manifest_file))
return stack
def test_fresh_install_copies_all(self, tmp_path):
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
assert len(result["copied"]) == 2
assert result["total_bundled"] == 2
assert result["updated"] == []
assert result["user_modified"] == []
assert result["cleaned"] == []
assert (skills_dir / "category" / "new-skill" / "SKILL.md").exists()
assert (skills_dir / "old-skill" / "SKILL.md").exists()
# DESCRIPTION.md should also be copied
assert (skills_dir / "category" / "DESCRIPTION.md").exists()
def test_fresh_install_records_origin_hashes(self, tmp_path):
"""After fresh install, manifest should have v2 format with hashes."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
with self._patches(bundled, skills_dir, manifest_file):
sync_skills(quiet=True)
manifest = _read_manifest()
assert "new-skill" in manifest
assert "old-skill" in manifest
# Hashes should be non-empty MD5 strings
assert len(manifest["new-skill"]) == 32
assert len(manifest["old-skill"]) == 32
def test_user_deleted_skill_not_re_added(self, tmp_path):
"""Skill in manifest but not on disk = user deleted it. Don't re-add."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
skills_dir.mkdir(parents=True)
# old-skill is in manifest but NOT on disk (user deleted it)
manifest_file.write_text("old-skill\n")
# old-skill is in manifest (v2 format) but NOT on disk
old_hash = _dir_hash(bundled / "old-skill")
manifest_file.write_text(f"old-skill:{old_hash}\n")
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
# new-skill should be copied, old-skill should be skipped
assert "new-skill" in result["copied"]
assert "old-skill" not in result["copied"]
assert "old-skill" not in result.get("updated", [])
assert not (skills_dir / "old-skill").exists()
def test_existing_skill_gets_updated(self, tmp_path):
"""Skill in manifest AND on disk with changed content = updated."""
def test_unmodified_skill_gets_updated(self, tmp_path):
"""Skill in manifest + on disk + user hasn't modified = update from bundled."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Pre-create old-skill on disk with DIFFERENT content
# Simulate: user has old version that was synced from an older bundled
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old version from last sync")
# Mark it in the manifest
manifest_file.write_text("old-skill\n")
(user_skill / "SKILL.md").write_text("# Old v1")
old_origin_hash = _dir_hash(user_skill)
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
# Record origin hash = hash of what was synced (the old version)
manifest_file.write_text(f"old-skill:{old_origin_hash}\n")
# Now bundled has a newer version ("# Old" != "# Old v1")
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
# old-skill should be updated
# Should be updated because user copy matches origin (unmodified)
assert "old-skill" in result["updated"]
assert (user_skill / "SKILL.md").read_text() == "# Old"
def test_unchanged_skill_not_updated(self, tmp_path):
"""Skill in manifest AND on disk with same content = skipped."""
def test_user_modified_skill_not_overwritten(self, tmp_path):
"""Skill modified by user should NOT be overwritten even if bundled changed."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Pre-create old-skill on disk with SAME content
# Simulate: user had the old version synced, then modified it
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old v1")
old_origin_hash = _dir_hash(user_skill)
# Record origin hash from what was originally synced
manifest_file.write_text(f"old-skill:{old_origin_hash}\n")
# User modifies their copy
(user_skill / "SKILL.md").write_text("# My custom version")
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
# Should NOT update — user modified it
assert "old-skill" in result["user_modified"]
assert "old-skill" not in result.get("updated", [])
assert (user_skill / "SKILL.md").read_text() == "# My custom version"
def test_unchanged_skill_not_updated(self, tmp_path):
"""Skill in sync (user == bundled == origin) = no action needed."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Copy bundled to user dir (simulating perfect sync state)
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old")
# Mark it in the manifest
manifest_file.write_text("old-skill\n")
origin_hash = _dir_hash(user_skill)
manifest_file.write_text(f"old-skill:{origin_hash}\n")
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
# Should be skipped, not updated
assert "old-skill" not in result.get("updated", [])
assert "old-skill" not in result.get("user_modified", [])
assert result["skipped"] >= 1
def test_v1_manifest_migration_sets_baseline(self, tmp_path):
"""v1 manifest entries (no hash) should set baseline from user's current copy."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Pre-create skill on disk
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old modified by user")
# v1 manifest (no hashes)
manifest_file.write_text("old-skill\n")
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
# Should skip (migration baseline set), NOT update
assert "old-skill" not in result.get("updated", [])
assert "old-skill" not in result.get("user_modified", [])
# Now check manifest was upgraded to v2 with user's hash as baseline
manifest = _read_manifest()
assert len(manifest["old-skill"]) == 32 # MD5 hash
def test_v1_migration_then_bundled_update_detected(self, tmp_path):
"""After v1 migration, a subsequent sync should detect bundled updates."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# User has the SAME content as bundled (in sync)
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old")
# v1 manifest
manifest_file.write_text("old-skill\n")
with self._patches(bundled, skills_dir, manifest_file):
# First sync: migration — sets baseline
sync_skills(quiet=True)
# Now change bundled content
(bundled / "old-skill" / "SKILL.md").write_text("# Old v2 — improved")
# Second sync: should detect bundled changed + user unmodified → update
result = sync_skills(quiet=True)
assert "old-skill" in result["updated"]
assert (user_skill / "SKILL.md").read_text() == "# Old v2 — improved"
def test_stale_manifest_entries_cleaned(self, tmp_path):
"""Skills in manifest that no longer exist in bundled dir get cleaned."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
skills_dir.mkdir(parents=True)
# Add a stale entry that doesn't exist in bundled
manifest_file.write_text("old-skill\nremoved-skill\n")
manifest_file.write_text("old-skill:abc123\nremoved-skill:def456\n")
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
assert "removed-skill" in result["cleaned"]
# Verify manifest no longer has removed-skill
with patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
manifest = _read_manifest()
assert "removed-skill" not in manifest
@@ -249,20 +359,111 @@ class TestSyncSkills:
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Pre-create the skill dir with user content (not in manifest)
user_skill = skills_dir / "category" / "new-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# User modified")
with patch("tools.skills_sync._get_bundled_dir", return_value=bundled), \
patch("tools.skills_sync.SKILLS_DIR", skills_dir), \
patch("tools.skills_sync.MANIFEST_FILE", manifest_file):
with self._patches(bundled, skills_dir, manifest_file):
result = sync_skills(quiet=True)
# Should not overwrite user's version
assert (user_skill / "SKILL.md").read_text() == "# User modified"
def test_nonexistent_bundled_dir(self, tmp_path):
with patch("tools.skills_sync._get_bundled_dir", return_value=tmp_path / "nope"):
result = sync_skills(quiet=True)
assert result == {"copied": [], "updated": [], "skipped": 0, "cleaned": [], "total_bundled": 0}
assert result == {
"copied": [], "updated": [], "skipped": 0,
"user_modified": [], "cleaned": [], "total_bundled": 0,
}
def test_failed_copy_does_not_poison_manifest(self, tmp_path):
"""If copytree fails, the skill must NOT be added to the manifest.
Otherwise the next sync treats it as 'user deleted' and never retries.
"""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
with self._patches(bundled, skills_dir, manifest_file):
# Patch copytree to fail for new-skill
original_copytree = __import__("shutil").copytree
def failing_copytree(src, dst, *a, **kw):
if "new-skill" in str(src):
raise OSError("Simulated disk full")
return original_copytree(src, dst, *a, **kw)
with patch("shutil.copytree", side_effect=failing_copytree):
result = sync_skills(quiet=True)
# new-skill should NOT be in copied (it failed)
assert "new-skill" not in result["copied"]
# Critical: new-skill must NOT be in the manifest
manifest = _read_manifest()
assert "new-skill" not in manifest, (
"Failed copy was recorded in manifest — next sync will "
"treat it as 'user deleted' and never retry"
)
# Now run sync again (copytree works this time) — it should retry
result2 = sync_skills(quiet=True)
assert "new-skill" in result2["copied"]
assert (skills_dir / "category" / "new-skill" / "SKILL.md").exists()
def test_failed_update_does_not_destroy_user_copy(self, tmp_path):
"""If copytree fails during update, the user's existing copy must survive."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Start with old synced version
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old v1")
old_hash = _dir_hash(user_skill)
manifest_file.write_text(f"old-skill:{old_hash}\n")
with self._patches(bundled, skills_dir, manifest_file):
# Patch copytree to fail (rmtree succeeds, copytree fails)
original_copytree = __import__("shutil").copytree
def failing_copytree(src, dst, *a, **kw):
if "old-skill" in str(src):
raise OSError("Simulated write failure")
return original_copytree(src, dst, *a, **kw)
with patch("shutil.copytree", side_effect=failing_copytree):
result = sync_skills(quiet=True)
# old-skill should NOT be in updated (it failed)
assert "old-skill" not in result.get("updated", [])
# The skill directory should still exist (rmtree destroyed it
# but copytree failed to replace it — this is data loss)
assert user_skill.exists(), (
"Update failure destroyed user's skill copy without replacing it"
)
def test_update_records_new_origin_hash(self, tmp_path):
"""After updating a skill, the manifest should record the new bundled hash."""
bundled = self._setup_bundled(tmp_path)
skills_dir = tmp_path / "user_skills"
manifest_file = skills_dir / ".bundled_manifest"
# Start with old synced version
user_skill = skills_dir / "old-skill"
user_skill.mkdir(parents=True)
(user_skill / "SKILL.md").write_text("# Old v1")
old_hash = _dir_hash(user_skill)
manifest_file.write_text(f"old-skill:{old_hash}\n")
with self._patches(bundled, skills_dir, manifest_file):
sync_skills(quiet=True) # updates to "# Old"
manifest = _read_manifest()
# New origin hash should match the bundled version
new_bundled_hash = _dir_hash(bundled / "old-skill")
assert manifest["old-skill"] == new_bundled_hash
assert manifest["old-skill"] != old_hash

View File

@@ -592,9 +592,55 @@ def _load_config() -> dict:
# OpenAI Function-Calling Schema
# ---------------------------------------------------------------------------
EXECUTE_CODE_SCHEMA = {
"name": "execute_code",
"description": (
# Per-tool documentation lines for the execute_code description.
# Ordered to match the canonical display order.
_TOOL_DOC_LINES = [
("web_search",
" web_search(query: str, limit: int = 5) -> dict\n"
" Returns {\"data\": {\"web\": [{\"url\", \"title\", \"description\"}, ...]}}"),
("web_extract",
" web_extract(urls: list[str]) -> dict\n"
" Returns {\"results\": [{\"url\", \"content\", \"error\"}, ...]} where content is markdown"),
("read_file",
" read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n"
" Lines are 1-indexed. Returns {\"content\": \"...\", \"total_lines\": N}"),
("write_file",
" write_file(path: str, content: str) -> dict\n"
" Always overwrites the entire file."),
("search_files",
" search_files(pattern: str, target=\"content\", path=\".\", file_glob=None, limit=50) -> dict\n"
" target: \"content\" (search inside files) or \"files\" (find files by name). Returns {\"matches\": [...]}"),
("patch",
" patch(path: str, old_string: str, new_string: str, replace_all: bool = False) -> dict\n"
" Replaces old_string with new_string in the file."),
("terminal",
" terminal(command: str, timeout=None, workdir=None) -> dict\n"
" Foreground only (no background/pty). Returns {\"output\": \"...\", \"exit_code\": N}"),
]
def build_execute_code_schema(enabled_sandbox_tools: set = None) -> dict:
"""Build the execute_code schema with description listing only enabled tools.
When tools are disabled via ``hermes tools`` (e.g. web is turned off),
the schema description should NOT mention web_search / web_extract —
otherwise the model thinks they are available and keeps trying to use them.
"""
if enabled_sandbox_tools is None:
enabled_sandbox_tools = SANDBOX_ALLOWED_TOOLS
# Build tool documentation lines for only the enabled tools
tool_lines = "\n".join(
doc for name, doc in _TOOL_DOC_LINES if name in enabled_sandbox_tools
)
# Build example import list from enabled tools
import_examples = [n for n in ("web_search", "terminal") if n in enabled_sandbox_tools]
if not import_examples:
import_examples = sorted(enabled_sandbox_tools)[:2]
import_str = ", ".join(import_examples) + ", ..."
description = (
"Run a Python script that can call Hermes tools programmatically. "
"Use this when you need 3+ tool calls with processing logic between them, "
"need to filter/reduce large tool outputs before they enter your context, "
@@ -603,21 +649,8 @@ EXECUTE_CODE_SCHEMA = {
"Use normal tool calls instead when: single tool call with no processing, "
"you need to see the full result and apply complex reasoning, "
"or the task requires interactive user input.\n\n"
"Available via `from hermes_tools import ...`:\n\n"
" web_search(query: str, limit: int = 5) -> dict\n"
" Returns {\"data\": {\"web\": [{\"url\", \"title\", \"description\"}, ...]}}\n"
" web_extract(urls: list[str]) -> dict\n"
" Returns {\"results\": [{\"url\", \"content\", \"error\"}, ...]} where content is markdown\n"
" read_file(path: str, offset: int = 1, limit: int = 500) -> dict\n"
" Lines are 1-indexed. Returns {\"content\": \"...\", \"total_lines\": N}\n"
" write_file(path: str, content: str) -> dict\n"
" Always overwrites the entire file.\n"
" search_files(pattern: str, target=\"content\", path=\".\", file_glob=None, limit=50) -> dict\n"
" target: \"content\" (search inside files) or \"files\" (find files by name). Returns {\"matches\": [...]}\n"
" patch(path: str, old_string: str, new_string: str, replace_all: bool = False) -> dict\n"
" Replaces old_string with new_string in the file.\n"
" terminal(command: str, timeout=None, workdir=None) -> dict\n"
" Foreground only (no background/pty). Returns {\"output\": \"...\", \"exit_code\": N}\n\n"
f"Available via `from hermes_tools import ...`:\n\n"
f"{tool_lines}\n\n"
"Limits: 5-minute timeout, 50KB stdout cap, max 50 tool calls per script. "
"terminal() is foreground-only (no background or pty).\n\n"
"Print your final result to stdout. Use Python stdlib (json, re, math, csv, "
@@ -626,22 +659,30 @@ EXECUTE_CODE_SCHEMA = {
" json_parse(text: str) — json.loads with strict=False; use for terminal() output with control chars\n"
" shell_quote(s: str) — shlex.quote(); use when interpolating dynamic strings into shell commands\n"
" retry(fn, max_attempts=3, delay=2) — retry with exponential backoff for transient failures"
),
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": (
"Python code to execute. Import tools with "
"`from hermes_tools import web_search, terminal, ...` "
"and print your final result to stdout."
),
)
return {
"name": "execute_code",
"description": description,
"parameters": {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": (
"Python code to execute. Import tools with "
f"`from hermes_tools import {import_str}` "
"and print your final result to stdout."
),
},
},
"required": ["code"],
},
"required": ["code"],
},
}
}
# Default schema used at registration time (all sandbox tools listed)
EXECUTE_CODE_SCHEMA = build_execute_code_schema()
# --- Registry ---

View File

@@ -174,7 +174,14 @@ def _run_single_child(
child_start = time.monotonic()
child_toolsets = _strip_blocked_tools(toolsets or DEFAULT_TOOLSETS)
# When no explicit toolsets given, inherit from parent's enabled toolsets
# so disabled tools (e.g. web) don't leak to subagents.
if toolsets:
child_toolsets = _strip_blocked_tools(toolsets)
elif parent_agent and getattr(parent_agent, "enabled_toolsets", None):
child_toolsets = _strip_blocked_tools(parent_agent.enabled_toolsets)
else:
child_toolsets = _strip_blocked_tools(DEFAULT_TOOLSETS)
child_prompt = _build_child_system_prompt(goal, context)
@@ -493,7 +500,7 @@ DELEGATE_TASK_SCHEMA = {
"items": {"type": "string"},
"description": (
"Toolsets to enable for this subagent. "
"Default: ['terminal', 'file', 'web']. "
"Default: inherits your enabled toolsets. "
"Common patterns: ['terminal', 'file'] for code work, "
"['web'] for research, ['terminal', 'file', 'web'] for "
"full-stack tasks."

View File

@@ -3,16 +3,22 @@
Skills Sync -- Manifest-based seeding and updating of bundled skills.
Copies bundled skills from the repo's skills/ directory into ~/.hermes/skills/
and uses a manifest to track which skills have been offered.
and uses a manifest to track which skills have been synced and their origin hash.
Behavior:
- NEW skills (not in manifest): copied to user dir, added to manifest.
- EXISTING skills (in manifest, present in user dir): UPDATED from bundled.
- DELETED by user (in manifest, absent from user dir): respected -- not re-added.
Manifest format (v2): each line is "skill_name:origin_hash" where origin_hash
is the MD5 of the bundled skill at the time it was last synced to the user dir.
Old v1 manifests (plain names without hashes) are auto-migrated.
Update logic:
- NEW skills (not in manifest): copied to user dir, origin hash recorded.
- EXISTING skills (in manifest, present in user dir):
* If user copy matches origin hash: user hasn't modified it → safe to
update from bundled if bundled changed. New origin hash recorded.
* If user copy differs from origin hash: user customized it → SKIP.
- DELETED by user (in manifest, absent from user dir): respected, not re-added.
- REMOVED from bundled (in manifest, gone from repo): cleaned from manifest.
The manifest lives at ~/.hermes/skills/.bundled_manifest and is a simple
newline-delimited list of skill names that have been offered to the user.
The manifest lives at ~/.hermes/skills/.bundled_manifest.
"""
import hashlib
@@ -20,7 +26,7 @@ import logging
import os
import shutil
from pathlib import Path
from typing import List, Tuple
from typing import Dict, List, Tuple
logger = logging.getLogger(__name__)
@@ -35,27 +41,38 @@ def _get_bundled_dir() -> Path:
return Path(__file__).parent.parent / "skills"
def _read_manifest() -> set:
"""Read the set of skill names already offered to the user."""
def _read_manifest() -> Dict[str, str]:
"""
Read the manifest as a dict of {skill_name: origin_hash}.
Handles both v1 (plain names) and v2 (name:hash) formats.
v1 entries get an empty hash string which triggers migration on next sync.
"""
if not MANIFEST_FILE.exists():
return set()
return {}
try:
return set(
line.strip()
for line in MANIFEST_FILE.read_text(encoding="utf-8").splitlines()
if line.strip()
)
result = {}
for line in MANIFEST_FILE.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line:
continue
if ":" in line:
# v2 format: name:hash
name, _, hash_val = line.partition(":")
result[name.strip()] = hash_val.strip()
else:
# v1 format: plain name — empty hash triggers migration
result[line] = ""
return result
except (OSError, IOError):
return set()
return {}
def _write_manifest(names: set):
"""Write the manifest file."""
def _write_manifest(entries: Dict[str, str]):
"""Write the manifest file in v2 format (name:hash)."""
MANIFEST_FILE.parent.mkdir(parents=True, exist_ok=True)
MANIFEST_FILE.write_text(
"\n".join(sorted(names)) + "\n",
encoding="utf-8",
)
lines = [f"{name}:{hash_val}" for name, hash_val in sorted(entries.items())]
MANIFEST_FILE.write_text("\n".join(lines) + "\n", encoding="utf-8")
def _discover_bundled_skills(bundled_dir: Path) -> List[Tuple[str, Path]]:
@@ -105,18 +122,16 @@ def sync_skills(quiet: bool = False) -> dict:
"""
Sync bundled skills into ~/.hermes/skills/ using the manifest.
- NEW skills (not in manifest): copied to user dir, added to manifest.
- EXISTING skills (in manifest, present in user dir): updated from bundled.
- DELETED by user (in manifest, absent from user dir): respected, not re-added.
- REMOVED from bundled (in manifest, gone from repo): cleaned from manifest.
Returns:
dict with keys: copied (list), updated (list), skipped (int),
cleaned (list), total_bundled (int)
user_modified (list), cleaned (list), total_bundled (int)
"""
bundled_dir = _get_bundled_dir()
if not bundled_dir.exists():
return {"copied": [], "updated": [], "skipped": 0, "cleaned": [], "total_bundled": 0}
return {
"copied": [], "updated": [], "skipped": 0,
"user_modified": [], "cleaned": [], "total_bundled": 0,
}
SKILLS_DIR.mkdir(parents=True, exist_ok=True)
manifest = _read_manifest()
@@ -125,52 +140,88 @@ def sync_skills(quiet: bool = False) -> dict:
copied = []
updated = []
user_modified = []
skipped = 0
for skill_name, skill_src in bundled_skills:
dest = _compute_relative_dest(skill_src, bundled_dir)
bundled_hash = _dir_hash(skill_src)
if skill_name not in manifest:
# New skill -- never offered before
# ── New skill never offered before ──
try:
if dest.exists():
# User already has a skill with the same name (unlikely but possible)
# User already has a skill with the same name — don't overwrite
skipped += 1
manifest[skill_name] = bundled_hash
else:
dest.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(skill_src, dest)
copied.append(skill_name)
manifest[skill_name] = bundled_hash
if not quiet:
print(f" + {skill_name}")
except (OSError, IOError) as e:
if not quiet:
print(f" ! Failed to copy {skill_name}: {e}")
manifest.add(skill_name)
# Do NOT add to manifest — next sync should retry
elif dest.exists():
# Existing skill in manifest AND on disk -- check for updates
src_hash = _dir_hash(skill_src)
dst_hash = _dir_hash(dest)
if src_hash != dst_hash:
# ── Existing skill in manifest AND on disk ──
origin_hash = manifest.get(skill_name, "")
user_hash = _dir_hash(dest)
if not origin_hash:
# v1 migration: no origin hash recorded. Set baseline from
# user's current copy so future syncs can detect modifications.
manifest[skill_name] = user_hash
if user_hash == bundled_hash:
skipped += 1 # already in sync
else:
# Can't tell if user modified or bundled changed — be safe
skipped += 1
continue
if user_hash != origin_hash:
# User modified this skill — don't overwrite their changes
user_modified.append(skill_name)
if not quiet:
print(f" ~ {skill_name} (user-modified, skipping)")
continue
# User copy matches origin — check if bundled has a newer version
if bundled_hash != origin_hash:
try:
shutil.rmtree(dest)
shutil.copytree(skill_src, dest)
updated.append(skill_name)
if not quiet:
print(f"{skill_name} (updated)")
# Move old copy to a backup so we can restore on failure
backup = dest.with_suffix(".bak")
shutil.move(str(dest), str(backup))
try:
shutil.copytree(skill_src, dest)
manifest[skill_name] = bundled_hash
updated.append(skill_name)
if not quiet:
print(f"{skill_name} (updated)")
# Remove backup after successful copy
shutil.rmtree(backup, ignore_errors=True)
except (OSError, IOError):
# Restore from backup
if backup.exists() and not dest.exists():
shutil.move(str(backup), str(dest))
raise
except (OSError, IOError) as e:
if not quiet:
print(f" ! Failed to update {skill_name}: {e}")
else:
skipped += 1
skipped += 1 # bundled unchanged, user unchanged
else:
# In manifest but not on disk -- user deleted it, respect that
# ── In manifest but not on disk user deleted it ──
skipped += 1
# Clean stale manifest entries (skills removed from bundled dir)
cleaned = sorted(manifest - bundled_names)
manifest -= set(cleaned)
cleaned = sorted(set(manifest.keys()) - bundled_names)
for name in cleaned:
del manifest[name]
# Also copy DESCRIPTION.md files for categories (if not already present)
for desc_md in bundled_dir.rglob("DESCRIPTION.md"):
@@ -189,6 +240,7 @@ def sync_skills(quiet: bool = False) -> dict:
"copied": copied,
"updated": updated,
"skipped": skipped,
"user_modified": user_modified,
"cleaned": cleaned,
"total_bundled": len(bundled_skills),
}
@@ -197,6 +249,13 @@ def sync_skills(quiet: bool = False) -> dict:
if __name__ == "__main__":
print("Syncing bundled skills into ~/.hermes/skills/ ...")
result = sync_skills(quiet=False)
print(f"\nDone: {len(result['copied'])} new, {len(result['updated'])} updated, "
f"{result['skipped']} unchanged, {len(result['cleaned'])} cleaned from manifest, "
f"{result['total_bundled']} total bundled.")
parts = [
f"{len(result['copied'])} new",
f"{len(result['updated'])} updated",
f"{result['skipped']} unchanged",
]
if result["user_modified"]:
parts.append(f"{len(result['user_modified'])} user-modified (kept)")
if result["cleaned"]:
parts.append(f"{len(result['cleaned'])} cleaned from manifest")
print(f"\nDone: {', '.join(parts)}. {result['total_bundled']} total bundled.")