Compare commits

..

3 Commits

Author SHA1 Message Date
balyan.sid@gmail.com
02c9e7fee2 update skill.md to callout for remote machines 2026-03-13 07:34:29 +05:30
balyan.sid@gmail.com
f77811a8a2 Remove unnecessary comments from X OAuth2 setup script 2026-03-12 23:01:09 +05:30
balyan.sid@gmail.com
1ad8713b2b add xitter skill 2026-03-12 22:41:48 +05:30
39 changed files with 1982 additions and 3127 deletions

View File

@@ -55,7 +55,6 @@ hermes tools # Configure which tools are enabled
hermes config set # Set individual config values
hermes gateway # Start the messaging gateway (Telegram, Discord, etc.)
hermes setup # Run the full setup wizard (configures everything at once)
hermes claw migrate # Migrate from OpenClaw (if coming from OpenClaw)
hermes update # Update to the latest version
hermes doctor # Diagnose any issues
```
@@ -88,35 +87,6 @@ All documentation lives at **[hermes-agent.nousresearch.com/docs](https://hermes
---
## Migrating from OpenClaw
If you're coming from OpenClaw, Hermes can automatically import your settings, memories, skills, and API keys.
**During first-time setup:** The setup wizard (`hermes setup`) automatically detects `~/.openclaw` and offers to migrate before configuration begins.
**Anytime after install:**
```bash
hermes claw migrate # Interactive migration (full preset)
hermes claw migrate --dry-run # Preview what would be migrated
hermes claw migrate --preset user-data # Migrate without secrets
hermes claw migrate --overwrite # Overwrite existing conflicts
```
What gets imported:
- **SOUL.md** — persona file
- **Memories** — MEMORY.md and USER.md entries
- **Skills** — user-created skills → `~/.hermes/skills/openclaw-imports/`
- **Command allowlist** — approval patterns
- **Messaging settings** — platform configs, allowed users, working directory
- **API keys** — allowlisted secrets (Telegram, OpenRouter, OpenAI, Anthropic, ElevenLabs)
- **TTS assets** — workspace audio files
- **Workspace instructions** — AGENTS.md (with `--workspace-target`)
See `hermes claw migrate --help` for all options, or use the `openclaw-migration` skill for an interactive agent-guided migration with dry-run previews.
---
## Contributing
We welcome contributions! See the [Contributing Guide](https://hermes-agent.nousresearch.com/docs/developer-guide/contributing) for development setup, code style, and PR process.
@@ -124,9 +94,8 @@ We welcome contributions! See the [Contributing Guide](https://hermes-agent.nous
Quick start for contributors:
```bash
git clone https://github.com/NousResearch/hermes-agent.git
git clone --recurse-submodules https://github.com/NousResearch/hermes-agent.git
cd hermes-agent
git submodule update --init mini-swe-agent # required terminal backend
curl -LsSf https://astral.sh/uv/install.sh | sh
uv venv .venv --python 3.11
source .venv/bin/activate
@@ -135,12 +104,6 @@ uv pip install -e "./mini-swe-agent"
python -m pytest tests/ -q
```
> **RL Training (optional):** To work on the RL/Tinker-Atropos integration, also run:
> ```bash
> git submodule update --init tinker-atropos
> uv pip install -e "./tinker-atropos"
> ```
---
## Community

23
cli.py
View File

@@ -3608,19 +3608,6 @@ class HermesCLI:
continue
print(f"\n⚡ New message detected, interrupting...")
self.agent.interrupt(interrupt_msg)
# Debug: log to file (stdout may be devnull from redirect_stdout)
try:
import pathlib as _pl
_dbg = _pl.Path.home() / ".hermes" / "interrupt_debug.log"
with open(_dbg, "a") as _f:
import time as _t
_f.write(f"{_t.strftime('%H:%M:%S')} interrupt fired: msg={str(interrupt_msg)[:60]!r}, "
f"children={len(self.agent._active_children)}, "
f"parent._interrupt={self.agent._interrupt_requested}\n")
for _ci, _ch in enumerate(self.agent._active_children):
_f.write(f" child[{_ci}]._interrupt={_ch._interrupt_requested}\n")
except Exception:
pass
break
except queue.Empty:
pass # Queue empty or timeout, continue waiting
@@ -3890,16 +3877,6 @@ class HermesCLI:
payload = (text, images) if images else text
if self._agent_running and not (text and text.startswith("/")):
self._interrupt_queue.put(payload)
# Debug: log to file when message enters interrupt queue
try:
import pathlib as _pl
_dbg = _pl.Path.home() / ".hermes" / "interrupt_debug.log"
with open(_dbg, "a") as _f:
import time as _t
_f.write(f"{_t.strftime('%H:%M:%S')} ENTER: queued interrupt msg={str(payload)[:60]!r}, "
f"agent_running={self._agent_running}\n")
except Exception:
pass
else:
self._pending_input.put(payload)
event.app.current_buffer.reset(append_to_history=True)

View File

@@ -1,110 +0,0 @@
# Migrating from OpenClaw to Hermes Agent
This guide covers how to import your OpenClaw settings, memories, skills, and API keys into Hermes Agent.
## Three Ways to Migrate
### 1. Automatic (during first-time setup)
When you run `hermes setup` for the first time and Hermes detects `~/.openclaw`, it automatically offers to import your OpenClaw data before configuration begins. Just accept the prompt and everything is handled for you.
### 2. CLI Command (quick, scriptable)
```bash
hermes claw migrate # Full migration with confirmation prompt
hermes claw migrate --dry-run # Preview what would happen
hermes claw migrate --preset user-data # Migrate without API keys/secrets
hermes claw migrate --yes # Skip confirmation prompt
```
**All options:**
| Flag | Description |
|------|-------------|
| `--source PATH` | Path to OpenClaw directory (default: `~/.openclaw`) |
| `--dry-run` | Preview only — no files are modified |
| `--preset {user-data,full}` | Migration preset (default: `full`). `user-data` excludes secrets |
| `--overwrite` | Overwrite existing files (default: skip conflicts) |
| `--migrate-secrets` | Include allowlisted secrets (auto-enabled with `full` preset) |
| `--workspace-target PATH` | Copy workspace instructions (AGENTS.md) to this absolute path |
| `--skill-conflict {skip,overwrite,rename}` | How to handle skill name conflicts (default: `skip`) |
| `--yes`, `-y` | Skip confirmation prompts |
### 3. Agent-Guided (interactive, with previews)
Ask the agent to run the migration for you:
```
> Migrate my OpenClaw setup to Hermes
```
The agent will use the `openclaw-migration` skill to:
1. Run a dry-run first to preview changes
2. Ask about conflict resolution (SOUL.md, skills, etc.)
3. Let you choose between `user-data` and `full` presets
4. Execute the migration with your choices
5. Print a detailed summary of what was migrated
## What Gets Migrated
### `user-data` preset
| Item | Source | Destination |
|------|--------|-------------|
| SOUL.md | `~/.openclaw/workspace/SOUL.md` | `~/.hermes/SOUL.md` |
| Memory entries | `~/.openclaw/workspace/MEMORY.md` | `~/.hermes/memories/MEMORY.md` |
| User profile | `~/.openclaw/workspace/USER.md` | `~/.hermes/memories/USER.md` |
| Skills | `~/.openclaw/workspace/skills/` | `~/.hermes/skills/openclaw-imports/` |
| Command allowlist | `~/.openclaw/workspace/exec_approval_patterns.yaml` | Merged into `~/.hermes/config.yaml` |
| Messaging settings | `~/.openclaw/config.yaml` (TELEGRAM_ALLOWED_USERS, MESSAGING_CWD) | `~/.hermes/.env` |
| TTS assets | `~/.openclaw/workspace/tts/` | `~/.hermes/tts/` |
### `full` preset (adds to `user-data`)
| Item | Source | Destination |
|------|--------|-------------|
| Telegram bot token | `~/.openclaw/config.yaml` | `~/.hermes/.env` |
| OpenRouter API key | `~/.openclaw/.env` or config | `~/.hermes/.env` |
| OpenAI API key | `~/.openclaw/.env` or config | `~/.hermes/.env` |
| Anthropic API key | `~/.openclaw/.env` or config | `~/.hermes/.env` |
| ElevenLabs API key | `~/.openclaw/.env` or config | `~/.hermes/.env` |
Only these 6 allowlisted secrets are ever imported. Other credentials are skipped and reported.
## Conflict Handling
By default, the migration **will not overwrite** existing Hermes data:
- **SOUL.md** — skipped if one already exists in `~/.hermes/`
- **Memory entries** — skipped if memories already exist (to avoid duplicates)
- **Skills** — skipped if a skill with the same name already exists
- **API keys** — skipped if the key is already set in `~/.hermes/.env`
To overwrite conflicts, use `--overwrite`. The migration creates backups before overwriting.
For skills, you can also use `--skill-conflict rename` to import conflicting skills under a new name (e.g., `skill-name-imported`).
## Migration Report
Every migration (including dry runs) produces a report showing:
- **Migrated items** — what was successfully imported
- **Conflicts** — items skipped because they already exist
- **Skipped items** — items not found in the source
- **Errors** — items that failed to import
For execute runs, the full report is saved to `~/.hermes/migration/openclaw/<timestamp>/`.
## Troubleshooting
### "OpenClaw directory not found"
The migration looks for `~/.openclaw` by default. If your OpenClaw is installed elsewhere, use `--source`:
```bash
hermes claw migrate --source /path/to/.openclaw
```
### "Migration script not found"
The migration script ships with Hermes Agent. If you installed via pip (not git clone), the `optional-skills/` directory may not be present. Install the skill from the Skills Hub:
```bash
hermes skills install openclaw-migration
```
### Memory overflow
If your OpenClaw MEMORY.md or USER.md exceeds Hermes' character limits, excess entries are exported to an overflow file in the migration report directory. You can manually review and add the most important ones.

View File

@@ -3418,19 +3418,17 @@ class GatewayRunner:
# Monitor for interrupts from the adapter (new messages arriving)
async def monitor_for_interrupt():
adapter = self.adapters.get(source.platform)
if not adapter or not session_key:
if not adapter:
return
chat_id = source.chat_id
while True:
await asyncio.sleep(0.2) # Check every 200ms
# Check if adapter has a pending interrupt for this session.
# Must use session_key (build_session_key output) — NOT
# source.chat_id — because the adapter stores interrupt events
# under the full session key.
if hasattr(adapter, 'has_pending_interrupt') and adapter.has_pending_interrupt(session_key):
# Check if adapter has a pending interrupt for this session
if hasattr(adapter, 'has_pending_interrupt') and adapter.has_pending_interrupt(chat_id):
agent = agent_holder[0]
if agent:
pending_event = adapter.get_pending_message(session_key)
pending_event = adapter.get_pending_message(chat_id)
pending_text = pending_event.text if pending_event else None
logger.debug("Interrupt detected from adapter, signaling agent...")
agent.interrupt(pending_text)
@@ -3447,11 +3445,10 @@ class GatewayRunner:
result = result_holder[0]
adapter = self.adapters.get(source.platform)
# Get pending message from adapter if interrupted.
# Use session_key (not source.chat_id) to match adapter's storage keys.
# Get pending message from adapter if interrupted
pending = None
if result and result.get("interrupted") and adapter:
pending_event = adapter.get_pending_message(session_key) if session_key else None
pending_event = adapter.get_pending_message(source.chat_id)
if pending_event:
pending = pending_event.text
elif result.get("interrupt_message"):
@@ -3463,8 +3460,8 @@ class GatewayRunner:
# Clear the adapter's interrupt event so the next _run_agent call
# doesn't immediately re-trigger the interrupt before the new agent
# even makes its first API call (this was causing an infinite loop).
if adapter and hasattr(adapter, '_active_sessions') and session_key and session_key in adapter._active_sessions:
adapter._active_sessions[session_key].clear()
if adapter and hasattr(adapter, '_active_sessions') and source.chat_id in adapter._active_sessions:
adapter._active_sessions[source.chat_id].clear()
# Don't send the interrupted response to the user — it's just noise
# like "Operation interrupted." They already know they sent a new

View File

@@ -1,296 +0,0 @@
"""hermes claw — OpenClaw migration commands.
Usage:
hermes claw migrate # Interactive migration from ~/.openclaw
hermes claw migrate --dry-run # Preview what would be migrated
hermes claw migrate --preset full --overwrite # Full migration, overwrite conflicts
"""
import importlib.util
import logging
import sys
from pathlib import Path
from hermes_cli.config import get_hermes_home, get_config_path, load_config, save_config
from hermes_cli.setup import (
Colors,
color,
print_header,
print_info,
print_success,
print_warning,
print_error,
prompt_yes_no,
prompt_choice,
)
logger = logging.getLogger(__name__)
PROJECT_ROOT = Path(__file__).parent.parent.resolve()
_OPENCLAW_SCRIPT = (
PROJECT_ROOT
/ "optional-skills"
/ "migration"
/ "openclaw-migration"
/ "scripts"
/ "openclaw_to_hermes.py"
)
# Fallback: user may have installed the skill from the Hub
_OPENCLAW_SCRIPT_INSTALLED = (
get_hermes_home()
/ "skills"
/ "migration"
/ "openclaw-migration"
/ "scripts"
/ "openclaw_to_hermes.py"
)
def _find_migration_script() -> Path | None:
"""Find the openclaw_to_hermes.py script in known locations."""
for candidate in [_OPENCLAW_SCRIPT, _OPENCLAW_SCRIPT_INSTALLED]:
if candidate.exists():
return candidate
return None
def _load_migration_module(script_path: Path):
"""Dynamically load the migration script as a module."""
spec = importlib.util.spec_from_file_location("openclaw_to_hermes", script_path)
if spec is None or spec.loader is None:
return None
mod = importlib.util.module_from_spec(spec)
# Register in sys.modules so @dataclass can resolve the module
# (Python 3.11+ requires this for dynamically loaded modules)
sys.modules[spec.name] = mod
try:
spec.loader.exec_module(mod)
except Exception:
sys.modules.pop(spec.name, None)
raise
return mod
def claw_command(args):
"""Route hermes claw subcommands."""
action = getattr(args, "claw_action", None)
if action == "migrate":
_cmd_migrate(args)
else:
print("Usage: hermes claw migrate [options]")
print()
print("Commands:")
print(" migrate Migrate settings from OpenClaw to Hermes")
print()
print("Run 'hermes claw migrate --help' for migration options.")
def _cmd_migrate(args):
"""Run the OpenClaw → Hermes migration."""
source_dir = Path(getattr(args, "source", None) or Path.home() / ".openclaw")
dry_run = getattr(args, "dry_run", False)
preset = getattr(args, "preset", "full")
overwrite = getattr(args, "overwrite", False)
migrate_secrets = getattr(args, "migrate_secrets", False)
workspace_target = getattr(args, "workspace_target", None)
skill_conflict = getattr(args, "skill_conflict", "skip")
# If using the "full" preset, secrets are included by default
if preset == "full":
migrate_secrets = True
print()
print(
color(
"┌─────────────────────────────────────────────────────────┐",
Colors.MAGENTA,
)
)
print(
color(
"│ ⚕ Hermes — OpenClaw Migration │",
Colors.MAGENTA,
)
)
print(
color(
"└─────────────────────────────────────────────────────────┘",
Colors.MAGENTA,
)
)
# Check source directory
if not source_dir.is_dir():
print()
print_error(f"OpenClaw directory not found: {source_dir}")
print_info("Make sure your OpenClaw installation is at the expected path.")
print_info(f"You can specify a custom path: hermes claw migrate --source /path/to/.openclaw")
return
# Find the migration script
script_path = _find_migration_script()
if not script_path:
print()
print_error("Migration script not found.")
print_info("Expected at one of:")
print_info(f" {_OPENCLAW_SCRIPT}")
print_info(f" {_OPENCLAW_SCRIPT_INSTALLED}")
print_info("Make sure the openclaw-migration skill is installed.")
return
# Show what we're doing
hermes_home = get_hermes_home()
print()
print_header("Migration Settings")
print_info(f"Source: {source_dir}")
print_info(f"Target: {hermes_home}")
print_info(f"Preset: {preset}")
print_info(f"Mode: {'dry run (preview only)' if dry_run else 'execute'}")
print_info(f"Overwrite: {'yes' if overwrite else 'no (skip conflicts)'}")
print_info(f"Secrets: {'yes (allowlisted only)' if migrate_secrets else 'no'}")
if skill_conflict != "skip":
print_info(f"Skill conflicts: {skill_conflict}")
if workspace_target:
print_info(f"Workspace: {workspace_target}")
print()
# For execute mode (non-dry-run), confirm unless --yes was passed
if not dry_run and not getattr(args, "yes", False):
if not prompt_yes_no("Proceed with migration?", default=True):
print_info("Migration cancelled.")
return
# Ensure config.yaml exists before migration tries to read it
config_path = get_config_path()
if not config_path.exists():
save_config(load_config())
# Load and run the migration
try:
mod = _load_migration_module(script_path)
if mod is None:
print_error("Could not load migration script.")
return
selected = mod.resolve_selected_options(None, None, preset=preset)
ws_target = Path(workspace_target).resolve() if workspace_target else None
migrator = mod.Migrator(
source_root=source_dir.resolve(),
target_root=hermes_home.resolve(),
execute=not dry_run,
workspace_target=ws_target,
overwrite=overwrite,
migrate_secrets=migrate_secrets,
output_dir=None,
selected_options=selected,
preset_name=preset,
skill_conflict_mode=skill_conflict,
)
report = migrator.migrate()
except Exception as e:
print()
print_error(f"Migration failed: {e}")
logger.debug("OpenClaw migration error", exc_info=True)
return
# Print results
_print_migration_report(report, dry_run)
def _print_migration_report(report: dict, dry_run: bool):
"""Print a formatted migration report."""
summary = report.get("summary", {})
migrated = summary.get("migrated", 0)
skipped = summary.get("skipped", 0)
conflicts = summary.get("conflict", 0)
errors = summary.get("error", 0)
total = migrated + skipped + conflicts + errors
print()
if dry_run:
print_header("Dry Run Results")
print_info("No files were modified. This is a preview of what would happen.")
else:
print_header("Migration Results")
print()
# Detailed items
items = report.get("items", [])
if items:
# Group by status
migrated_items = [i for i in items if i.get("status") == "migrated"]
skipped_items = [i for i in items if i.get("status") == "skipped"]
conflict_items = [i for i in items if i.get("status") == "conflict"]
error_items = [i for i in items if i.get("status") == "error"]
if migrated_items:
label = "Would migrate" if dry_run else "Migrated"
print(color(f"{label}:", Colors.GREEN))
for item in migrated_items:
kind = item.get("kind", "unknown")
dest = item.get("destination", "")
if dest:
dest_short = str(dest).replace(str(Path.home()), "~")
print(f" {kind:<22s}{dest_short}")
else:
print(f" {kind}")
print()
if conflict_items:
print(color(f" ⚠ Conflicts (skipped — use --overwrite to force):", Colors.YELLOW))
for item in conflict_items:
kind = item.get("kind", "unknown")
reason = item.get("reason", "already exists")
print(f" {kind:<22s} {reason}")
print()
if skipped_items:
print(color(f" ─ Skipped:", Colors.DIM))
for item in skipped_items:
kind = item.get("kind", "unknown")
reason = item.get("reason", "")
print(f" {kind:<22s} {reason}")
print()
if error_items:
print(color(f" ✗ Errors:", Colors.RED))
for item in error_items:
kind = item.get("kind", "unknown")
reason = item.get("reason", "unknown error")
print(f" {kind:<22s} {reason}")
print()
# Summary line
parts = []
if migrated:
action = "would migrate" if dry_run else "migrated"
parts.append(f"{migrated} {action}")
if conflicts:
parts.append(f"{conflicts} conflict(s)")
if skipped:
parts.append(f"{skipped} skipped")
if errors:
parts.append(f"{errors} error(s)")
if parts:
print_info(f"Summary: {', '.join(parts)}")
else:
print_info("Nothing to migrate.")
# Output directory
output_dir = report.get("output_dir")
if output_dir:
print_info(f"Full report saved to: {output_dir}")
if dry_run:
print()
print_info("To execute the migration, run without --dry-run:")
print_info(f" hermes claw migrate --preset {report.get('preset', 'full')}")
elif migrated:
print()
print_success("Migration complete!")

View File

@@ -22,8 +22,6 @@ Usage:
hermes update # Update to latest version
hermes uninstall # Uninstall Hermes Agent
hermes sessions browse # Interactive session picker with search
hermes claw migrate # Migrate from OpenClaw to Hermes
hermes claw migrate --dry-run # Preview migration without changes
"""
import argparse
@@ -2401,7 +2399,7 @@ For more help on a command:
skills_inspect.add_argument("identifier", help="Skill identifier")
skills_list = skills_subparsers.add_parser("list", help="List installed skills")
skills_list.add_argument("--source", default="all", choices=["all", "hub", "builtin", "local"])
skills_list.add_argument("--source", default="all", choices=["all", "hub", "builtin"])
skills_audit = skills_subparsers.add_parser("audit", help="Re-scan installed hub skills")
skills_audit.add_argument("name", nargs="?", help="Specific skill to audit (default: all)")
@@ -2685,69 +2683,6 @@ For more help on a command:
insights_parser.set_defaults(func=cmd_insights)
# =========================================================================
# claw command (OpenClaw migration)
# =========================================================================
claw_parser = subparsers.add_parser(
"claw",
help="OpenClaw migration tools",
description="Migrate settings, memories, skills, and API keys from OpenClaw to Hermes"
)
claw_subparsers = claw_parser.add_subparsers(dest="claw_action")
# claw migrate
claw_migrate = claw_subparsers.add_parser(
"migrate",
help="Migrate from OpenClaw to Hermes",
description="Import settings, memories, skills, and API keys from an OpenClaw installation"
)
claw_migrate.add_argument(
"--source",
help="Path to OpenClaw directory (default: ~/.openclaw)"
)
claw_migrate.add_argument(
"--dry-run",
action="store_true",
help="Preview what would be migrated without making changes"
)
claw_migrate.add_argument(
"--preset",
choices=["user-data", "full"],
default="full",
help="Migration preset (default: full). 'user-data' excludes secrets"
)
claw_migrate.add_argument(
"--overwrite",
action="store_true",
help="Overwrite existing files (default: skip conflicts)"
)
claw_migrate.add_argument(
"--migrate-secrets",
action="store_true",
help="Include allowlisted secrets (TELEGRAM_BOT_TOKEN, API keys, etc.)"
)
claw_migrate.add_argument(
"--workspace-target",
help="Absolute path to copy workspace instructions into"
)
claw_migrate.add_argument(
"--skill-conflict",
choices=["skip", "overwrite", "rename"],
default="skip",
help="How to handle skill name conflicts (default: skip)"
)
claw_migrate.add_argument(
"--yes", "-y",
action="store_true",
help="Skip confirmation prompts"
)
def cmd_claw(args):
from hermes_cli.claw import claw_command
claw_command(args)
claw_parser.set_defaults(func=cmd_claw)
# =========================================================================
# version command
# =========================================================================

View File

@@ -11,7 +11,6 @@ Modular wizard with independently-runnable sections:
Config files are stored in ~/.hermes/ for easy access.
"""
import importlib.util
import logging
import os
import sys
@@ -2020,114 +2019,6 @@ def setup_tools(config: dict, first_install: bool = False):
tools_command(first_install=first_install, config=config)
# =============================================================================
# OpenClaw Migration
# =============================================================================
_OPENCLAW_SCRIPT = (
PROJECT_ROOT
/ "optional-skills"
/ "migration"
/ "openclaw-migration"
/ "scripts"
/ "openclaw_to_hermes.py"
)
def _offer_openclaw_migration(hermes_home: Path) -> bool:
"""Detect ~/.openclaw and offer to migrate during first-time setup.
Returns True if migration ran successfully, False otherwise.
"""
openclaw_dir = Path.home() / ".openclaw"
if not openclaw_dir.is_dir():
return False
if not _OPENCLAW_SCRIPT.exists():
return False
print()
print_header("OpenClaw Installation Detected")
print_info(f"Found OpenClaw data at {openclaw_dir}")
print_info("Hermes can import your settings, memories, skills, and API keys.")
print()
if not prompt_yes_no("Would you like to import from OpenClaw?", default=True):
print_info(
"Skipping migration. You can run it later via the openclaw-migration skill."
)
return False
# Ensure config.yaml exists before migration tries to read it
config_path = get_config_path()
if not config_path.exists():
save_config(load_config())
# Dynamically load the migration script
try:
spec = importlib.util.spec_from_file_location(
"openclaw_to_hermes", _OPENCLAW_SCRIPT
)
if spec is None or spec.loader is None:
print_warning("Could not load migration script.")
return False
mod = importlib.util.module_from_spec(spec)
# Register in sys.modules so @dataclass can resolve the module
# (Python 3.11+ requires this for dynamically loaded modules)
import sys as _sys
_sys.modules[spec.name] = mod
try:
spec.loader.exec_module(mod)
except Exception:
_sys.modules.pop(spec.name, None)
raise
# Run migration with the "full" preset, execute mode, no overwrite
selected = mod.resolve_selected_options(None, None, preset="full")
migrator = mod.Migrator(
source_root=openclaw_dir.resolve(),
target_root=hermes_home.resolve(),
execute=True,
workspace_target=None,
overwrite=False,
migrate_secrets=True,
output_dir=None,
selected_options=selected,
preset_name="full",
)
report = migrator.migrate()
except Exception as e:
print_warning(f"Migration failed: {e}")
logger.debug("OpenClaw migration error", exc_info=True)
return False
# Print summary
summary = report.get("summary", {})
migrated = summary.get("migrated", 0)
skipped = summary.get("skipped", 0)
conflicts = summary.get("conflict", 0)
errors = summary.get("error", 0)
print()
if migrated:
print_success(f"Imported {migrated} item(s) from OpenClaw.")
if conflicts:
print_info(f"Skipped {conflicts} item(s) that already exist in Hermes.")
if skipped:
print_info(f"Skipped {skipped} item(s) (not found or unchanged).")
if errors:
print_warning(f"{errors} item(s) had errors — check the migration report.")
output_dir = report.get("output_dir")
if output_dir:
print_info(f"Full report saved to: {output_dir}")
print_success("Migration complete! Continuing with setup...")
return True
# =============================================================================
# Main Wizard Orchestrator
# =============================================================================
@@ -2294,11 +2185,6 @@ def run_setup_wizard(args):
print()
return
# Offer OpenClaw migration before configuration begins
if _offer_openclaw_migration(hermes_home):
# Reload config in case migration wrote to it
config = load_config()
# ── Full Setup — run all sections ──
print_header("Configuration Location")
print_info(f"Config file: {get_config_path()}")

View File

@@ -407,16 +407,14 @@ def do_inspect(identifier: str, console: Optional[Console] = None) -> None:
def do_list(source_filter: str = "all", console: Optional[Console] = None) -> None:
"""List installed skills, distinguishing hub, builtin, and local skills."""
"""List installed skills, distinguishing builtins from hub-installed."""
from tools.skills_hub import HubLockFile, ensure_hub_dirs
from tools.skills_sync import _read_manifest
from tools.skills_tool import _find_all_skills
c = console or _console
ensure_hub_dirs()
lock = HubLockFile()
hub_installed = {e["name"]: e for e in lock.list_installed()}
builtin_names = set(_read_manifest())
all_skills = _find_all_skills()
@@ -426,42 +424,30 @@ def do_list(source_filter: str = "all", console: Optional[Console] = None) -> No
table.add_column("Source", style="dim")
table.add_column("Trust", style="dim")
hub_count = 0
builtin_count = 0
local_count = 0
for skill in sorted(all_skills, key=lambda s: (s.get("category") or "", s["name"])):
name = skill["name"]
category = skill.get("category", "")
hub_entry = hub_installed.get(name)
if hub_entry:
source_type = "hub"
source_display = hub_entry.get("source", "hub")
trust = hub_entry.get("trust_level", "community")
hub_count += 1
elif name in builtin_names:
source_type = "builtin"
else:
source_display = "builtin"
trust = "builtin"
builtin_count += 1
else:
source_type = "local"
source_display = "local"
trust = "local"
local_count += 1
if source_filter != "all" and source_filter != source_type:
if source_filter == "hub" and not hub_entry:
continue
if source_filter == "builtin" and hub_entry:
continue
trust_style = {"builtin": "bright_cyan", "trusted": "green", "community": "yellow", "local": "dim"}.get(trust, "dim")
trust_style = {"builtin": "bright_cyan", "trusted": "green", "community": "yellow"}.get(trust, "dim")
trust_label = "official" if source_display == "official" else trust
table.add_row(name, category, source_display, f"[{trust_style}]{trust_label}[/]")
c.print(table)
c.print(
f"[dim]{hub_count} hub-installed, {builtin_count} builtin, {local_count} local[/]\n"
)
c.print(f"[dim]{len(hub_installed)} hub-installed, "
f"{len(all_skills) - len(hub_installed)} builtin[/]\n")
def do_audit(name: Optional[str] = None, console: Optional[Console] = None) -> None:
@@ -1028,7 +1014,7 @@ def _print_skills_help(console: Console) -> None:
" [cyan]search[/] <query> Search registries for skills\n"
" [cyan]install[/] <identifier> Install a skill (with security scan)\n"
" [cyan]inspect[/] <identifier> Preview a skill without installing\n"
" [cyan]list[/] [--source hub|builtin|local] List installed skills\n"
" [cyan]list[/] [--source hub|builtin] List installed skills\n"
" [cyan]audit[/] [name] Re-scan hub skills for security\n"
" [cyan]uninstall[/] <name> Remove a hub-installed skill\n"
" [cyan]publish[/] <path> --repo <r> Publish a skill to GitHub via PR\n"

View File

@@ -14,22 +14,6 @@ metadata:
Use this skill when a user wants to move their OpenClaw setup into Hermes Agent with minimal manual cleanup.
## CLI Command
For a quick, non-interactive migration, use the built-in CLI command:
```bash
hermes claw migrate # Full interactive migration
hermes claw migrate --dry-run # Preview what would be migrated
hermes claw migrate --preset user-data # Migrate without secrets
hermes claw migrate --overwrite # Overwrite existing conflicts
hermes claw migrate --source /custom/path/.openclaw # Custom source
```
The CLI command runs the same migration script described below. Use this skill (via the agent) when you want an interactive, guided migration with dry-run previews and per-item conflict resolution.
**First-time setup:** The `hermes setup` wizard automatically detects `~/.openclaw` and offers migration before configuration begins.
## What this skill does
It uses `scripts/openclaw_to_hermes.py` to:

View File

@@ -53,13 +53,6 @@ pty = [
honcho = ["honcho-ai>=2.0.1"]
mcp = ["mcp>=1.2.0"]
homeassistant = ["aiohttp>=3.9.0"]
rl = [
"atroposlib @ git+https://github.com/NousResearch/atropos.git",
"tinker @ git+https://github.com/thinking-machines-lab/tinker.git",
"fastapi>=0.104.0",
"uvicorn[standard]>=0.24.0",
"wandb>=0.15.0",
]
yc-bench = ["yc-bench @ git+https://github.com/collinear-ai/yc-bench.git"]
all = [
"hermes-agent[modal]",

View File

@@ -2543,31 +2543,6 @@ class AIAgent:
return msg
@staticmethod
def _sanitize_tool_calls_for_strict_api(api_msg: dict) -> dict:
"""Strip Codex Responses API fields from tool_calls for strict providers.
Providers like Mistral strictly validate the Chat Completions schema
and reject unknown fields (call_id, response_item_id) with 422.
These fields are preserved in the internal message history — this
method only modifies the outgoing API copy.
Creates new tool_call dicts rather than mutating in-place, so the
original messages list retains call_id/response_item_id for Codex
Responses API compatibility (e.g. if the session falls back to a
Codex provider later).
"""
tool_calls = api_msg.get("tool_calls")
if not isinstance(tool_calls, list):
return api_msg
_STRIP_KEYS = {"call_id", "response_item_id"}
api_msg["tool_calls"] = [
{k: v for k, v in tc.items() if k not in _STRIP_KEYS}
if isinstance(tc, dict) else tc
for tc in tool_calls
]
return api_msg
def flush_memories(self, messages: list = None, min_turns: int = None):
"""Give the model one turn to persist memories before context is lost.
@@ -2605,7 +2580,6 @@ class AIAgent:
try:
# Build API messages for the flush call
_is_strict_api = "api.mistral.ai" in self.base_url.lower()
api_messages = []
for msg in messages:
api_msg = msg.copy()
@@ -2616,8 +2590,6 @@ class AIAgent:
api_msg.pop("reasoning", None)
api_msg.pop("finish_reason", None)
api_msg.pop("_flush_sentinel", None)
if _is_strict_api:
self._sanitize_tool_calls_for_strict_api(api_msg)
api_messages.append(api_msg)
if self._cached_system_prompt:
@@ -3083,14 +3055,11 @@ class AIAgent:
try:
# Build API messages, stripping internal-only fields
# (finish_reason, reasoning) that strict APIs like Mistral reject with 422
_is_strict_api = "api.mistral.ai" in self.base_url.lower()
api_messages = []
for msg in messages:
api_msg = msg.copy()
for internal_field in ("reasoning", "finish_reason"):
api_msg.pop(internal_field, None)
if _is_strict_api:
self._sanitize_tool_calls_for_strict_api(api_msg)
api_messages.append(api_msg)
effective_system = self._cached_system_prompt or ""
@@ -3469,12 +3438,6 @@ class AIAgent:
# Remove finish_reason - not accepted by strict APIs (e.g. Mistral)
if "finish_reason" in api_msg:
api_msg.pop("finish_reason")
# Strip Codex Responses API fields (call_id, response_item_id) for
# strict providers like Mistral that reject unknown fields with 422.
# Uses new dicts so the internal messages list retains the fields
# for Codex Responses compatibility.
if "api.mistral.ai" in self.base_url.lower():
self._sanitize_tool_calls_for_strict_api(api_msg)
# Keep 'reasoning_details' - OpenRouter uses this for multi-turn reasoning context
# The signature field helps maintain reasoning continuity
api_messages.append(api_msg)

View File

@@ -572,16 +572,17 @@ clone_repo() {
fi
else
# Try SSH first (for private repo access), fall back to HTTPS
# Use --recurse-submodules to also clone mini-swe-agent and tinker-atropos
# GIT_SSH_COMMAND disables interactive prompts and sets a short timeout
# so SSH fails fast instead of hanging when no key is configured.
log_info "Trying SSH clone..."
if GIT_SSH_COMMAND="ssh -o BatchMode=yes -o ConnectTimeout=5" \
git clone --branch "$BRANCH" "$REPO_URL_SSH" "$INSTALL_DIR" 2>/dev/null; then
git clone --branch "$BRANCH" --recurse-submodules "$REPO_URL_SSH" "$INSTALL_DIR" 2>/dev/null; then
log_success "Cloned via SSH"
else
rm -rf "$INSTALL_DIR" 2>/dev/null # Clean up partial SSH clone
log_info "SSH failed, trying HTTPS..."
if git clone --branch "$BRANCH" "$REPO_URL_HTTPS" "$INSTALL_DIR"; then
if git clone --branch "$BRANCH" --recurse-submodules "$REPO_URL_HTTPS" "$INSTALL_DIR"; then
log_success "Cloned via HTTPS"
else
log_error "Failed to clone repository"
@@ -592,12 +593,10 @@ clone_repo() {
cd "$INSTALL_DIR"
# Only init mini-swe-agent (terminal tool backend — required).
# tinker-atropos (RL training) is optional and heavy — users can opt in later
# with: git submodule update --init tinker-atropos && uv pip install -e ./tinker-atropos
log_info "Initializing mini-swe-agent submodule (terminal backend)..."
git submodule update --init mini-swe-agent
log_success "Submodule ready"
# Ensure submodules are initialized and updated (for existing installs or if --recurse failed)
log_info "Initializing submodules (mini-swe-agent, tinker-atropos)..."
git submodule update --init --recursive
log_success "Submodules ready"
log_success "Repository ready"
}
@@ -680,11 +679,12 @@ install_deps() {
log_warn "mini-swe-agent not found (run: git submodule update --init)"
fi
# tinker-atropos (RL training) is optional — skip by default.
# To enable RL tools: git submodule update --init tinker-atropos && uv pip install -e "./tinker-atropos"
log_info "Installing tinker-atropos (RL training backend)..."
if [ -d "tinker-atropos" ] && [ -f "tinker-atropos/pyproject.toml" ]; then
log_info "tinker-atropos submodule found — skipping install (optional, for RL training)"
log_info " To install: $UV_CMD pip install -e \"./tinker-atropos\""
$UV_CMD pip install -e "./tinker-atropos" || log_warn "tinker-atropos install failed (RL tools may not work)"
log_success "tinker-atropos installed"
else
log_warn "tinker-atropos not found (run: git submodule update --init)"
fi
log_success "All dependencies installed"

22
skills/xitter/README.md Normal file
View File

@@ -0,0 +1,22 @@
# xitter
X/Twitter skill for Hermes Agent, powered by [x-cli](https://github.com/Infatoshi/x-cli).
## Credits
The bundled `x-cli/` is a patched fork of **Infatoshi's** work:
- **x-cli** (CLI tool): https://github.com/Infatoshi/x-cli
- **x-mcp** (MCP server): https://github.com/Infatoshi/x-mcp
The patch adds OAuth 2.0 PKCE support for the X Bookmarks API, adapting
the token exchange and refresh flow from x-mcp's `oauth2.ts` into x-cli's
Python `OAuth2Manager`.
## What's Changed from Upstream
- `auth.py`: Added `OAuth2Manager` class with PKCE token refresh
- `api.py`: Added `_oauth2_request()` for bookmark endpoints (`get_bookmarks`, `bookmark_tweet`, `unbookmark_tweet`)
- `cli.py`: Added `me bookmarks`, `me bookmark`, `me unbookmark` commands
Everything else (OAuth 1.0a signing, Bearer token auth, formatters, utils) is upstream as-is.

225
skills/xitter/SKILL.md Normal file
View File

@@ -0,0 +1,225 @@
---
name: xitter
description: "Post tweets, read timelines, search, bookmark, and engage on X/Twitter via the x-cli command-line tool. Use this skill whenever the user wants to interact with X/Twitter — posting, reading timelines, searching tweets, managing bookmarks, liking, retweeting, looking up users, or checking mentions. Also trigger when the user mentions 'tweet', 'X', 'Twitter', or any social media posting task targeting X."
version: 1.0.0
author: alt-glitch (x-cli upstream: Infatoshi)
platforms: [linux, macos]
metadata:
hermes:
tags: [twitter, x, social-media]
requires_toolsets: [terminal]
---
# Xitter — X/Twitter CLI Skill
Interact with X/Twitter through `x-cli`, a Python CLI that talks directly to the X API v2. Supports posting, reading, searching, engagement, and bookmarks.
> **Pay-per-use X API tier required.** The free tier does not support most endpoints and returns misleading 403 errors. You need at least the Basic tier ($200/month) from https://developer.x.com/en/portal/products
## When to Use
Any X/Twitter task:
- Posting tweets, replies, quote tweets, polls
- Reading timelines, mentions, user profiles
- Searching tweets
- Managing bookmarks (add, remove, list)
- Liking and retweeting
- Looking up followers/following
## Setup
If `x-cli` is not installed or the user hasn't configured credentials yet, walk them through this setup. Each step has direct links — give them to the user verbatim.
### Step 1: Install x-cli
Install from the bundled source in this skill directory:
```bash
uv tool install <SKILL_DIR>/x-cli/
```
Replace `<SKILL_DIR>` with the absolute path to this skill's directory.
Verify: `x-cli --help` should show the command list.
### Step 2: Create an X Developer App
Direct the user to: **https://developer.x.com/en/portal/dashboard**
1. Sign in with their X account
2. If no developer account exists, sign up (free tier exists but **pay-per-use is required** for API access — see note above)
3. Go to **Apps** in the left sidebar → **Create App**
4. Enter any app name (e.g. `hermes-xitter`)
5. After creation, three credentials appear on screen:
- **API Key** (Consumer Key) → this is `X_API_KEY`
- **API Secret** (Consumer Secret) → this is `X_API_SECRET`
- **Bearer Token** → this is `X_BEARER_TOKEN`
**Tell the user to save all three immediately. The secret won't be shown again.**
### Step 3: Enable Write Permissions
Without this, posting/liking/retweeting fails with a 403 error.
On the app's page in the developer portal:
1. Scroll to **User authentication settings** → click **Set up**
2. Set these values:
- **App permissions**: **Read and write** (NOT just Read)
- **Type of App**: **Web App, Automated App or Bot**
- **Callback URI / Redirect URL**: `http://127.0.0.1:3219/callback`
- **Website URL**: `https://example.com` (any valid URL)
3. Click **Save**
It will show an OAuth 2.0 Client Secret — save it for Step 6.
### Step 4: Generate Access Token & Secret
**This MUST be done AFTER Step 3.** If tokens existed before enabling write perms, they must be regenerated.
1. Go to the app's **Keys and Tokens** page: **https://developer.x.com/en/portal/dashboard** → click app → **Keys and tokens** tab
2. Under **Access Token and Secret** → click **Generate** (or **Regenerate**)
3. Save both:
- **Access Token** → `X_ACCESS_TOKEN`
- **Access Token Secret** → `X_ACCESS_TOKEN_SECRET`
4. **Verify** the Access Token section shows **"Read and Write"**, not just "Read"
### Step 5: Save Credentials
Append these 5 variables to `~/.hermes/.env`:
```bash
X_API_KEY=<API Key from Step 2>
X_API_SECRET=<API Secret from Step 2>
X_BEARER_TOKEN=<Bearer Token from Step 2>
X_ACCESS_TOKEN=<Access Token from Step 4>
X_ACCESS_TOKEN_SECRET=<Access Token Secret from Step 4>
```
Test with: `x-cli me mentions` — should return recent mentions (or an empty list).
### Step 6: OAuth2 PKCE Setup (for Bookmarks)
Bookmarks use a separate OAuth 2.0 flow. This step requires a browser.
**If running over SSH**: The setup script starts a local callback server on `127.0.0.1:3219`. For the browser redirect to reach the remote machine, the user must set up SSH port forwarding first:
```bash
ssh -L 3219:127.0.0.1:3219 <user>@<host>
```
Then they can open the printed URL in their local browser and the callback will tunnel through. If they're already in an SSH session, they can add the tunnel from another terminal.
**If running natively on Mac/Linux**: The script will open the browser automatically. No extra steps needed.
1. In the developer portal (**https://developer.x.com/en/portal/dashboard** → app → **Keys and tokens** tab), find **OAuth 2.0 Client ID and Client Secret**. Generate them if they don't exist yet.
2. Run the setup script:
```bash
uv run <SKILL_DIR>/scripts/x-oauth2-setup.py
```
3. It will ask for Client ID and Client Secret, open the browser (or print the URL if no browser is available) for authorization, then automatically:
- Save `X_OAUTH2_CLIENT_ID` and `X_OAUTH2_CLIENT_SECRET` to `~/.hermes/.env`
- Save tokens to `~/.config/x-cli/.oauth2-tokens.json`
Test with: `x-cli me bookmarks` — should return bookmarked tweets.
### Step 7: Token Refresh Cron
OAuth2 access tokens expire every 2 hours. Set up an hourly cron to keep them alive:
Create a hermes scheduled task:
- **Schedule**: every 1 hour
- **Command**: `uv run <SKILL_DIR>/scripts/refresh-oauth2.py`
- **Delivery**: local (silent on success)
If the refresh token itself dies (~6 months or revocation), the script exits with code 1 and prints a message. The user will need to re-run `x-oauth2-setup.py`.
## Command Reference
### Tweet Commands (`x-cli tweet <action>`)
| Command | Args | Flags | Description |
|---------|------|-------|-------------|
| `post` | `TEXT` | `--poll OPTIONS` `--poll-duration MINS` | Post a tweet (optionally with poll) |
| `get` | `ID_OR_URL` | | Fetch a tweet with metadata |
| `delete` | `ID_OR_URL` | | Delete a tweet |
| `reply` | `ID_OR_URL` `TEXT` | | Reply to a tweet (restricted — see Pitfalls) |
| `quote` | `ID_OR_URL` `TEXT` | | Quote-retweet a tweet |
| `search` | `QUERY` | `--max N` | Search recent tweets (last 7 days) |
| `metrics` | `ID_OR_URL` | | Get engagement metrics |
### User Commands (`x-cli user <action>`)
| Command | Args | Flags | Description |
|---------|------|-------|-------------|
| `get` | `USERNAME` | | Look up a user profile |
| `timeline` | `USERNAME` | `--max N` | Get a user's recent posts |
| `followers` | `USERNAME` | `--max N` | List a user's followers |
| `following` | `USERNAME` | `--max N` | List who a user follows |
### Self Commands (`x-cli me <action>`)
| Command | Args | Flags | Description |
|---------|------|-------|-------------|
| `mentions` | | `--max N` | Your recent mentions |
| `bookmarks` | | `--max N` | Your bookmarks (OAuth2) |
| `bookmark` | `ID_OR_URL` | | Bookmark a tweet (OAuth2) |
| `unbookmark` | `ID_OR_URL` | | Remove a bookmark (OAuth2) |
### Top-Level Commands
| Command | Args | Description |
|---------|------|-------------|
| `like` | `ID_OR_URL` | Like a tweet |
| `retweet` | `ID_OR_URL` | Retweet a tweet |
### Output Flags
All commands accept these flags (placed before the subcommand, e.g. `x-cli -j user get ...`):
- `-j` / `--json` — Raw JSON output (add `-v` for full response including `includes` and `meta`)
- `-p` / `--plain` — TSV format for piping
- `-md` / `--markdown` — Markdown tables/headings
- `-v` / `--verbose` — Include timestamps, metrics, metadata, pagination tokens
- Default: TSV (`-p`) — agent-friendly tab-separated output. Use `-j` when you need structured data for parsing.
### Search Query Syntax
The `search` command supports X's full query language:
- `from:username` — posts by a user
- `to:username` — replies to a user
- `#hashtag` — hashtag search
- `"exact phrase"` — exact match
- `has:media` / `has:links` / `has:images`
- `is:reply` / `-is:retweet`
- `lang:en` — language filter
- Combine with spaces (AND) or `OR`
## Auth Architecture
x-cli uses three auth methods depending on the endpoint:
| Method | Endpoints | Credentials |
|--------|-----------|-------------|
| **Bearer Token** | Public reads: `get_tweet`, `search`, `get_user`, `get_timeline`, `get_followers`, `get_following` | `X_BEARER_TOKEN` |
| **OAuth 1.0a** | Writes + authenticated reads: `post`, `delete`, `like`, `retweet`, `reply`, `quote`, `mentions`, `metrics` | `X_API_KEY`, `X_API_SECRET`, `X_ACCESS_TOKEN`, `X_ACCESS_TOKEN_SECRET` |
| **OAuth 2.0 PKCE** | Bookmarks only: `bookmarks`, `bookmark`, `unbookmark` | `X_OAUTH2_CLIENT_ID`, `X_OAUTH2_CLIENT_SECRET` + token file |
## Credential Locations
| What | Where | Written by |
|------|-------|-----------|
| API keys (7 vars) | `~/.hermes/.env` | User (Steps 2-5) + setup script (Step 6) |
| OAuth2 tokens | `~/.config/x-cli/.oauth2-tokens.json` | `x-oauth2-setup.py`, then auto-refreshed by cron |
## Pitfalls
**Pay-per-use API required**: The free tier returns 403 errors on most endpoints. The error message says "oauth1-permissions" which is misleading — the real issue is the API tier. Basic tier costs $200/month.
**403 "oauth1-permissions"**: If you're on the right tier and still get this, the Access Token was generated before write permissions were enabled. Fix: go to the app's User Authentication Settings, confirm "Read and write" is set, then **regenerate** the Access Token and Secret.
**Reply restrictions**: Since Feb 2024, X restricts programmatic replies. `x-cli tweet reply` only works if the original tweet's author @mentioned you or quoted your post. For everything else, use `x-cli tweet quote` instead.
**OAuth2 token expiry**: Access tokens last 2 hours. The hourly cron (Step 7) handles this. If the cron isn't running, `x-cli me bookmarks` will fail with a RuntimeError. The refresh token itself lasts ~6 months — if it dies, re-run `x-oauth2-setup.py`.
**Rate limits**: X API has per-endpoint rate limits. When hit, the error includes a reset timestamp. Wait until then.

View File

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""
Refresh X/Twitter OAuth2 tokens. Intended to run as an hourly cron job.
Access tokens expire every 2h. This script refreshes them proactively
so bookmark operations never hit an expired token.
Exit codes:
0 — tokens refreshed or still valid
1 — refresh token is dead (user must re-run x-oauth2-setup.py)
Usage:
uv run refresh-oauth2.py
"""
# /// script
# requires-python = ">=3.11"
# dependencies = ["httpx", "python-dotenv"]
# ///
import base64
import json
import sys
import time
from pathlib import Path
import httpx
from dotenv import load_dotenv
import os
TOKEN_URL = "https://api.twitter.com/2/oauth2/token"
HERMES_ENV = Path.home() / ".hermes" / ".env"
TOKEN_FILE = Path.home() / ".config" / "x-cli" / ".oauth2-tokens.json"
EXPIRY_BUFFER_MS = 60_000 # refresh 60s before actual expiry
def main() -> int:
# Load credentials from ~/.hermes/.env
if HERMES_ENV.exists():
load_dotenv(HERMES_ENV)
client_id = os.environ.get("X_OAUTH2_CLIENT_ID", "")
client_secret = os.environ.get("X_OAUTH2_CLIENT_SECRET", "")
if not client_id or not client_secret:
print("ERROR: X_OAUTH2_CLIENT_ID and X_OAUTH2_CLIENT_SECRET not found in ~/.hermes/.env")
return 1
# Load tokens
if not TOKEN_FILE.exists():
print("ERROR: No token file at ~/.config/x-cli/.oauth2-tokens.json")
print("Run x-oauth2-setup.py first.")
return 1
tokens = json.loads(TOKEN_FILE.read_text())
now_ms = int(time.time() * 1000)
# Check if still valid (with 60s buffer)
if now_ms < (tokens["expires_at"] - EXPIRY_BUFFER_MS):
remaining_min = (tokens["expires_at"] - now_ms) / 60_000
print(f"OK: token still valid ({remaining_min:.0f}min remaining)")
return 0
# Refresh
print("Token expired or expiring soon. Refreshing...")
raw = f"{client_id}:{client_secret}"
basic_auth = f"Basic {base64.b64encode(raw.encode()).decode()}"
from urllib.parse import urlencode
body = urlencode({
"grant_type": "refresh_token",
"refresh_token": tokens["refresh_token"],
"client_id": client_id,
})
try:
resp = httpx.post(
TOKEN_URL,
content=body,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": basic_auth,
},
timeout=30.0,
)
except Exception as e:
print(f"ERROR: network request failed: {e}")
return 1
if resp.status_code != 200:
print(f"ERROR: refresh failed with status {resp.status_code}")
print(resp.text)
if resp.status_code == 401:
print("\nRefresh token is dead. Re-run x-oauth2-setup.py to get new tokens.")
TOKEN_FILE.unlink(missing_ok=True)
return 1
data = resp.json()
new_tokens = {
"access_token": data["access_token"],
"refresh_token": data.get("refresh_token", tokens["refresh_token"]),
"expires_at": int(time.time() * 1000) + data.get("expires_in", 7200) * 1000,
}
TOKEN_FILE.write_text(json.dumps(new_tokens, indent=2))
print("OK: tokens refreshed successfully")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,232 @@
#!/usr/bin/env python3
"""
One-time OAuth2 PKCE setup for X/Twitter bookmarks.
Run this on a machine where you have a browser and are logged into X.
Usage:
uv run x-oauth2-setup.py
It will ask for your Client ID and Client Secret, open your browser,
and save the tokens automatically.
To get Client ID + Secret:
1. Go to https://developer.x.com/en/portal/dashboard
2. Click your app -> "Keys and tokens" tab
3. Under "OAuth 2.0 Client ID and Client Secret" -> generate/copy both
4. Make sure your app has "Read and Write" permissions
5. Under "User authentication settings":
- Type: "Web App, Automated App or Bot"
- Callback URL: http://127.0.0.1:3219/callback
- Website URL: anything (e.g. https://example.com)
"""
# /// script
# requires-python = ">=3.11"
# dependencies = ["httpx"]
# ///
import base64
import hashlib
import json
import os
import secrets
import time
import webbrowser
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import parse_qs, urlencode, urlparse
AUTH_URL = "https://twitter.com/i/oauth2/authorize"
TOKEN_URL = "https://api.twitter.com/2/oauth2/token"
REDIRECT_URI = "http://127.0.0.1:3219/callback"
SCOPES = "bookmark.read bookmark.write tweet.read users.read offline.access"
HERMES_ENV = Path.home() / ".hermes" / ".env"
TOKEN_FILE = Path.home() / ".config" / "x-cli" / ".oauth2-tokens.json"
# PKCE: generate verifier + challenge
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode()
code_challenge = (
base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest())
.rstrip(b"=")
.decode()
)
state = secrets.token_urlsafe(16)
received_code = None
cid = None
csecret = None
def _basic_auth_header(client_id: str, client_secret: str) -> str:
"""Match x-mcp: Basic base64(client_id:client_secret)"""
raw = f"{client_id}:{client_secret}"
encoded = base64.b64encode(raw.encode()).decode()
return f"Basic {encoded}"
def _append_to_hermes_env(key: str, value: str) -> None:
"""Append a key=value to ~/.hermes/.env if not already present."""
HERMES_ENV.parent.mkdir(parents=True, exist_ok=True)
if HERMES_ENV.exists():
content = HERMES_ENV.read_text()
for line in content.splitlines():
if line.strip().startswith(f"{key}="):
# Already present — update in place
lines = content.splitlines()
new_lines = []
for l in lines:
if l.strip().startswith(f"{key}="):
new_lines.append(f"{key}={value}")
else:
new_lines.append(l)
HERMES_ENV.write_text("\n".join(new_lines) + "\n")
return
with open(HERMES_ENV, "a") as f:
f.write(f"{key}={value}\n")
class CallbackHandler(BaseHTTPRequestHandler):
def do_GET(self):
global received_code
parsed = urlparse(self.path)
params = parse_qs(parsed.query)
if parsed.path == "/callback":
recv_state = params.get("state", [None])[0]
if recv_state != state:
self.send_response(400)
self.end_headers()
self.wfile.write(b"State mismatch! CSRF detected. Try again.")
return
if "code" in params:
received_code = params["code"][0]
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.end_headers()
self.wfile.write(b"""
<html><body style="font-family:monospace;text-align:center;padding:60px;background:#111;color:#0f0">
<h1>authorized. go back to your terminal.</h1>
<p>you can close this tab.</p>
</body></html>
""")
else:
error = params.get("error", ["unknown"])[0]
self.send_response(400)
self.end_headers()
self.wfile.write(f"Auth failed: {error}".encode())
else:
self.send_response(404)
self.end_headers()
def log_message(self, format, *args):
pass
def main():
global cid, csecret
print("=" * 50)
print("X/Twitter OAuth2 PKCE Setup")
print("=" * 50)
print()
print("This gets you a refresh token for bookmark access.")
print("You only need to do this once.")
print()
cid = input("Client ID: ").strip()
csecret = input("Client Secret: ").strip()
if not cid or not csecret:
print(
"Both are required. Get them from https://developer.x.com/en/portal/dashboard"
)
return
# Build auth URL
params = {
"response_type": "code",
"client_id": cid,
"redirect_uri": REDIRECT_URI,
"scope": SCOPES,
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
url = f"{AUTH_URL}?{urlencode(params)}"
server = HTTPServer(("127.0.0.1", 3219), CallbackHandler)
server.timeout = 120
print()
print("Opening browser for authorization...")
print(f"If it doesn't open, go to:\n{url}")
print()
webbrowser.open(url)
while received_code is None:
server.handle_request()
server.server_close()
print("Got authorization code. Exchanging for tokens...")
import httpx
token_body = urlencode(
{
"grant_type": "authorization_code",
"code": received_code,
"redirect_uri": REDIRECT_URI,
"code_verifier": code_verifier,
"client_id": cid,
}
)
resp = httpx.post(
TOKEN_URL,
content=token_body,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": _basic_auth_header(cid, csecret),
},
timeout=30.0,
)
if resp.status_code != 200:
print(f"Token exchange failed: {resp.status_code}")
print(resp.text)
return
data = resp.json()
expires_at = int(time.time() * 1000) + data.get("expires_in", 7200) * 1000
result = {
"access_token": data["access_token"],
"refresh_token": data["refresh_token"],
"expires_at": expires_at,
}
# Save tokens to ~/.config/x-cli/.oauth2-tokens.json
TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)
TOKEN_FILE.write_text(json.dumps(result, indent=2))
print(f"\nTokens saved to {TOKEN_FILE}")
# Save client credentials to ~/.hermes/.env
_append_to_hermes_env("X_OAUTH2_CLIENT_ID", cid)
_append_to_hermes_env("X_OAUTH2_CLIENT_SECRET", csecret)
print(f"Client credentials saved to {HERMES_ENV}")
print()
print("=" * 50)
print("SUCCESS!")
print("=" * 50)
print()
print("OAuth2 is fully configured. Bookmarks are ready to use.")
print("The hourly token refresh cron will keep your tokens alive.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,27 @@
[project]
name = "x-cli"
version = "0.1.0"
description = "CLI for X/Twitter API v2"
requires-python = ">=3.11"
dependencies = [
"click>=8.1",
"httpx>=0.27",
"rich>=13.0",
"python-dotenv>=1.0",
]
[project.scripts]
x-cli = "x_cli.cli:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/x_cli"]
[tool.pytest.ini_options]
testpaths = ["tests"]
[dependency-groups]
dev = ["pytest>=8.0", "ruff>=0.4"]

View File

@@ -0,0 +1 @@
"""x-cli: CLI for X/Twitter API v2."""

View File

@@ -0,0 +1,218 @@
"""Twitter API v2 client with OAuth 1.0a, Bearer token, and OAuth 2.0 auth."""
from __future__ import annotations
from typing import Any
import httpx
from .auth import Credentials, OAuth2Manager, generate_oauth_header
API_BASE = "https://api.x.com/2"
class XApiClient:
def __init__(self, creds: Credentials) -> None:
self.creds = creds
self._user_id: str | None = None
self._http = httpx.Client(timeout=30.0)
self._oauth2: OAuth2Manager | None = None
if creds.oauth2_client_id and creds.oauth2_client_secret:
self._oauth2 = OAuth2Manager(creds.oauth2_client_id, creds.oauth2_client_secret)
def close(self) -> None:
self._http.close()
# ---- internal ----
def _bearer_get(self, url: str) -> dict[str, Any]:
resp = self._http.get(url, headers={"Authorization": f"Bearer {self.creds.bearer_token}"})
return self._handle(resp)
def _oauth_request(self, method: str, url: str, json_body: dict | None = None) -> dict[str, Any]:
auth_header = generate_oauth_header(method, url, self.creds)
headers: dict[str, str] = {"Authorization": auth_header}
if json_body is not None:
headers["Content-Type"] = "application/json"
resp = self._http.request(method, url, headers=headers, json=json_body if json_body else None)
return self._handle(resp)
def _handle(self, resp: httpx.Response) -> dict[str, Any]:
if resp.status_code == 429:
reset = resp.headers.get("x-rate-limit-reset", "unknown")
raise RuntimeError(f"Rate limited. Resets at {reset}.")
data = resp.json()
if not resp.is_success:
errors = data.get("errors", [])
msg = "; ".join(e.get("detail") or e.get("message", "") for e in errors) or resp.text[:500]
raise RuntimeError(f"API error (HTTP {resp.status_code}): {msg}")
return data
def get_authenticated_user_id(self) -> str:
if self._user_id:
return self._user_id
data = self._oauth_request("GET", f"{API_BASE}/users/me")
self._user_id = data["data"]["id"]
return self._user_id
# ---- tweets ----
def post_tweet(
self,
text: str,
reply_to: str | None = None,
quote_tweet_id: str | None = None,
poll_options: list[str] | None = None,
poll_duration_minutes: int = 1440,
) -> dict[str, Any]:
body: dict[str, Any] = {"text": text}
if reply_to:
# NOTE: X API restricts programmatic replies (Feb 2024). Replies only
# succeed if the original author @mentioned you or quoted your post.
body["reply"] = {"in_reply_to_tweet_id": reply_to}
if quote_tweet_id:
body["quote_tweet_id"] = quote_tweet_id
if poll_options:
body["poll"] = {"options": poll_options, "duration_minutes": poll_duration_minutes}
return self._oauth_request("POST", f"{API_BASE}/tweets", body)
def delete_tweet(self, tweet_id: str) -> dict[str, Any]:
return self._oauth_request("DELETE", f"{API_BASE}/tweets/{tweet_id}")
def get_tweet(self, tweet_id: str) -> dict[str, Any]:
params = {
"tweet.fields": "created_at,public_metrics,author_id,conversation_id,in_reply_to_user_id,referenced_tweets,attachments,entities,lang,note_tweet",
"expansions": "author_id,referenced_tweets.id,attachments.media_keys",
"user.fields": "name,username,verified,profile_image_url,public_metrics",
"media.fields": "url,preview_image_url,type,width,height,alt_text",
}
qs = "&".join(f"{k}={v}" for k, v in params.items())
return self._bearer_get(f"{API_BASE}/tweets/{tweet_id}?{qs}")
def search_tweets(self, query: str, max_results: int = 10) -> dict[str, Any]:
max_results = max(10, min(max_results, 100))
params = {
"query": query,
"max_results": str(max_results),
"tweet.fields": "created_at,public_metrics,author_id,conversation_id,entities,lang,note_tweet",
"expansions": "author_id,attachments.media_keys",
"user.fields": "name,username,verified,profile_image_url",
"media.fields": "url,preview_image_url,type",
}
url = f"{API_BASE}/tweets/search/recent"
resp = self._http.get(url, params=params, headers={"Authorization": f"Bearer {self.creds.bearer_token}"})
return self._handle(resp)
def get_tweet_metrics(self, tweet_id: str) -> dict[str, Any]:
params = "tweet.fields=public_metrics,non_public_metrics,organic_metrics"
return self._oauth_request("GET", f"{API_BASE}/tweets/{tweet_id}?{params}")
# ---- users ----
def get_user(self, username: str) -> dict[str, Any]:
fields = "user.fields=created_at,description,public_metrics,verified,profile_image_url,url,location,pinned_tweet_id"
return self._bearer_get(f"{API_BASE}/users/by/username/{username}?{fields}")
def get_timeline(self, user_id: str, max_results: int = 10) -> dict[str, Any]:
max_results = max(5, min(max_results, 100))
params = {
"max_results": str(max_results),
"tweet.fields": "created_at,public_metrics,author_id,conversation_id,entities,lang,note_tweet",
"expansions": "author_id,attachments.media_keys,referenced_tweets.id",
"user.fields": "name,username,verified",
"media.fields": "url,preview_image_url,type",
}
resp = self._http.get(
f"{API_BASE}/users/{user_id}/tweets",
params=params,
headers={"Authorization": f"Bearer {self.creds.bearer_token}"},
)
return self._handle(resp)
def get_followers(self, user_id: str, max_results: int = 100) -> dict[str, Any]:
max_results = max(1, min(max_results, 1000))
params = {
"max_results": str(max_results),
"user.fields": "created_at,description,public_metrics,verified,profile_image_url",
}
resp = self._http.get(
f"{API_BASE}/users/{user_id}/followers",
params=params,
headers={"Authorization": f"Bearer {self.creds.bearer_token}"},
)
return self._handle(resp)
def get_following(self, user_id: str, max_results: int = 100) -> dict[str, Any]:
max_results = max(1, min(max_results, 1000))
params = {
"max_results": str(max_results),
"user.fields": "created_at,description,public_metrics,verified,profile_image_url",
}
resp = self._http.get(
f"{API_BASE}/users/{user_id}/following",
params=params,
headers={"Authorization": f"Bearer {self.creds.bearer_token}"},
)
return self._handle(resp)
def get_mentions(self, max_results: int = 10) -> dict[str, Any]:
user_id = self.get_authenticated_user_id()
max_results = max(5, min(max_results, 100))
params = {
"max_results": str(max_results),
"tweet.fields": "created_at,public_metrics,author_id,conversation_id,entities,note_tweet",
"expansions": "author_id",
"user.fields": "name,username,verified",
}
qs = "&".join(f"{k}={v}" for k, v in params.items())
url = f"{API_BASE}/users/{user_id}/mentions?{qs}"
return self._oauth_request("GET", url)
# ---- engagement ----
def like_tweet(self, tweet_id: str) -> dict[str, Any]:
user_id = self.get_authenticated_user_id()
return self._oauth_request("POST", f"{API_BASE}/users/{user_id}/likes", {"tweet_id": tweet_id})
def retweet(self, tweet_id: str) -> dict[str, Any]:
user_id = self.get_authenticated_user_id()
return self._oauth_request("POST", f"{API_BASE}/users/{user_id}/retweets", {"tweet_id": tweet_id})
# ---- bookmarks (OAuth 2.0 PKCE — required by X API for bookmark endpoints) ----
def _oauth2_request(self, method: str, url: str, json_body: dict | None = None) -> dict[str, Any]:
"""Make a request using OAuth 2.0 bearer token (for bookmarks)."""
if not self._oauth2:
raise RuntimeError(
"Bookmarks require OAuth2 credentials. Add X_OAUTH2_CLIENT_ID and "
"X_OAUTH2_CLIENT_SECRET to your .env, then run x-oauth2-setup.py "
"to get tokens."
)
token = self._oauth2.get_access_token()
headers: dict[str, str] = {"Authorization": f"Bearer {token}"}
if json_body is not None:
headers["Content-Type"] = "application/json"
resp = self._http.request(method, url, headers=headers, json=json_body if json_body else None)
return self._handle(resp)
def get_bookmarks(self, max_results: int = 10) -> dict[str, Any]:
user_id = self.get_authenticated_user_id()
max_results = max(1, min(max_results, 100))
params = {
"max_results": str(max_results),
"tweet.fields": "created_at,public_metrics,author_id,conversation_id,entities,lang,note_tweet",
"expansions": "author_id,attachments.media_keys",
"user.fields": "name,username,verified,profile_image_url",
"media.fields": "url,preview_image_url,type",
}
qs = "&".join(f"{k}={v}" for k, v in params.items())
url = f"{API_BASE}/users/{user_id}/bookmarks?{qs}"
return self._oauth2_request("GET", url)
def bookmark_tweet(self, tweet_id: str) -> dict[str, Any]:
user_id = self.get_authenticated_user_id()
return self._oauth2_request("POST", f"{API_BASE}/users/{user_id}/bookmarks", {"tweet_id": tweet_id})
def unbookmark_tweet(self, tweet_id: str) -> dict[str, Any]:
user_id = self.get_authenticated_user_id()
return self._oauth2_request("DELETE", f"{API_BASE}/users/{user_id}/bookmarks/{tweet_id}")

View File

@@ -0,0 +1,210 @@
"""Auth: env var loading, OAuth 1.0a header generation, and OAuth 2.0 PKCE token management."""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import secrets
import time
import urllib.parse
from dataclasses import dataclass, field
from pathlib import Path
import httpx
from dotenv import load_dotenv
TOKEN_URL = "https://api.twitter.com/2/oauth2/token"
@dataclass
class Credentials:
api_key: str
api_secret: str
access_token: str
access_token_secret: str
bearer_token: str
oauth2_client_id: str = ""
oauth2_client_secret: str = ""
@dataclass
class OAuth2Tokens:
access_token: str
refresh_token: str
expires_at: int # unix ms
def is_expired(self) -> bool:
return int(time.time() * 1000) >= (self.expires_at - 60_000)
class OAuth2Manager:
"""Manages OAuth 2.0 tokens for bookmark operations. Auto-refreshes."""
def __init__(self, client_id: str, client_secret: str) -> None:
self.client_id = client_id
self.client_secret = client_secret
self._tokens: OAuth2Tokens | None = None
self._token_path = Path.home() / ".config" / "x-cli" / ".oauth2-tokens.json"
def _load_tokens(self) -> OAuth2Tokens | None:
if self._tokens and not self._tokens.is_expired():
return self._tokens
if self._token_path.exists():
data = json.loads(self._token_path.read_text())
self._tokens = OAuth2Tokens(**data)
if not self._tokens.is_expired():
return self._tokens
# expired — try refresh
return self._refresh()
return None
def _save_tokens(self, tokens: OAuth2Tokens) -> None:
self._tokens = tokens
self._token_path.parent.mkdir(parents=True, exist_ok=True)
self._token_path.write_text(json.dumps({
"access_token": tokens.access_token,
"refresh_token": tokens.refresh_token,
"expires_at": tokens.expires_at,
}, indent=2))
def _basic_auth_header(self) -> str:
"""Match x-mcp: Basic base64(client_id:client_secret)"""
import base64 as b64
raw = f"{self.client_id}:{self.client_secret}"
return f"Basic {b64.b64encode(raw.encode()).decode()}"
def _refresh(self) -> OAuth2Tokens | None:
if not self._tokens:
return None
try:
# Match x-mcp exactly: client_id in body + Basic auth header
from urllib.parse import urlencode
body = urlencode({
"grant_type": "refresh_token",
"refresh_token": self._tokens.refresh_token,
"client_id": self.client_id,
})
resp = httpx.post(
TOKEN_URL,
content=body,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": self._basic_auth_header(),
},
timeout=30.0,
)
if resp.status_code != 200:
# token file is stale, nuke it
self._token_path.unlink(missing_ok=True)
self._tokens = None
return None
data = resp.json()
tokens = OAuth2Tokens(
access_token=data["access_token"],
refresh_token=data.get("refresh_token", self._tokens.refresh_token),
expires_at=int(time.time() * 1000) + data.get("expires_in", 7200) * 1000,
)
self._save_tokens(tokens)
return tokens
except Exception:
return None
def get_access_token(self) -> str:
tokens = self._load_tokens()
if not tokens:
raise RuntimeError(
"OAuth2 not set up. Run the x-oauth2-setup.py script on a machine "
"with a browser, then copy .oauth2-tokens.json to ~/.config/x-cli/"
)
return tokens.access_token
def load_credentials() -> Credentials:
"""Load credentials from env vars, with .env fallback."""
# Try ~/.config/x-cli/.env then cwd .env
config_env = Path.home() / ".config" / "x-cli" / ".env"
if config_env.exists():
load_dotenv(config_env)
load_dotenv() # cwd .env
def require(name: str) -> str:
val = os.environ.get(name)
if not val:
raise SystemExit(
f"Missing env var: {name}. "
"Set X_API_KEY, X_API_SECRET, X_ACCESS_TOKEN, X_ACCESS_TOKEN_SECRET, X_BEARER_TOKEN."
)
return val
return Credentials(
api_key=require("X_API_KEY"),
api_secret=require("X_API_SECRET"),
access_token=require("X_ACCESS_TOKEN"),
access_token_secret=require("X_ACCESS_TOKEN_SECRET"),
bearer_token=require("X_BEARER_TOKEN"),
oauth2_client_id=os.environ.get("X_OAUTH2_CLIENT_ID", ""),
oauth2_client_secret=os.environ.get("X_OAUTH2_CLIENT_SECRET", ""),
)
def _percent_encode(s: str) -> str:
return urllib.parse.quote(s, safe="")
def generate_oauth_header(
method: str,
url: str,
creds: Credentials,
params: dict[str, str] | None = None,
) -> str:
"""Generate an OAuth 1.0a Authorization header (HMAC-SHA1)."""
oauth_params = {
"oauth_consumer_key": creds.api_key,
"oauth_nonce": secrets.token_hex(16),
"oauth_signature_method": "HMAC-SHA1",
"oauth_timestamp": str(int(time.time())),
"oauth_token": creds.access_token,
"oauth_version": "1.0",
}
# Combine oauth params with any query/body params for signature base
all_params = {**oauth_params}
if params:
all_params.update(params)
# Also include query string params from the URL
parsed = urllib.parse.urlparse(url)
if parsed.query:
qs_params = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
for k, v in qs_params.items():
all_params[k] = v[0]
# Sort and encode
sorted_params = sorted(all_params.items())
param_string = "&".join(f"{_percent_encode(k)}={_percent_encode(v)}" for k, v in sorted_params)
# Base URL (no query string)
base_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
# Signature base string
base_string = f"{method.upper()}&{_percent_encode(base_url)}&{_percent_encode(param_string)}"
# Signing key
signing_key = f"{_percent_encode(creds.api_secret)}&{_percent_encode(creds.access_token_secret)}"
# HMAC-SHA1
signature = base64.b64encode(
hmac.new(signing_key.encode(), base_string.encode(), hashlib.sha1).digest()
).decode()
oauth_params["oauth_signature"] = signature
# Build header
header_parts = ", ".join(
f'{_percent_encode(k)}="{_percent_encode(v)}"'
for k, v in sorted(oauth_params.items())
)
return f"OAuth {header_parts}"

View File

@@ -0,0 +1,271 @@
"""Click CLI for x-cli."""
from __future__ import annotations
import click
from .api import XApiClient
from .auth import load_credentials
from .formatters import format_output
from .utils import parse_tweet_id, strip_at
class State:
def __init__(self, mode: str, verbose: bool = False) -> None:
self.mode = mode
self.verbose = verbose
self._client: XApiClient | None = None
@property
def client(self) -> XApiClient:
if self._client is None:
creds = load_credentials()
self._client = XApiClient(creds)
return self._client
def output(self, data, title: str = "") -> None:
format_output(data, self.mode, title, verbose=self.verbose)
pass_state = click.make_pass_decorator(State)
@click.group()
@click.option("--json", "-j", "fmt", flag_value="json", help="JSON output")
@click.option("--plain", "-p", "fmt", flag_value="plain", help="TSV output for piping")
@click.option("--markdown", "-md", "fmt", flag_value="markdown", help="Markdown output")
@click.option("--verbose", "-v", is_flag=True, default=False, help="Verbose output (show metrics, timestamps, metadata)")
@click.pass_context
def cli(ctx, fmt, verbose):
"""x-cli: CLI for X/Twitter API v2."""
ctx.ensure_object(dict)
ctx.obj = State(fmt or "plain", verbose=verbose)
# ============================================================
# tweet
# ============================================================
@cli.group()
def tweet():
"""Tweet operations."""
@tweet.command("post")
@click.argument("text")
@click.option("--poll", default=None, help="Comma-separated poll options")
@click.option("--poll-duration", default=1440, type=int, help="Poll duration in minutes")
@pass_state
def tweet_post(state, text, poll, poll_duration):
"""Post a tweet."""
poll_options = [o.strip() for o in poll.split(",")] if poll else None
data = state.client.post_tweet(text, poll_options=poll_options, poll_duration_minutes=poll_duration)
state.output(data, "Posted")
@tweet.command("get")
@click.argument("id_or_url")
@pass_state
def tweet_get(state, id_or_url):
"""Fetch a tweet by ID or URL."""
tid = parse_tweet_id(id_or_url)
data = state.client.get_tweet(tid)
state.output(data, f"Tweet {tid}")
@tweet.command("delete")
@click.argument("id_or_url")
@pass_state
def tweet_delete(state, id_or_url):
"""Delete a tweet."""
tid = parse_tweet_id(id_or_url)
data = state.client.delete_tweet(tid)
state.output(data, "Deleted")
@tweet.command("reply")
@click.argument("id_or_url")
@click.argument("text")
@pass_state
def tweet_reply(state, id_or_url, text):
"""Reply to a tweet.
NOTE: X restricts programmatic replies. You can only reply if the original
author @mentioned you or quoted your post. Use 'tweet quote' as a workaround.
"""
tid = parse_tweet_id(id_or_url)
click.echo(
"Warning: X restricts programmatic replies. This will only succeed if "
"the original author @mentioned you or quoted your post.",
err=True,
)
data = state.client.post_tweet(text, reply_to=tid)
state.output(data, "Reply")
@tweet.command("quote")
@click.argument("id_or_url")
@click.argument("text")
@pass_state
def tweet_quote(state, id_or_url, text):
"""Quote tweet."""
tid = parse_tweet_id(id_or_url)
data = state.client.post_tweet(text, quote_tweet_id=tid)
state.output(data, "Quote")
@tweet.command("search")
@click.argument("query")
@click.option("--max", "max_results", default=10, type=int, help="Max results (10-100)")
@pass_state
def tweet_search(state, query, max_results):
"""Search recent tweets."""
data = state.client.search_tweets(query, max_results)
state.output(data, f"Search: {query}")
@tweet.command("metrics")
@click.argument("id_or_url")
@pass_state
def tweet_metrics(state, id_or_url):
"""Get tweet engagement metrics."""
tid = parse_tweet_id(id_or_url)
data = state.client.get_tweet_metrics(tid)
state.output(data, f"Metrics {tid}")
# ============================================================
# user
# ============================================================
@cli.group()
def user():
"""User operations."""
@user.command("get")
@click.argument("username")
@pass_state
def user_get(state, username):
"""Look up a user profile."""
data = state.client.get_user(strip_at(username))
state.output(data, f"@{strip_at(username)}")
@user.command("timeline")
@click.argument("username")
@click.option("--max", "max_results", default=10, type=int, help="Max results (5-100)")
@pass_state
def user_timeline(state, username, max_results):
"""Fetch a user's recent tweets."""
uname = strip_at(username)
user_data = state.client.get_user(uname)
uid = user_data["data"]["id"]
data = state.client.get_timeline(uid, max_results)
state.output(data, f"@{uname} timeline")
@user.command("followers")
@click.argument("username")
@click.option("--max", "max_results", default=100, type=int, help="Max results (1-1000)")
@pass_state
def user_followers(state, username, max_results):
"""List a user's followers."""
uname = strip_at(username)
user_data = state.client.get_user(uname)
uid = user_data["data"]["id"]
data = state.client.get_followers(uid, max_results)
state.output(data, f"@{uname} followers")
@user.command("following")
@click.argument("username")
@click.option("--max", "max_results", default=100, type=int, help="Max results (1-1000)")
@pass_state
def user_following(state, username, max_results):
"""List who a user follows."""
uname = strip_at(username)
user_data = state.client.get_user(uname)
uid = user_data["data"]["id"]
data = state.client.get_following(uid, max_results)
state.output(data, f"@{uname} following")
# ============================================================
# me
# ============================================================
@cli.group()
def me():
"""Self operations (authenticated user)."""
@me.command("mentions")
@click.option("--max", "max_results", default=10, type=int, help="Max results (5-100)")
@pass_state
def me_mentions(state, max_results):
"""Fetch your recent mentions."""
data = state.client.get_mentions(max_results)
state.output(data, "Mentions")
@me.command("bookmarks")
@click.option("--max", "max_results", default=10, type=int, help="Max results (1-100)")
@pass_state
def me_bookmarks(state, max_results):
"""Fetch your bookmarks."""
data = state.client.get_bookmarks(max_results)
state.output(data, "Bookmarks")
@me.command("bookmark")
@click.argument("id_or_url")
@pass_state
def me_bookmark(state, id_or_url):
"""Bookmark a tweet."""
tid = parse_tweet_id(id_or_url)
data = state.client.bookmark_tweet(tid)
state.output(data, "Bookmarked")
@me.command("unbookmark")
@click.argument("id_or_url")
@pass_state
def me_unbookmark(state, id_or_url):
"""Remove a bookmark."""
tid = parse_tweet_id(id_or_url)
data = state.client.unbookmark_tweet(tid)
state.output(data, "Unbookmarked")
# ============================================================
# quick actions (top-level)
# ============================================================
@cli.command("like")
@click.argument("id_or_url")
@pass_state
def like(state, id_or_url):
"""Like a tweet."""
tid = parse_tweet_id(id_or_url)
data = state.client.like_tweet(tid)
state.output(data, "Liked")
@cli.command("retweet")
@click.argument("id_or_url")
@pass_state
def retweet(state, id_or_url):
"""Retweet a tweet."""
tid = parse_tweet_id(id_or_url)
data = state.client.retweet(tid)
state.output(data, "Retweeted")
def main():
cli()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,348 @@
"""Output formatters: human (rich), JSON, TSV/plain, markdown."""
from __future__ import annotations
import json
from typing import Any
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
# ---- JSON ----
def output_json(data: Any, verbose: bool = False) -> None:
"""Raw JSON to stdout."""
if not verbose and isinstance(data, dict):
# Strip includes/meta, just emit data
inner = data.get("data")
if inner is not None:
print(json.dumps(inner, indent=2, default=str))
return
print(json.dumps(data, indent=2, default=str))
# ---- Plain/TSV ----
def output_plain(data: Any, verbose: bool = False) -> None:
"""TSV output for piping."""
if isinstance(data, dict):
inner = data.get("data")
if inner is None:
inner = data
if isinstance(inner, list):
_plain_list(inner, verbose)
elif isinstance(inner, dict):
_plain_dict(inner, verbose)
else:
print(inner)
elif isinstance(data, list):
_plain_list(data, verbose)
else:
print(data)
def _plain_dict(d: dict, verbose: bool = False) -> None:
skip = set() if verbose else {"public_metrics", "entities", "edit_history_tweet_ids", "attachments", "referenced_tweets", "profile_image_url"}
for k, v in d.items():
if not verbose and k in skip:
continue
if isinstance(v, (dict, list)):
v = json.dumps(v, default=str)
print(f"{k}\t{v}")
def _plain_list(items: list, verbose: bool = False) -> None:
if not items:
return
if not isinstance(items[0], dict):
for item in items:
print(item)
return
# Pick columns based on verbose
all_keys = list(items[0].keys())
if verbose:
keys = all_keys
else:
# Compact: only the most useful fields
if "username" in items[0]:
keys = [k for k in ["username", "name", "description"] if k in all_keys]
else:
keys = [k for k in ["id", "author_id", "text", "created_at"] if k in all_keys]
if not keys:
keys = all_keys
print("\t".join(keys))
for item in items:
vals = []
for k in keys:
v = item.get(k, "")
if isinstance(v, (dict, list)):
v = json.dumps(v, default=str)
vals.append(str(v))
print("\t".join(vals))
# ---- Markdown ----
def output_markdown(data: Any, title: str = "", verbose: bool = False) -> None:
"""Markdown output to stdout."""
if isinstance(data, dict):
inner = data.get("data")
includes = data.get("includes", {})
meta = data.get("meta", {})
if inner is None:
inner = data
if isinstance(inner, list):
_md_list(inner, includes, title, verbose)
elif isinstance(inner, dict):
_md_single(inner, includes, title, verbose)
else:
print(str(inner))
if verbose and meta.get("next_token"):
print(f"\n*Next page: `--next-token {meta['next_token']}`*")
elif isinstance(data, list):
_md_list(data, {}, title, verbose)
else:
print(str(data))
def _md_single(item: dict, includes: dict, title: str = "", verbose: bool = False) -> None:
if "username" in item:
_md_user(item, verbose)
else:
_md_tweet(item, includes, title, verbose)
def _md_tweet(tweet: dict, includes: dict, title: str = "", verbose: bool = False) -> None:
author = _resolve_author(tweet.get("author_id"), includes)
text = tweet.get("text", "")
tweet_id = tweet.get("id", "")
note = tweet.get("note_tweet", {})
if note and note.get("text"):
text = note["text"]
if title:
print(f"## {title}\n")
print(f"**{author}**")
if verbose:
created = tweet.get("created_at", "")
if created:
print(f"*{created}*")
print(f"\n{text}\n")
if verbose:
metrics = tweet.get("public_metrics", {})
if metrics:
parts = [f"{k.replace('_count', '')}: {v}" for k, v in metrics.items()]
print(" | ".join(parts))
print()
print(f"ID: `{tweet_id}`")
def _md_user(user: dict, verbose: bool = False) -> None:
name = user.get("name", "")
username = user.get("username", "")
desc = user.get("description", "")
print(f"## {name} (@{username})\n")
if desc:
print(f"{desc}\n")
metrics = user.get("public_metrics", {})
if metrics:
parts = [f"**{k.replace('_count', '')}**: {v:,}" for k, v in metrics.items()]
print(" | ".join(parts))
print()
if verbose:
loc = user.get("location", "")
created = user.get("created_at", "")
if loc:
print(f"Location: {loc}")
if created:
print(f"Joined: {created}")
def _md_list(items: list, includes: dict, title: str = "", verbose: bool = False) -> None:
if not items:
return
if title:
print(f"## {title}\n")
if items and "username" in items[0]:
_md_user_table(items, verbose)
else:
for i, item in enumerate(items):
if i > 0:
print("\n---\n")
_md_tweet(item, includes, verbose=verbose)
def _md_user_table(users: list, verbose: bool = False) -> None:
if verbose:
print("| Username | Name | Followers | Description |")
print("|----------|------|-----------|-------------|")
for u in users:
m = u.get("public_metrics", {})
followers = f"{m.get('followers_count', 0):,}"
desc = (u.get("description", "") or "")[:60].replace("|", "/").replace("\n", " ")
print(f"| @{u.get('username', '')} | {u.get('name', '')} | {followers} | {desc} |")
else:
print("| Username | Name | Followers |")
print("|----------|------|-----------|")
for u in users:
m = u.get("public_metrics", {})
followers = f"{m.get('followers_count', 0):,}"
print(f"| @{u.get('username', '')} | {u.get('name', '')} | {followers} |")
# ---- Rich (human-readable) ----
_console = Console(stderr=True)
_stdout = Console()
def output_human(data: Any, title: str = "", verbose: bool = False) -> None:
"""Pretty-print with rich."""
if isinstance(data, dict):
inner = data.get("data")
includes = data.get("includes", {})
meta = data.get("meta", {})
if inner is None:
inner = data
if isinstance(inner, list):
_human_tweet_list(inner, includes, title, verbose)
elif isinstance(inner, dict):
_human_single(inner, includes, title, verbose)
else:
_stdout.print(inner)
if verbose and meta.get("next_token"):
_console.print(f"[dim]Next page: --next-token {meta['next_token']}[/dim]")
elif isinstance(data, list):
_human_tweet_list(data, {}, title, verbose)
else:
_stdout.print(data)
def _resolve_author(author_id: str | None, includes: dict) -> str:
if not author_id:
return "?"
users = includes.get("users", [])
for u in users:
if u.get("id") == author_id:
return f"@{u.get('username', '?')}"
return author_id
def _human_single(item: dict, includes: dict, title: str = "", verbose: bool = False) -> None:
if "username" in item:
_human_user(item, verbose)
else:
_human_tweet(item, includes, title, verbose)
def _human_tweet(tweet: dict, includes: dict, title: str = "", verbose: bool = False) -> None:
author = _resolve_author(tweet.get("author_id"), includes)
text = tweet.get("text", "")
tweet_id = tweet.get("id", "")
note = tweet.get("note_tweet", {})
if note and note.get("text"):
text = note["text"]
content = f"[bold]{author}[/bold]"
if verbose:
created = tweet.get("created_at", "")
content += f" [dim]{created}[/dim]"
content += f"\n\n{text}"
if verbose:
metrics = tweet.get("public_metrics", {})
if metrics:
parts = [f"{k.replace('_count', '').replace('_', ' ')}: {v}" for k, v in metrics.items()]
content += f"\n\n[dim]{' | '.join(parts)}[/dim]"
panel_title = title or f"Tweet {tweet_id}"
_stdout.print(Panel(content, title=panel_title, border_style="blue", expand=False))
def _human_user(user: dict, verbose: bool = False) -> None:
name = user.get("name", "")
username = user.get("username", "")
desc = user.get("description", "")
metrics = user.get("public_metrics", {})
metrics_parts = []
if metrics:
for k, v in metrics.items():
label = k.replace("_count", "").replace("_", " ")
metrics_parts.append(f"{label}: {v:,}")
content = f"[bold]{name}[/bold] @{username}"
if user.get("verified"):
content += " [blue]verified[/blue]"
if desc:
content += f"\n{desc}"
if verbose:
loc = user.get("location", "")
created = user.get("created_at", "")
if loc:
content += f"\n[dim]Location: {loc}[/dim]"
if created:
content += f"\n[dim]Joined: {created}[/dim]"
if metrics_parts:
content += f"\n\n{' | '.join(metrics_parts)}"
_stdout.print(Panel(content, title=f"@{username}", border_style="green", expand=False))
def _human_tweet_list(items: list, includes: dict, title: str = "", verbose: bool = False) -> None:
if items and "username" in items[0]:
_human_user_table(items, title, verbose)
else:
for item in items:
_human_tweet(item, includes, verbose=verbose)
def _human_user_table(users: list, title: str = "", verbose: bool = False) -> None:
table = Table(title=title or "Users", show_lines=True)
table.add_column("Username", style="bold")
table.add_column("Name")
table.add_column("Followers", justify="right")
if verbose:
table.add_column("Description", max_width=50)
for u in users:
metrics = u.get("public_metrics", {})
followers = str(metrics.get("followers_count", ""))
row = [
f"@{u.get('username', '')}",
u.get("name", ""),
followers,
]
if verbose:
row.append((u.get("description", "") or "")[:50])
table.add_row(*row)
_stdout.print(table)
# ---- Router ----
def format_output(data: Any, mode: str = "human", title: str = "", verbose: bool = False) -> None:
"""Route to the appropriate formatter."""
if mode == "json":
output_json(data, verbose)
elif mode == "plain":
output_plain(data, verbose)
elif mode == "markdown":
output_markdown(data, title, verbose)
else:
output_human(data, title, verbose)

View File

@@ -0,0 +1,21 @@
"""Utility helpers for x-cli."""
from __future__ import annotations
import re
def parse_tweet_id(input_str: str) -> str:
"""Extract a tweet ID from a URL or raw numeric string."""
match = re.search(r"(?:twitter\.com|x\.com)/\w+/status/(\d+)", input_str)
if match:
return match.group(1)
stripped = input_str.strip()
if re.fullmatch(r"\d+", stripped):
return stripped
raise ValueError(f"Invalid tweet ID or URL: {input_str}")
def strip_at(username: str) -> str:
"""Remove leading @ from a username if present."""
return username.lstrip("@")

252
skills/xitter/x-cli/uv.lock generated Normal file
View File

@@ -0,0 +1,252 @@
version = 1
revision = 3
requires-python = ">=3.11"
[[package]]
name = "anyio"
version = "4.12.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "idna" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/96/f0/5eb65b2bb0d09ac6776f2eb54adee6abe8228ea05b20a5ad0e4945de8aac/anyio-4.12.1.tar.gz", hash = "sha256:41cfcc3a4c85d3f05c932da7c26d0201ac36f72abd4435ba90d0464a3ffed703", size = 228685, upload-time = "2026-01-06T11:45:21.246Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/38/0e/27be9fdef66e72d64c0cdc3cc2823101b80585f8119b5c112c2e8f5f7dab/anyio-4.12.1-py3-none-any.whl", hash = "sha256:d405828884fc140aa80a3c667b8beed277f1dfedec42ba031bd6ac3db606ab6c", size = 113592, upload-time = "2026-01-06T11:45:19.497Z" },
]
[[package]]
name = "certifi"
version = "2026.1.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e0/2d/a891ca51311197f6ad14a7ef42e2399f36cf2f9bd44752b3dc4eab60fdc5/certifi-2026.1.4.tar.gz", hash = "sha256:ac726dd470482006e014ad384921ed6438c457018f4b3d204aea4281258b2120", size = 154268, upload-time = "2026-01-04T02:42:41.825Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e6/ad/3cc14f097111b4de0040c83a525973216457bbeeb63739ef1ed275c1c021/certifi-2026.1.4-py3-none-any.whl", hash = "sha256:9943707519e4add1115f44c2bc244f782c0249876bf51b6599fee1ffbedd685c", size = 152900, upload-time = "2026-01-04T02:42:40.15Z" },
]
[[package]]
name = "click"
version = "8.3.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3d/fa/656b739db8587d7b5dfa22e22ed02566950fbfbcdc20311993483657a5c0/click-8.3.1.tar.gz", hash = "sha256:12ff4785d337a1bb490bb7e9c2b1ee5da3112e94a8622f26a6c77f5d2fc6842a", size = 295065, upload-time = "2025-11-15T20:45:42.706Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/98/78/01c019cdb5d6498122777c1a43056ebb3ebfeef2076d9d026bfe15583b2b/click-8.3.1-py3-none-any.whl", hash = "sha256:981153a64e25f12d547d3426c367a4857371575ee7ad18df2a6183ab0545b2a6", size = 108274, upload-time = "2025-11-15T20:45:41.139Z" },
]
[[package]]
name = "colorama"
version = "0.4.6"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "h11"
version = "0.16.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
]
[[package]]
name = "httpcore"
version = "1.0.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" },
]
[[package]]
name = "httpx"
version = "0.28.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "certifi" },
{ name = "httpcore" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
]
[[package]]
name = "idna"
version = "3.11"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/6f/6d/0703ccc57f3a7233505399edb88de3cbd678da106337b9fcde432b65ed60/idna-3.11.tar.gz", hash = "sha256:795dafcc9c04ed0c1fb032c2aa73654d8e8c5023a7df64a53f39190ada629902", size = 194582, upload-time = "2025-10-12T14:55:20.501Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0e/61/66938bbb5fc52dbdf84594873d5b51fb1f7c7794e9c0f5bd885f30bc507b/idna-3.11-py3-none-any.whl", hash = "sha256:771a87f49d9defaf64091e6e6fe9c18d4833f140bd19464795bc32d966ca37ea", size = 71008, upload-time = "2025-10-12T14:55:18.883Z" },
]
[[package]]
name = "iniconfig"
version = "2.3.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "packaging"
version = "26.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/65/ee/299d360cdc32edc7d2cf530f3accf79c4fca01e96ffc950d8a52213bd8e4/packaging-26.0.tar.gz", hash = "sha256:00243ae351a257117b6a241061796684b084ed1c516a08c48a3f7e147a9d80b4", size = 143416, upload-time = "2026-01-21T20:50:39.064Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/b9/c538f279a4e237a006a2c98387d081e9eb060d203d8ed34467cc0f0b9b53/packaging-26.0-py3-none-any.whl", hash = "sha256:b36f1fef9334a5588b4166f8bcd26a14e521f2b55e6b9de3aaa80d3ff7a37529", size = 74366, upload-time = "2026-01-21T20:50:37.788Z" },
]
[[package]]
name = "pluggy"
version = "1.6.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "pygments"
version = "2.19.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
]
[[package]]
name = "pytest"
version = "9.0.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "colorama", marker = "sys_platform == 'win32'" },
{ name = "iniconfig" },
{ name = "packaging" },
{ name = "pluggy" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d1/db/7ef3487e0fb0049ddb5ce41d3a49c235bf9ad299b6a25d5780a89f19230f/pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11", size = 1568901, upload-time = "2025-12-06T21:30:51.014Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" },
]
[[package]]
name = "python-dotenv"
version = "1.2.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/f0/26/19cadc79a718c5edbec86fd4919a6b6d3f681039a2f6d66d14be94e75fb9/python_dotenv-1.2.1.tar.gz", hash = "sha256:42667e897e16ab0d66954af0e60a9caa94f0fd4ecf3aaf6d2d260eec1aa36ad6", size = 44221, upload-time = "2025-10-26T15:12:10.434Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/14/1b/a298b06749107c305e1fe0f814c6c74aea7b2f1e10989cb30f544a1b3253/python_dotenv-1.2.1-py3-none-any.whl", hash = "sha256:b81ee9561e9ca4004139c6cbba3a238c32b03e4894671e181b671e8cb8425d61", size = 21230, upload-time = "2025-10-26T15:12:09.109Z" },
]
[[package]]
name = "rich"
version = "14.3.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/74/99/a4cab2acbb884f80e558b0771e97e21e939c5dfb460f488d19df485e8298/rich-14.3.2.tar.gz", hash = "sha256:e712f11c1a562a11843306f5ed999475f09ac31ffb64281f73ab29ffdda8b3b8", size = 230143, upload-time = "2026-02-01T16:20:47.908Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ef/45/615f5babd880b4bd7d405cc0dc348234c5ffb6ed1ea33e152ede08b2072d/rich-14.3.2-py3-none-any.whl", hash = "sha256:08e67c3e90884651da3239ea668222d19bea7b589149d8014a21c633420dbb69", size = 309963, upload-time = "2026-02-01T16:20:46.078Z" },
]
[[package]]
name = "ruff"
version = "0.15.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/04/dc/4e6ac71b511b141cf626357a3946679abeba4cf67bc7cc5a17920f31e10d/ruff-0.15.1.tar.gz", hash = "sha256:c590fe13fb57c97141ae975c03a1aedb3d3156030cabd740d6ff0b0d601e203f", size = 4540855, upload-time = "2026-02-12T23:09:09.998Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/23/bf/e6e4324238c17f9d9120a9d60aa99a7daaa21204c07fcd84e2ef03bb5fd1/ruff-0.15.1-py3-none-linux_armv6l.whl", hash = "sha256:b101ed7cf4615bda6ffe65bdb59f964e9f4a0d3f85cbf0e54f0ab76d7b90228a", size = 10367819, upload-time = "2026-02-12T23:09:03.598Z" },
{ url = "https://files.pythonhosted.org/packages/b3/ea/c8f89d32e7912269d38c58f3649e453ac32c528f93bb7f4219258be2e7ed/ruff-0.15.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:939c995e9277e63ea632cc8d3fae17aa758526f49a9a850d2e7e758bfef46602", size = 10798618, upload-time = "2026-02-12T23:09:22.928Z" },
{ url = "https://files.pythonhosted.org/packages/5e/0f/1d0d88bc862624247d82c20c10d4c0f6bb2f346559d8af281674cf327f15/ruff-0.15.1-py3-none-macosx_11_0_arm64.whl", hash = "sha256:1d83466455fdefe60b8d9c8df81d3c1bbb2115cede53549d3b522ce2bc703899", size = 10148518, upload-time = "2026-02-12T23:08:58.339Z" },
{ url = "https://files.pythonhosted.org/packages/f5/c8/291c49cefaa4a9248e986256df2ade7add79388fe179e0691be06fae6f37/ruff-0.15.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a9457e3c3291024866222b96108ab2d8265b477e5b1534c7ddb1810904858d16", size = 10518811, upload-time = "2026-02-12T23:09:31.865Z" },
{ url = "https://files.pythonhosted.org/packages/c3/1a/f5707440e5ae43ffa5365cac8bbb91e9665f4a883f560893829cf16a606b/ruff-0.15.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:92c92b003e9d4f7fbd33b1867bb15a1b785b1735069108dfc23821ba045b29bc", size = 10196169, upload-time = "2026-02-12T23:09:17.306Z" },
{ url = "https://files.pythonhosted.org/packages/2a/ff/26ddc8c4da04c8fd3ee65a89c9fb99eaa5c30394269d424461467be2271f/ruff-0.15.1-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1fe5c41ab43e3a06778844c586251eb5a510f67125427625f9eb2b9526535779", size = 10990491, upload-time = "2026-02-12T23:09:25.503Z" },
{ url = "https://files.pythonhosted.org/packages/fc/00/50920cb385b89413f7cdb4bb9bc8fc59c1b0f30028d8bccc294189a54955/ruff-0.15.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:66a6dd6df4d80dc382c6484f8ce1bcceb55c32e9f27a8b94c32f6c7331bf14fb", size = 11843280, upload-time = "2026-02-12T23:09:19.88Z" },
{ url = "https://files.pythonhosted.org/packages/5d/6d/2f5cad8380caf5632a15460c323ae326f1e1a2b5b90a6ee7519017a017ca/ruff-0.15.1-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6a4a42cbb8af0bda9bcd7606b064d7c0bc311a88d141d02f78920be6acb5aa83", size = 11274336, upload-time = "2026-02-12T23:09:14.907Z" },
{ url = "https://files.pythonhosted.org/packages/a3/1d/5f56cae1d6c40b8a318513599b35ea4b075d7dc1cd1d04449578c29d1d75/ruff-0.15.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4ab064052c31dddada35079901592dfba2e05f5b1e43af3954aafcbc1096a5b2", size = 11137288, upload-time = "2026-02-12T23:09:07.475Z" },
{ url = "https://files.pythonhosted.org/packages/cd/20/6f8d7d8f768c93b0382b33b9306b3b999918816da46537d5a61635514635/ruff-0.15.1-py3-none-manylinux_2_31_riscv64.whl", hash = "sha256:5631c940fe9fe91f817a4c2ea4e81f47bee3ca4aa646134a24374f3c19ad9454", size = 11070681, upload-time = "2026-02-12T23:08:55.43Z" },
{ url = "https://files.pythonhosted.org/packages/9a/67/d640ac76069f64cdea59dba02af2e00b1fa30e2103c7f8d049c0cff4cafd/ruff-0.15.1-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:68138a4ba184b4691ccdc39f7795c66b3c68160c586519e7e8444cf5a53e1b4c", size = 10486401, upload-time = "2026-02-12T23:09:27.927Z" },
{ url = "https://files.pythonhosted.org/packages/65/3d/e1429f64a3ff89297497916b88c32a5cc88eeca7e9c787072d0e7f1d3e1e/ruff-0.15.1-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:518f9af03bfc33c03bdb4cb63fabc935341bb7f54af500f92ac309ecfbba6330", size = 10197452, upload-time = "2026-02-12T23:09:12.147Z" },
{ url = "https://files.pythonhosted.org/packages/78/83/e2c3bade17dad63bf1e1c2ffaf11490603b760be149e1419b07049b36ef2/ruff-0.15.1-py3-none-musllinux_1_2_i686.whl", hash = "sha256:da79f4d6a826caaea95de0237a67e33b81e6ec2e25fc7e1993a4015dffca7c61", size = 10693900, upload-time = "2026-02-12T23:09:34.418Z" },
{ url = "https://files.pythonhosted.org/packages/a1/27/fdc0e11a813e6338e0706e8b39bb7a1d61ea5b36873b351acee7e524a72a/ruff-0.15.1-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:3dd86dccb83cd7d4dcfac303ffc277e6048600dfc22e38158afa208e8bf94a1f", size = 11227302, upload-time = "2026-02-12T23:09:36.536Z" },
{ url = "https://files.pythonhosted.org/packages/f6/58/ac864a75067dcbd3b95be5ab4eb2b601d7fbc3d3d736a27e391a4f92a5c1/ruff-0.15.1-py3-none-win32.whl", hash = "sha256:660975d9cb49b5d5278b12b03bb9951d554543a90b74ed5d366b20e2c57c2098", size = 10462555, upload-time = "2026-02-12T23:09:29.899Z" },
{ url = "https://files.pythonhosted.org/packages/e0/5e/d4ccc8a27ecdb78116feac4935dfc39d1304536f4296168f91ed3ec00cd2/ruff-0.15.1-py3-none-win_amd64.whl", hash = "sha256:c820fef9dd5d4172a6570e5721704a96c6679b80cf7be41659ed439653f62336", size = 11599956, upload-time = "2026-02-12T23:09:01.157Z" },
{ url = "https://files.pythonhosted.org/packages/2a/07/5bda6a85b220c64c65686bc85bd0bbb23b29c62b3a9f9433fa55f17cda93/ruff-0.15.1-py3-none-win_arm64.whl", hash = "sha256:5ff7d5f0f88567850f45081fac8f4ec212be8d0b963e385c3f7d0d2eb4899416", size = 10874604, upload-time = "2026-02-12T23:09:05.515Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
]
[[package]]
name = "x-cli"
version = "0.1.0"
source = { editable = "." }
dependencies = [
{ name = "click" },
{ name = "httpx" },
{ name = "python-dotenv" },
{ name = "rich" },
]
[package.dev-dependencies]
dev = [
{ name = "pytest" },
{ name = "ruff" },
]
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1" },
{ name = "httpx", specifier = ">=0.27" },
{ name = "python-dotenv", specifier = ">=1.0" },
{ name = "rich", specifier = ">=13.0" },
]
[package.metadata.requires-dev]
dev = [
{ name = "pytest", specifier = ">=8.0" },
{ name = "ruff", specifier = ">=0.4" },
]

View File

@@ -1,124 +0,0 @@
"""Tests verifying interrupt key consistency between adapter and gateway.
Regression test for a bug where monitor_for_interrupt() in _run_agent used
source.chat_id to query the adapter, but the adapter stores interrupts under
the full session key (build_session_key output). This mismatch meant
interrupts were never detected, causing subagents to ignore new messages.
"""
import asyncio
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
from gateway.session import SessionSource, build_session_key
class StubAdapter(BasePlatformAdapter):
"""Minimal adapter for interrupt tests."""
def __init__(self):
super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM)
async def connect(self):
return True
async def disconnect(self):
pass
async def send(self, chat_id, content, reply_to=None, metadata=None):
return SendResult(success=True, message_id="1")
async def send_typing(self, chat_id, metadata=None):
pass
async def get_chat_info(self, chat_id):
return {"id": chat_id}
def _source(chat_id="123456", chat_type="dm", thread_id=None):
return SessionSource(
platform=Platform.TELEGRAM,
chat_id=chat_id,
chat_type=chat_type,
thread_id=thread_id,
)
class TestInterruptKeyConsistency:
"""Ensure adapter interrupt methods are queried with session_key, not chat_id."""
def test_session_key_differs_from_chat_id_for_dm(self):
"""Session key for a DM is NOT the same as chat_id."""
source = _source("123456", "dm")
session_key = build_session_key(source)
assert session_key != source.chat_id
assert session_key == "agent:main:telegram:dm"
def test_session_key_differs_from_chat_id_for_group(self):
"""Session key for a group chat includes prefix, unlike raw chat_id."""
source = _source("-1001234", "group")
session_key = build_session_key(source)
assert session_key != source.chat_id
assert "agent:main:" in session_key
assert source.chat_id in session_key
@pytest.mark.asyncio
async def test_has_pending_interrupt_requires_session_key(self):
"""has_pending_interrupt returns True only when queried with session_key."""
adapter = StubAdapter()
source = _source("123456", "dm")
session_key = build_session_key(source)
# Simulate adapter storing interrupt under session_key
interrupt_event = asyncio.Event()
adapter._active_sessions[session_key] = interrupt_event
interrupt_event.set()
# Using session_key → found
assert adapter.has_pending_interrupt(session_key) is True
# Using chat_id → NOT found (this was the bug)
assert adapter.has_pending_interrupt(source.chat_id) is False
@pytest.mark.asyncio
async def test_get_pending_message_requires_session_key(self):
"""get_pending_message returns the event only with session_key."""
adapter = StubAdapter()
source = _source("123456", "dm")
session_key = build_session_key(source)
event = MessageEvent(text="hello", source=source, message_id="42")
adapter._pending_messages[session_key] = event
# Using chat_id → None (the bug)
assert adapter.get_pending_message(source.chat_id) is None
# Using session_key → found
result = adapter.get_pending_message(session_key)
assert result is event
@pytest.mark.asyncio
async def test_handle_message_stores_under_session_key(self):
"""handle_message stores pending messages under session_key, not chat_id."""
adapter = StubAdapter()
adapter.set_message_handler(lambda event: asyncio.sleep(0, result=None))
source = _source("-1001234", "group")
session_key = build_session_key(source)
# Mark session as active
adapter._active_sessions[session_key] = asyncio.Event()
# Send a second message while session is active
event = MessageEvent(text="interrupt!", source=source, message_id="2")
await adapter.handle_message(event)
# Stored under session_key
assert session_key in adapter._pending_messages
# NOT stored under chat_id
assert source.chat_id not in adapter._pending_messages
# Interrupt event was set
assert adapter._active_sessions[session_key].is_set()

View File

@@ -1,340 +0,0 @@
"""Tests for hermes claw commands."""
from argparse import Namespace
from types import ModuleType
from unittest.mock import MagicMock, patch
import pytest
from hermes_cli import claw as claw_mod
# ---------------------------------------------------------------------------
# _find_migration_script
# ---------------------------------------------------------------------------
class TestFindMigrationScript:
"""Test script discovery in known locations."""
def test_finds_project_root_script(self, tmp_path):
script = tmp_path / "openclaw_to_hermes.py"
script.write_text("# placeholder")
with patch.object(claw_mod, "_OPENCLAW_SCRIPT", script):
assert claw_mod._find_migration_script() == script
def test_finds_installed_script(self, tmp_path):
installed = tmp_path / "installed.py"
installed.write_text("# placeholder")
with (
patch.object(claw_mod, "_OPENCLAW_SCRIPT", tmp_path / "nonexistent.py"),
patch.object(claw_mod, "_OPENCLAW_SCRIPT_INSTALLED", installed),
):
assert claw_mod._find_migration_script() == installed
def test_returns_none_when_missing(self, tmp_path):
with (
patch.object(claw_mod, "_OPENCLAW_SCRIPT", tmp_path / "a.py"),
patch.object(claw_mod, "_OPENCLAW_SCRIPT_INSTALLED", tmp_path / "b.py"),
):
assert claw_mod._find_migration_script() is None
# ---------------------------------------------------------------------------
# claw_command routing
# ---------------------------------------------------------------------------
class TestClawCommand:
"""Test the claw_command router."""
def test_routes_to_migrate(self):
args = Namespace(claw_action="migrate", source=None, dry_run=True,
preset="full", overwrite=False, migrate_secrets=False,
workspace_target=None, skill_conflict="skip", yes=False)
with patch.object(claw_mod, "_cmd_migrate") as mock:
claw_mod.claw_command(args)
mock.assert_called_once_with(args)
def test_shows_help_for_no_action(self, capsys):
args = Namespace(claw_action=None)
claw_mod.claw_command(args)
captured = capsys.readouterr()
assert "migrate" in captured.out
# ---------------------------------------------------------------------------
# _cmd_migrate
# ---------------------------------------------------------------------------
class TestCmdMigrate:
"""Test the migrate command handler."""
def test_error_when_source_missing(self, tmp_path, capsys):
args = Namespace(
source=str(tmp_path / "nonexistent"),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "not found" in captured.out
def test_error_when_script_missing(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
args = Namespace(
source=str(openclaw_dir),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_OPENCLAW_SCRIPT", tmp_path / "a.py"),
patch.object(claw_mod, "_OPENCLAW_SCRIPT_INSTALLED", tmp_path / "b.py"),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Migration script not found" in captured.out
def test_dry_run_succeeds(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
script = tmp_path / "script.py"
script.write_text("# placeholder")
# Build a fake migration module
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value={"soul", "memory"})
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 0, "skipped": 5, "conflict": 0, "error": 0},
"items": [
{"kind": "soul", "status": "skipped", "reason": "Not found"},
],
"preset": "full",
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=script),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=tmp_path / "config.yaml"),
patch.object(claw_mod, "save_config"),
patch.object(claw_mod, "load_config", return_value={}),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Dry Run Results" in captured.out
assert "5 skipped" in captured.out
def test_execute_with_confirmation(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
config_path = tmp_path / "config.yaml"
config_path.write_text("agent:\n max_turns: 90\n")
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value={"soul"})
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 2, "skipped": 1, "conflict": 0, "error": 0},
"items": [
{"kind": "soul", "status": "migrated", "destination": str(tmp_path / "SOUL.md")},
{"kind": "memory", "status": "migrated", "destination": str(tmp_path / "memories/MEMORY.md")},
],
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=False, preset="user-data", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=config_path),
patch.object(claw_mod, "prompt_yes_no", return_value=True),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Migration Results" in captured.out
assert "Migration complete!" in captured.out
def test_execute_cancelled_by_user(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
config_path = tmp_path / "config.yaml"
config_path.write_text("")
args = Namespace(
source=str(openclaw_dir),
dry_run=False, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "prompt_yes_no", return_value=False),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Migration cancelled" in captured.out
def test_execute_with_yes_skips_confirmation(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
config_path = tmp_path / "config.yaml"
config_path.write_text("")
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value=set())
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 0, "skipped": 0, "conflict": 0, "error": 0},
"items": [],
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=False, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=True,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=config_path),
patch.object(claw_mod, "prompt_yes_no") as mock_prompt,
):
claw_mod._cmd_migrate(args)
mock_prompt.assert_not_called()
def test_handles_migration_error(self, tmp_path, capsys):
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
config_path = tmp_path / "config.yaml"
config_path.write_text("")
args = Namespace(
source=str(openclaw_dir),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", side_effect=RuntimeError("boom")),
patch.object(claw_mod, "get_config_path", return_value=config_path),
patch.object(claw_mod, "save_config"),
patch.object(claw_mod, "load_config", return_value={}),
):
claw_mod._cmd_migrate(args)
captured = capsys.readouterr()
assert "Migration failed" in captured.out
def test_full_preset_enables_secrets(self, tmp_path, capsys):
"""The 'full' preset should set migrate_secrets=True automatically."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value=set())
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 0, "skipped": 0, "conflict": 0, "error": 0},
"items": [],
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
args = Namespace(
source=str(openclaw_dir),
dry_run=True, preset="full", overwrite=False,
migrate_secrets=False, # Not explicitly set by user
workspace_target=None,
skill_conflict="skip", yes=False,
)
with (
patch.object(claw_mod, "_find_migration_script", return_value=tmp_path / "s.py"),
patch.object(claw_mod, "_load_migration_module", return_value=fake_mod),
patch.object(claw_mod, "get_config_path", return_value=tmp_path / "config.yaml"),
patch.object(claw_mod, "save_config"),
patch.object(claw_mod, "load_config", return_value={}),
):
claw_mod._cmd_migrate(args)
# Migrator should have been called with migrate_secrets=True
call_kwargs = fake_mod.Migrator.call_args[1]
assert call_kwargs["migrate_secrets"] is True
# ---------------------------------------------------------------------------
# _print_migration_report
# ---------------------------------------------------------------------------
class TestPrintMigrationReport:
"""Test the report formatting function."""
def test_dry_run_report(self, capsys):
report = {
"summary": {"migrated": 2, "skipped": 1, "conflict": 1, "error": 0},
"items": [
{"kind": "soul", "status": "migrated", "destination": "/home/user/.hermes/SOUL.md"},
{"kind": "memory", "status": "migrated", "destination": "/home/user/.hermes/memories/MEMORY.md"},
{"kind": "skills", "status": "conflict", "reason": "already exists"},
{"kind": "tts-assets", "status": "skipped", "reason": "not found"},
],
"preset": "full",
}
claw_mod._print_migration_report(report, dry_run=True)
captured = capsys.readouterr()
assert "Dry Run Results" in captured.out
assert "Would migrate" in captured.out
assert "2 would migrate" in captured.out
assert "--dry-run" in captured.out
def test_execute_report(self, capsys):
report = {
"summary": {"migrated": 3, "skipped": 0, "conflict": 0, "error": 0},
"items": [
{"kind": "soul", "status": "migrated", "destination": "/home/user/.hermes/SOUL.md"},
],
"output_dir": "/home/user/.hermes/migration/openclaw/20250312T120000",
}
claw_mod._print_migration_report(report, dry_run=False)
captured = capsys.readouterr()
assert "Migration Results" in captured.out
assert "Migrated" in captured.out
assert "Full report saved to" in captured.out
def test_empty_report(self, capsys):
report = {
"summary": {"migrated": 0, "skipped": 0, "conflict": 0, "error": 0},
"items": [],
}
claw_mod._print_migration_report(report, dry_run=False)
captured = capsys.readouterr()
assert "Nothing to migrate" in captured.out

View File

@@ -1,284 +0,0 @@
"""Tests for OpenClaw migration integration in the setup wizard."""
from argparse import Namespace
from types import ModuleType
from unittest.mock import MagicMock, patch
from hermes_cli import setup as setup_mod
# ---------------------------------------------------------------------------
# _offer_openclaw_migration — unit tests
# ---------------------------------------------------------------------------
class TestOfferOpenclawMigration:
"""Test the _offer_openclaw_migration helper in isolation."""
def test_skips_when_no_openclaw_dir(self, tmp_path):
"""Should return False immediately when ~/.openclaw does not exist."""
with patch("hermes_cli.setup.Path.home", return_value=tmp_path):
assert setup_mod._offer_openclaw_migration(tmp_path / ".hermes") is False
def test_skips_when_migration_script_missing(self, tmp_path):
"""Should return False when the migration script file is absent."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
with (
patch("hermes_cli.setup.Path.home", return_value=tmp_path),
patch.object(setup_mod, "_OPENCLAW_SCRIPT", tmp_path / "nonexistent.py"),
):
assert setup_mod._offer_openclaw_migration(tmp_path / ".hermes") is False
def test_skips_when_user_declines(self, tmp_path):
"""Should return False when user declines the migration prompt."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
script = tmp_path / "openclaw_to_hermes.py"
script.write_text("# placeholder")
with (
patch("hermes_cli.setup.Path.home", return_value=tmp_path),
patch.object(setup_mod, "_OPENCLAW_SCRIPT", script),
patch.object(setup_mod, "prompt_yes_no", return_value=False),
):
assert setup_mod._offer_openclaw_migration(tmp_path / ".hermes") is False
def test_runs_migration_when_user_accepts(self, tmp_path):
"""Should dynamically load the script and run the Migrator."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
# Create a fake hermes home with config
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
config_path.write_text("agent:\n max_turns: 90\n")
# Build a fake migration module
fake_mod = ModuleType("openclaw_to_hermes")
fake_mod.resolve_selected_options = MagicMock(return_value={"soul", "memory"})
fake_migrator = MagicMock()
fake_migrator.migrate.return_value = {
"summary": {"migrated": 3, "skipped": 1, "conflict": 0, "error": 0},
"output_dir": str(hermes_home / "migration"),
}
fake_mod.Migrator = MagicMock(return_value=fake_migrator)
script = tmp_path / "openclaw_to_hermes.py"
script.write_text("# placeholder")
with (
patch("hermes_cli.setup.Path.home", return_value=tmp_path),
patch.object(setup_mod, "_OPENCLAW_SCRIPT", script),
patch.object(setup_mod, "prompt_yes_no", return_value=True),
patch.object(setup_mod, "get_config_path", return_value=config_path),
patch("importlib.util.spec_from_file_location") as mock_spec_fn,
):
# Wire up the fake module loading
mock_spec = MagicMock()
mock_spec.loader = MagicMock()
mock_spec_fn.return_value = mock_spec
def exec_module(mod):
mod.resolve_selected_options = fake_mod.resolve_selected_options
mod.Migrator = fake_mod.Migrator
mock_spec.loader.exec_module = exec_module
result = setup_mod._offer_openclaw_migration(hermes_home)
assert result is True
fake_mod.resolve_selected_options.assert_called_once_with(
None, None, preset="full"
)
fake_mod.Migrator.assert_called_once()
call_kwargs = fake_mod.Migrator.call_args[1]
assert call_kwargs["execute"] is True
assert call_kwargs["overwrite"] is False
assert call_kwargs["migrate_secrets"] is True
assert call_kwargs["preset_name"] == "full"
fake_migrator.migrate.assert_called_once()
def test_handles_migration_error_gracefully(self, tmp_path):
"""Should catch exceptions and return False."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
config_path.write_text("")
script = tmp_path / "openclaw_to_hermes.py"
script.write_text("# placeholder")
with (
patch("hermes_cli.setup.Path.home", return_value=tmp_path),
patch.object(setup_mod, "_OPENCLAW_SCRIPT", script),
patch.object(setup_mod, "prompt_yes_no", return_value=True),
patch.object(setup_mod, "get_config_path", return_value=config_path),
patch(
"importlib.util.spec_from_file_location",
side_effect=RuntimeError("boom"),
),
):
result = setup_mod._offer_openclaw_migration(hermes_home)
assert result is False
def test_creates_config_if_missing(self, tmp_path):
"""Should bootstrap config.yaml before running migration."""
openclaw_dir = tmp_path / ".openclaw"
openclaw_dir.mkdir()
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
config_path = hermes_home / "config.yaml"
# config does NOT exist yet
script = tmp_path / "openclaw_to_hermes.py"
script.write_text("# placeholder")
with (
patch("hermes_cli.setup.Path.home", return_value=tmp_path),
patch.object(setup_mod, "_OPENCLAW_SCRIPT", script),
patch.object(setup_mod, "prompt_yes_no", return_value=True),
patch.object(setup_mod, "get_config_path", return_value=config_path),
patch.object(setup_mod, "load_config", return_value={"agent": {}}),
patch.object(setup_mod, "save_config") as mock_save,
patch(
"importlib.util.spec_from_file_location",
side_effect=RuntimeError("stop early"),
),
):
setup_mod._offer_openclaw_migration(hermes_home)
# save_config should have been called to bootstrap the file
mock_save.assert_called_once_with({"agent": {}})
# ---------------------------------------------------------------------------
# Integration with run_setup_wizard — first-time flow
# ---------------------------------------------------------------------------
def _first_time_args() -> Namespace:
return Namespace(
section=None,
non_interactive=False,
reset=False,
)
class TestSetupWizardOpenclawIntegration:
"""Verify _offer_openclaw_migration is called during first-time setup."""
def test_migration_offered_during_first_time_setup(self, tmp_path):
"""On first-time setup, _offer_openclaw_migration should be called."""
args = _first_time_args()
with (
patch.object(setup_mod, "ensure_hermes_home"),
patch.object(setup_mod, "load_config", return_value={}),
patch.object(setup_mod, "get_hermes_home", return_value=tmp_path),
patch.object(setup_mod, "get_env_value", return_value=""),
patch("hermes_cli.auth.get_active_provider", return_value=None),
# User presses Enter to start
patch("builtins.input", return_value=""),
# Mock the migration offer
patch.object(
setup_mod, "_offer_openclaw_migration", return_value=False
) as mock_migration,
# Mock the actual setup sections so they don't run
patch.object(setup_mod, "setup_model_provider"),
patch.object(setup_mod, "setup_terminal_backend"),
patch.object(setup_mod, "setup_agent_settings"),
patch.object(setup_mod, "setup_gateway"),
patch.object(setup_mod, "setup_tools"),
patch.object(setup_mod, "save_config"),
patch.object(setup_mod, "_print_setup_summary"),
):
setup_mod.run_setup_wizard(args)
mock_migration.assert_called_once_with(tmp_path)
def test_migration_reloads_config_on_success(self, tmp_path):
"""When migration returns True, config should be reloaded."""
args = _first_time_args()
call_order = []
def tracking_load_config():
call_order.append("load_config")
return {}
with (
patch.object(setup_mod, "ensure_hermes_home"),
patch.object(setup_mod, "load_config", side_effect=tracking_load_config),
patch.object(setup_mod, "get_hermes_home", return_value=tmp_path),
patch.object(setup_mod, "get_env_value", return_value=""),
patch("hermes_cli.auth.get_active_provider", return_value=None),
patch("builtins.input", return_value=""),
patch.object(setup_mod, "_offer_openclaw_migration", return_value=True),
patch.object(setup_mod, "setup_model_provider"),
patch.object(setup_mod, "setup_terminal_backend"),
patch.object(setup_mod, "setup_agent_settings"),
patch.object(setup_mod, "setup_gateway"),
patch.object(setup_mod, "setup_tools"),
patch.object(setup_mod, "save_config"),
patch.object(setup_mod, "_print_setup_summary"),
):
setup_mod.run_setup_wizard(args)
# load_config called twice: once at start, once after migration
assert call_order.count("load_config") == 2
def test_reloaded_config_flows_into_remaining_setup_sections(self, tmp_path):
args = _first_time_args()
initial_config = {}
reloaded_config = {"model": {"provider": "openrouter"}}
with (
patch.object(setup_mod, "ensure_hermes_home"),
patch.object(
setup_mod,
"load_config",
side_effect=[initial_config, reloaded_config],
),
patch.object(setup_mod, "get_hermes_home", return_value=tmp_path),
patch.object(setup_mod, "get_env_value", return_value=""),
patch("hermes_cli.auth.get_active_provider", return_value=None),
patch("builtins.input", return_value=""),
patch.object(setup_mod, "_offer_openclaw_migration", return_value=True),
patch.object(setup_mod, "setup_model_provider") as setup_model_provider,
patch.object(setup_mod, "setup_terminal_backend"),
patch.object(setup_mod, "setup_agent_settings"),
patch.object(setup_mod, "setup_gateway"),
patch.object(setup_mod, "setup_tools"),
patch.object(setup_mod, "save_config"),
patch.object(setup_mod, "_print_setup_summary"),
):
setup_mod.run_setup_wizard(args)
setup_model_provider.assert_called_once_with(reloaded_config)
def test_migration_not_offered_for_existing_install(self, tmp_path):
"""Returning users should not see the migration prompt."""
args = _first_time_args()
with (
patch.object(setup_mod, "ensure_hermes_home"),
patch.object(setup_mod, "load_config", return_value={}),
patch.object(setup_mod, "get_hermes_home", return_value=tmp_path),
patch.object(
setup_mod,
"get_env_value",
side_effect=lambda k: "sk-xxx" if k == "OPENROUTER_API_KEY" else "",
),
patch("hermes_cli.auth.get_active_provider", return_value=None),
# Returning user picks "Exit"
patch.object(setup_mod, "prompt_choice", return_value=9),
patch.object(
setup_mod, "_offer_openclaw_migration", return_value=False
) as mock_migration,
):
setup_mod.run_setup_wizard(args)
mock_migration.assert_not_called()

View File

@@ -1,23 +1,13 @@
from io import StringIO
import pytest
from rich.console import Console
from hermes_cli.skills_hub import do_list
class _DummyLockFile:
def __init__(self, installed):
self._installed = installed
def list_installed(self):
return self._installed
@pytest.fixture()
def hub_env(monkeypatch, tmp_path):
"""Set up isolated hub directory paths and return (monkeypatch, tmp_path)."""
def test_do_list_initializes_hub_dir(monkeypatch, tmp_path):
import tools.skills_hub as hub
import tools.skills_tool as skills_tool
hub_dir = tmp_path / "skills" / ".hub"
monkeypatch.setattr(hub, "SKILLS_DIR", tmp_path / "skills")
@@ -27,98 +17,15 @@ def hub_env(monkeypatch, tmp_path):
monkeypatch.setattr(hub, "AUDIT_LOG", hub_dir / "audit.log")
monkeypatch.setattr(hub, "TAPS_FILE", hub_dir / "taps.json")
monkeypatch.setattr(hub, "INDEX_CACHE_DIR", hub_dir / "index-cache")
return hub_dir
# ---------------------------------------------------------------------------
# Fixtures for common skill setups
# ---------------------------------------------------------------------------
_HUB_ENTRY = {"name": "hub-skill", "source": "github", "trust_level": "community"}
_ALL_THREE_SKILLS = [
{"name": "hub-skill", "category": "x", "description": "hub"},
{"name": "builtin-skill", "category": "x", "description": "builtin"},
{"name": "local-skill", "category": "x", "description": "local"},
]
_BUILTIN_MANIFEST = {"builtin-skill": "abc123"}
@pytest.fixture()
def three_source_env(monkeypatch, hub_env):
"""Populate hub/builtin/local skills for source-classification tests."""
import tools.skills_hub as hub
import tools.skills_sync as skills_sync
import tools.skills_tool as skills_tool
monkeypatch.setattr(hub, "HubLockFile", lambda: _DummyLockFile([_HUB_ENTRY]))
monkeypatch.setattr(skills_tool, "_find_all_skills", lambda: list(_ALL_THREE_SKILLS))
monkeypatch.setattr(skills_sync, "_read_manifest", lambda: dict(_BUILTIN_MANIFEST))
return hub_env
def _capture(source_filter: str = "all") -> str:
"""Run do_list into a string buffer and return the output."""
sink = StringIO()
console = Console(file=sink, force_terminal=False, color_system=None)
do_list(source_filter=source_filter, console=console)
return sink.getvalue()
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_do_list_initializes_hub_dir(monkeypatch, hub_env):
import tools.skills_sync as skills_sync
import tools.skills_tool as skills_tool
monkeypatch.setattr(skills_tool, "_find_all_skills", lambda: [])
monkeypatch.setattr(skills_sync, "_read_manifest", lambda: {})
hub_dir = hub_env
console = Console(file=StringIO(), force_terminal=False, color_system=None)
assert not hub_dir.exists()
_capture()
do_list(console=console)
assert hub_dir.exists()
assert (hub_dir / "lock.json").exists()
assert (hub_dir / "quarantine").is_dir()
assert (hub_dir / "index-cache").is_dir()
def test_do_list_distinguishes_hub_builtin_and_local(three_source_env):
output = _capture()
assert "hub-skill" in output
assert "builtin-skill" in output
assert "local-skill" in output
assert "1 hub-installed, 1 builtin, 1 local" in output
def test_do_list_filter_local(three_source_env):
output = _capture(source_filter="local")
assert "local-skill" in output
assert "builtin-skill" not in output
assert "hub-skill" not in output
def test_do_list_filter_hub(three_source_env):
output = _capture(source_filter="hub")
assert "hub-skill" in output
assert "builtin-skill" not in output
assert "local-skill" not in output
def test_do_list_filter_builtin(three_source_env):
output = _capture(source_filter="builtin")
assert "builtin-skill" in output
assert "hub-skill" not in output
assert "local-skill" not in output

View File

@@ -1,141 +0,0 @@
#!/usr/bin/env python3
"""Run a real interrupt test with actual AIAgent + delegate child.
Not a pytest test — runs directly as a script for live testing.
"""
import threading
import time
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unittest.mock import MagicMock, patch
from run_agent import AIAgent, IterationBudget
from tools.delegate_tool import _run_single_child
from tools.interrupt import set_interrupt, is_interrupted
set_interrupt(False)
# Create parent agent (minimal)
parent = AIAgent.__new__(AIAgent)
parent._interrupt_requested = False
parent._interrupt_message = None
parent._active_children = []
parent.quiet_mode = True
parent.model = "test/model"
parent.base_url = "http://localhost:1"
parent.api_key = "test"
parent.provider = "test"
parent.api_mode = "chat_completions"
parent.platform = "cli"
parent.enabled_toolsets = ["terminal", "file"]
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent.max_tokens = None
parent.reasoning_config = None
parent.prefill_messages = None
parent._session_db = None
parent._delegate_depth = 0
parent._delegate_spinner = None
parent.tool_progress_callback = None
parent.iteration_budget = IterationBudget(max_total=100)
parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"}
child_started = threading.Event()
result_holder = [None]
def run_delegate():
with patch("run_agent.OpenAI") as MockOpenAI:
mock_client = MagicMock()
def slow_create(**kwargs):
time.sleep(3)
resp = MagicMock()
resp.choices = [MagicMock()]
resp.choices[0].message.content = "Done"
resp.choices[0].message.tool_calls = None
resp.choices[0].message.refusal = None
resp.choices[0].finish_reason = "stop"
resp.usage.prompt_tokens = 100
resp.usage.completion_tokens = 10
resp.usage.total_tokens = 110
resp.usage.prompt_tokens_details = None
return resp
mock_client.chat.completions.create = slow_create
mock_client.close = MagicMock()
MockOpenAI.return_value = mock_client
original_init = AIAgent.__init__
def patched_init(self_agent, *a, **kw):
original_init(self_agent, *a, **kw)
child_started.set()
with patch.object(AIAgent, "__init__", patched_init):
try:
result = _run_single_child(
task_index=0,
goal="Test slow task",
context=None,
toolsets=["terminal"],
model="test/model",
max_iterations=5,
parent_agent=parent,
task_count=1,
override_provider="test",
override_base_url="http://localhost:1",
override_api_key="test",
override_api_mode="chat_completions",
)
result_holder[0] = result
except Exception as e:
print(f"ERROR in delegate: {e}")
import traceback
traceback.print_exc()
print("Starting agent thread...")
agent_thread = threading.Thread(target=run_delegate, daemon=True)
agent_thread.start()
started = child_started.wait(timeout=10)
if not started:
print("ERROR: Child never started")
sys.exit(1)
time.sleep(0.5)
print(f"Active children: {len(parent._active_children)}")
for i, c in enumerate(parent._active_children):
print(f" Child {i}: _interrupt_requested={c._interrupt_requested}")
t0 = time.monotonic()
parent.interrupt("User typed a new message")
print(f"Called parent.interrupt()")
for i, c in enumerate(parent._active_children):
print(f" Child {i} after interrupt: _interrupt_requested={c._interrupt_requested}")
print(f"Global is_interrupted: {is_interrupted()}")
agent_thread.join(timeout=10)
elapsed = time.monotonic() - t0
print(f"Agent thread finished in {elapsed:.2f}s")
result = result_holder[0]
if result:
print(f"Status: {result['status']}")
print(f"Duration: {result['duration_seconds']}s")
if elapsed < 2.0:
print("✅ PASS: Interrupt detected quickly!")
else:
print(f"❌ FAIL: Took {elapsed:.2f}s — interrupt was too slow or not detected")
else:
print("❌ FAIL: No result!")
set_interrupt(False)

View File

@@ -1,171 +0,0 @@
"""End-to-end test simulating CLI interrupt during subagent execution.
Reproduces the exact scenario:
1. Parent agent calls delegate_task
2. Child agent is running (simulated with a slow tool)
3. User "types a message" (simulated by calling parent.interrupt from another thread)
4. Child should detect the interrupt and stop
This tests the COMPLETE path including _run_single_child, _active_children
registration, interrupt propagation, and child detection.
"""
import json
import os
import queue
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
from tools.interrupt import set_interrupt, is_interrupted
class TestCLISubagentInterrupt(unittest.TestCase):
"""Simulate exact CLI scenario."""
def setUp(self):
set_interrupt(False)
def tearDown(self):
set_interrupt(False)
def test_full_delegate_interrupt_flow(self):
"""Full integration: parent runs delegate_task, main thread interrupts."""
from run_agent import AIAgent
interrupt_detected = threading.Event()
child_started = threading.Event()
child_api_call_count = 0
# Create a real-enough parent agent
parent = AIAgent.__new__(AIAgent)
parent._interrupt_requested = False
parent._interrupt_message = None
parent._active_children = []
parent.quiet_mode = True
parent.model = "test/model"
parent.base_url = "http://localhost:1"
parent.api_key = "test"
parent.provider = "test"
parent.api_mode = "chat_completions"
parent.platform = "cli"
parent.enabled_toolsets = ["terminal", "file"]
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent.max_tokens = None
parent.reasoning_config = None
parent.prefill_messages = None
parent._session_db = None
parent._delegate_depth = 0
parent._delegate_spinner = None
parent.tool_progress_callback = None
# We'll track what happens with _active_children
original_children = parent._active_children
# Mock the child's run_conversation to simulate a slow operation
# that checks _interrupt_requested like the real one does
def mock_child_run_conversation(user_message, **kwargs):
child_started.set()
# Find the child in parent._active_children
child = parent._active_children[-1] if parent._active_children else None
# Simulate the agent loop: poll _interrupt_requested like run_conversation does
for i in range(100): # Up to 10 seconds (100 * 0.1s)
if child and child._interrupt_requested:
interrupt_detected.set()
return {
"final_response": "Interrupted!",
"messages": [],
"api_calls": 1,
"completed": False,
"interrupted": True,
"interrupt_message": child._interrupt_message,
}
time.sleep(0.1)
return {
"final_response": "Finished without interrupt",
"messages": [],
"api_calls": 5,
"completed": True,
"interrupted": False,
}
# Patch AIAgent to use our mock
from tools.delegate_tool import _run_single_child
from run_agent import IterationBudget
parent.iteration_budget = IterationBudget(max_total=100)
# Run delegate in a thread (simulates agent_thread)
delegate_result = [None]
delegate_error = [None]
def run_delegate():
try:
with patch('run_agent.AIAgent') as MockAgent:
mock_instance = MagicMock()
mock_instance._interrupt_requested = False
mock_instance._interrupt_message = None
mock_instance._active_children = []
mock_instance.quiet_mode = True
mock_instance.run_conversation = mock_child_run_conversation
mock_instance.interrupt = lambda msg=None: setattr(mock_instance, '_interrupt_requested', True) or setattr(mock_instance, '_interrupt_message', msg)
mock_instance.tools = []
MockAgent.return_value = mock_instance
result = _run_single_child(
task_index=0,
goal="Do something slow",
context=None,
toolsets=["terminal"],
model=None,
max_iterations=50,
parent_agent=parent,
task_count=1,
)
delegate_result[0] = result
except Exception as e:
delegate_error[0] = e
agent_thread = threading.Thread(target=run_delegate, daemon=True)
agent_thread.start()
# Wait for child to start
assert child_started.wait(timeout=5), "Child never started!"
# Now simulate user interrupt (from main/process thread)
time.sleep(0.2) # Give child a moment to be in its loop
print(f"Parent has {len(parent._active_children)} active children")
assert len(parent._active_children) >= 1, f"Expected child in _active_children, got {len(parent._active_children)}"
# This is what the CLI does:
parent.interrupt("Hey stop that")
print(f"Parent._interrupt_requested: {parent._interrupt_requested}")
for i, child in enumerate(parent._active_children):
print(f"Child {i}._interrupt_requested: {child._interrupt_requested}")
# Wait for child to detect interrupt
detected = interrupt_detected.wait(timeout=3.0)
# Wait for delegate to finish
agent_thread.join(timeout=5)
if delegate_error[0]:
raise delegate_error[0]
assert detected, "Child never detected the interrupt!"
result = delegate_result[0]
assert result is not None, "Delegate returned no result"
assert result["status"] == "interrupted", f"Expected 'interrupted', got '{result['status']}'"
print(f"✓ Interrupt detected! Result: {result}")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,189 +0,0 @@
#!/usr/bin/env python3
"""Interactive interrupt test that mimics the exact CLI flow.
Starts an agent in a thread with a mock delegate_task that takes a while,
then simulates the user typing a message via _interrupt_queue.
Logs every step to stderr (which isn't affected by redirect_stdout)
so we can see exactly where the interrupt gets lost.
"""
import contextlib
import io
import json
import logging
import queue
import sys
import threading
import time
import os
# Force stderr logging so redirect_stdout doesn't swallow it
logging.basicConfig(level=logging.DEBUG, stream=sys.stderr,
format="%(asctime)s [%(threadName)s] %(message)s")
log = logging.getLogger("interrupt_test")
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from unittest.mock import MagicMock, patch
from run_agent import AIAgent, IterationBudget
from tools.interrupt import set_interrupt, is_interrupted
set_interrupt(False)
# ─── Create parent agent ───
parent = AIAgent.__new__(AIAgent)
parent._interrupt_requested = False
parent._interrupt_message = None
parent._active_children = []
parent.quiet_mode = True
parent.model = "test/model"
parent.base_url = "http://localhost:1"
parent.api_key = "test"
parent.provider = "test"
parent.api_mode = "chat_completions"
parent.platform = "cli"
parent.enabled_toolsets = ["terminal", "file"]
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent.max_tokens = None
parent.reasoning_config = None
parent.prefill_messages = None
parent._session_db = None
parent._delegate_depth = 0
parent._delegate_spinner = None
parent.tool_progress_callback = None
parent.iteration_budget = IterationBudget(max_total=100)
parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"}
# Monkey-patch parent.interrupt to log
_original_interrupt = AIAgent.interrupt
def logged_interrupt(self, message=None):
log.info(f"🔴 parent.interrupt() called with: {message!r}")
log.info(f" _active_children count: {len(self._active_children)}")
_original_interrupt(self, message)
log.info(f" After interrupt: _interrupt_requested={self._interrupt_requested}")
for i, c in enumerate(self._active_children):
log.info(f" Child {i}._interrupt_requested={c._interrupt_requested}")
parent.interrupt = lambda msg=None: logged_interrupt(parent, msg)
# ─── Simulate the exact CLI flow ───
interrupt_queue = queue.Queue()
child_running = threading.Event()
agent_result = [None]
def make_slow_response(delay=2.0):
"""API response that takes a while."""
def create(**kwargs):
log.info(f" 🌐 Mock API call starting (will take {delay}s)...")
time.sleep(delay)
log.info(f" 🌐 Mock API call completed")
resp = MagicMock()
resp.choices = [MagicMock()]
resp.choices[0].message.content = "Done with the task"
resp.choices[0].message.tool_calls = None
resp.choices[0].message.refusal = None
resp.choices[0].finish_reason = "stop"
resp.usage.prompt_tokens = 100
resp.usage.completion_tokens = 10
resp.usage.total_tokens = 110
resp.usage.prompt_tokens_details = None
return resp
return create
def agent_thread_func():
"""Simulates the agent_thread in cli.py's chat() method."""
log.info("🟢 agent_thread starting")
with patch("run_agent.OpenAI") as MockOpenAI:
mock_client = MagicMock()
mock_client.chat.completions.create = make_slow_response(delay=3.0)
mock_client.close = MagicMock()
MockOpenAI.return_value = mock_client
from tools.delegate_tool import _run_single_child
# Signal that child is about to start
original_init = AIAgent.__init__
def patched_init(self_agent, *a, **kw):
log.info("🟡 Child AIAgent.__init__ called")
original_init(self_agent, *a, **kw)
child_running.set()
log.info(f"🟡 Child started, parent._active_children = {len(parent._active_children)}")
with patch.object(AIAgent, "__init__", patched_init):
result = _run_single_child(
task_index=0,
goal="Do a slow thing",
context=None,
toolsets=["terminal"],
model="test/model",
max_iterations=3,
parent_agent=parent,
task_count=1,
override_provider="test",
override_base_url="http://localhost:1",
override_api_key="test",
override_api_mode="chat_completions",
)
agent_result[0] = result
log.info(f"🟢 agent_thread finished. Result status: {result.get('status')}")
# ─── Start agent thread (like chat() does) ───
agent_thread = threading.Thread(target=agent_thread_func, name="agent_thread", daemon=True)
agent_thread.start()
# ─── Wait for child to start ───
if not child_running.wait(timeout=10):
print("FAIL: Child never started", file=sys.stderr)
sys.exit(1)
# Give child time to enter its main loop and start API call
time.sleep(1.0)
# ─── Simulate user typing a message (like handle_enter does) ───
log.info("📝 Simulating user typing 'Hey stop that'")
interrupt_queue.put("Hey stop that")
# ─── Simulate chat() polling loop (like the real chat() method) ───
log.info("📡 Starting interrupt queue polling (like chat())")
interrupt_msg = None
poll_count = 0
while agent_thread.is_alive():
try:
interrupt_msg = interrupt_queue.get(timeout=0.1)
if interrupt_msg:
log.info(f"📨 Got interrupt message from queue: {interrupt_msg!r}")
log.info(f" Calling parent.interrupt()...")
parent.interrupt(interrupt_msg)
log.info(f" parent.interrupt() returned. Breaking poll loop.")
break
except queue.Empty:
poll_count += 1
if poll_count % 20 == 0: # Log every 2s
log.info(f" Still polling ({poll_count} iterations)...")
# ─── Wait for agent to finish ───
log.info("⏳ Waiting for agent_thread to join...")
t0 = time.monotonic()
agent_thread.join(timeout=10)
elapsed = time.monotonic() - t0
log.info(f"✅ agent_thread joined after {elapsed:.2f}s")
# ─── Check results ───
result = agent_result[0]
if result:
log.info(f"Result status: {result['status']}")
log.info(f"Result duration: {result['duration_seconds']}s")
if result["status"] == "interrupted" and elapsed < 2.0:
print("✅ PASS: Interrupt worked correctly!", file=sys.stderr)
else:
print(f"❌ FAIL: status={result['status']}, elapsed={elapsed:.2f}s", file=sys.stderr)
else:
print("❌ FAIL: No result returned", file=sys.stderr)
set_interrupt(False)

View File

@@ -1,155 +0,0 @@
"""Test interrupt propagation from parent to child agents.
Reproduces the CLI scenario: user sends a message while delegate_task is
running, main thread calls parent.interrupt(), child should stop.
"""
import json
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
from tools.interrupt import set_interrupt, is_interrupted, _interrupt_event
class TestInterruptPropagationToChild(unittest.TestCase):
"""Verify interrupt propagates from parent to child agent."""
def setUp(self):
set_interrupt(False)
def tearDown(self):
set_interrupt(False)
def test_parent_interrupt_sets_child_flag(self):
"""When parent.interrupt() is called, child._interrupt_requested should be set."""
from run_agent import AIAgent
parent = AIAgent.__new__(AIAgent)
parent._interrupt_requested = False
parent._interrupt_message = None
parent._active_children = []
parent.quiet_mode = True
child = AIAgent.__new__(AIAgent)
child._interrupt_requested = False
child._interrupt_message = None
child._active_children = []
child.quiet_mode = True
parent._active_children.append(child)
parent.interrupt("new user message")
assert parent._interrupt_requested is True
assert child._interrupt_requested is True
assert child._interrupt_message == "new user message"
assert is_interrupted() is True
def test_child_clear_interrupt_at_start_clears_global(self):
"""child.clear_interrupt() at start of run_conversation clears the GLOBAL event.
This is the intended behavior at startup, but verify it doesn't
accidentally clear an interrupt intended for a running child.
"""
from run_agent import AIAgent
child = AIAgent.__new__(AIAgent)
child._interrupt_requested = True
child._interrupt_message = "msg"
child.quiet_mode = True
child._active_children = []
# Global is set
set_interrupt(True)
assert is_interrupted() is True
# child.clear_interrupt() clears both
child.clear_interrupt()
assert child._interrupt_requested is False
assert is_interrupted() is False
def test_interrupt_during_child_api_call_detected(self):
"""Interrupt set during _interruptible_api_call is detected within 0.5s."""
from run_agent import AIAgent
child = AIAgent.__new__(AIAgent)
child._interrupt_requested = False
child._interrupt_message = None
child._active_children = []
child.quiet_mode = True
child.api_mode = "chat_completions"
child.log_prefix = ""
child._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1234"}
# Mock a slow API call
mock_client = MagicMock()
def slow_api_call(**kwargs):
time.sleep(5) # Would take 5s normally
return MagicMock()
mock_client.chat.completions.create = slow_api_call
mock_client.close = MagicMock()
child.client = mock_client
# Set interrupt after 0.2s from another thread
def set_interrupt_later():
time.sleep(0.2)
child.interrupt("stop!")
t = threading.Thread(target=set_interrupt_later, daemon=True)
t.start()
start = time.monotonic()
try:
child._interruptible_api_call({"model": "test", "messages": []})
self.fail("Should have raised InterruptedError")
except InterruptedError:
elapsed = time.monotonic() - start
# Should detect within ~0.5s (0.2s delay + 0.3s poll interval)
assert elapsed < 1.0, f"Took {elapsed:.2f}s to detect interrupt (expected < 1.0s)"
finally:
t.join(timeout=2)
set_interrupt(False)
def test_concurrent_interrupt_propagation(self):
"""Simulates exact CLI flow: parent runs delegate in thread, main thread interrupts."""
from run_agent import AIAgent
parent = AIAgent.__new__(AIAgent)
parent._interrupt_requested = False
parent._interrupt_message = None
parent._active_children = []
parent.quiet_mode = True
child = AIAgent.__new__(AIAgent)
child._interrupt_requested = False
child._interrupt_message = None
child._active_children = []
child.quiet_mode = True
# Register child (simulating what _run_single_child does)
parent._active_children.append(child)
# Simulate child running (checking flag in a loop)
child_detected = threading.Event()
def simulate_child_loop():
while not child._interrupt_requested:
time.sleep(0.05)
child_detected.set()
child_thread = threading.Thread(target=simulate_child_loop, daemon=True)
child_thread.start()
# Small delay, then interrupt from "main thread"
time.sleep(0.1)
parent.interrupt("user typed something new")
# Child should detect within 200ms
detected = child_detected.wait(timeout=1.0)
assert detected, "Child never detected the interrupt!"
child_thread.join(timeout=1)
set_interrupt(False)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,176 +0,0 @@
"""Test real interrupt propagation through delegate_task with actual AIAgent.
This uses a real AIAgent with mocked HTTP responses to test the complete
interrupt flow through _run_single_child → child.run_conversation().
"""
import json
import os
import threading
import time
import unittest
from unittest.mock import MagicMock, patch, PropertyMock
from tools.interrupt import set_interrupt, is_interrupted
def _make_slow_api_response(delay=5.0):
"""Create a mock that simulates a slow API response (like a real LLM call)."""
def slow_create(**kwargs):
# Simulate a slow API call
time.sleep(delay)
# Return a simple text response (no tool calls)
resp = MagicMock()
resp.choices = [MagicMock()]
resp.choices[0].message = MagicMock()
resp.choices[0].message.content = "Done"
resp.choices[0].message.tool_calls = None
resp.choices[0].message.refusal = None
resp.choices[0].finish_reason = "stop"
resp.usage = MagicMock()
resp.usage.prompt_tokens = 100
resp.usage.completion_tokens = 10
resp.usage.total_tokens = 110
resp.usage.prompt_tokens_details = None
return resp
return slow_create
class TestRealSubagentInterrupt(unittest.TestCase):
"""Test interrupt with real AIAgent child through delegate_tool."""
def setUp(self):
set_interrupt(False)
os.environ.setdefault("OPENAI_API_KEY", "test-key")
def tearDown(self):
set_interrupt(False)
def test_interrupt_child_during_api_call(self):
"""Real AIAgent child interrupted while making API call."""
from run_agent import AIAgent, IterationBudget
# Create a real parent agent (just enough to be a parent)
parent = AIAgent.__new__(AIAgent)
parent._interrupt_requested = False
parent._interrupt_message = None
parent._active_children = []
parent.quiet_mode = True
parent.model = "test/model"
parent.base_url = "http://localhost:1"
parent.api_key = "test"
parent.provider = "test"
parent.api_mode = "chat_completions"
parent.platform = "cli"
parent.enabled_toolsets = ["terminal", "file"]
parent.providers_allowed = None
parent.providers_ignored = None
parent.providers_order = None
parent.provider_sort = None
parent.max_tokens = None
parent.reasoning_config = None
parent.prefill_messages = None
parent._session_db = None
parent._delegate_depth = 0
parent._delegate_spinner = None
parent.tool_progress_callback = None
parent.iteration_budget = IterationBudget(max_total=100)
parent._client_kwargs = {"api_key": "test", "base_url": "http://localhost:1"}
from tools.delegate_tool import _run_single_child
child_started = threading.Event()
result_holder = [None]
error_holder = [None]
def run_delegate():
try:
# Patch the OpenAI client creation inside AIAgent.__init__
with patch('run_agent.OpenAI') as MockOpenAI:
mock_client = MagicMock()
# API call takes 5 seconds — should be interrupted before that
mock_client.chat.completions.create = _make_slow_api_response(delay=5.0)
mock_client.close = MagicMock()
MockOpenAI.return_value = mock_client
# Also need to patch the system prompt builder
with patch('run_agent.build_system_prompt', return_value="You are a test agent"):
# Signal when child starts
original_run = AIAgent.run_conversation
def patched_run(self_agent, *args, **kwargs):
child_started.set()
return original_run(self_agent, *args, **kwargs)
with patch.object(AIAgent, 'run_conversation', patched_run):
result = _run_single_child(
task_index=0,
goal="Test task",
context=None,
toolsets=["terminal"],
model="test/model",
max_iterations=5,
parent_agent=parent,
task_count=1,
override_provider="test",
override_base_url="http://localhost:1",
override_api_key="test",
override_api_mode="chat_completions",
)
result_holder[0] = result
except Exception as e:
import traceback
traceback.print_exc()
error_holder[0] = e
agent_thread = threading.Thread(target=run_delegate, daemon=True)
agent_thread.start()
# Wait for child to start run_conversation
started = child_started.wait(timeout=10)
if not started:
agent_thread.join(timeout=1)
if error_holder[0]:
raise error_holder[0]
self.fail("Child never started run_conversation")
# Give child time to enter main loop and start API call
time.sleep(0.5)
# Verify child is registered
print(f"Active children: {len(parent._active_children)}")
self.assertGreaterEqual(len(parent._active_children), 1,
"Child not registered in _active_children")
# Interrupt! (simulating what CLI does)
start = time.monotonic()
parent.interrupt("User typed a new message")
# Check propagation
child = parent._active_children[0] if parent._active_children else None
if child:
print(f"Child._interrupt_requested after parent.interrupt(): {child._interrupt_requested}")
self.assertTrue(child._interrupt_requested,
"Interrupt did not propagate to child!")
# Wait for delegate to finish (should be fast since interrupted)
agent_thread.join(timeout=5)
elapsed = time.monotonic() - start
if error_holder[0]:
raise error_holder[0]
result = result_holder[0]
self.assertIsNotNone(result, "Delegate returned no result")
print(f"Result status: {result['status']}, elapsed: {elapsed:.2f}s")
print(f"Full result: {result}")
# The child should have been interrupted, not completed the full 5s API call
self.assertLess(elapsed, 3.0,
f"Took {elapsed:.2f}s — interrupt was not detected quickly enough")
self.assertEqual(result["status"], "interrupted",
f"Expected 'interrupted', got '{result['status']}'")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,54 +0,0 @@
"""Verify that redirect_stdout in _run_single_child is process-wide.
This demonstrates that contextlib.redirect_stdout changes sys.stdout
for ALL threads, not just the current one. This means during subagent
execution, all output from other threads (including the CLI's process_thread)
is swallowed.
"""
import contextlib
import io
import sys
import threading
import time
import unittest
class TestRedirectStdoutIsProcessWide(unittest.TestCase):
def test_redirect_stdout_affects_other_threads(self):
"""contextlib.redirect_stdout changes sys.stdout for ALL threads."""
captured_from_other_thread = []
real_stdout = sys.stdout
other_thread_saw_devnull = threading.Event()
def other_thread_work():
"""Runs in a different thread, tries to use sys.stdout."""
time.sleep(0.2) # Let redirect_stdout take effect
# Check what sys.stdout is
if sys.stdout is not real_stdout:
other_thread_saw_devnull.set()
# Try to print — this should go to devnull
captured_from_other_thread.append(sys.stdout)
t = threading.Thread(target=other_thread_work, daemon=True)
t.start()
# redirect_stdout in main thread
devnull = io.StringIO()
with contextlib.redirect_stdout(devnull):
time.sleep(0.5) # Let the other thread check during redirect
t.join(timeout=2)
# The other thread should have seen devnull, NOT the real stdout
self.assertTrue(
other_thread_saw_devnull.is_set(),
"redirect_stdout was NOT process-wide — other thread still saw real stdout. "
"This test's premise is wrong."
)
print("Confirmed: redirect_stdout IS process-wide — affects all threads")
if __name__ == "__main__":
unittest.main()

View File

@@ -54,11 +54,8 @@ ENVIRONMENTS_DIR = TINKER_ATROPOS_ROOT / "tinker_atropos" / "environments"
CONFIGS_DIR = TINKER_ATROPOS_ROOT / "configs"
LOGS_DIR = TINKER_ATROPOS_ROOT / "logs"
def _ensure_logs_dir():
"""Lazily create logs directory on first use (avoid side effects at import time)."""
if TINKER_ATROPOS_ROOT.exists():
LOGS_DIR.mkdir(exist_ok=True)
# Ensure logs directory exists
LOGS_DIR.mkdir(exist_ok=True)
# ============================================================================
@@ -317,8 +314,6 @@ async def _spawn_training_run(run_state: RunState, config_path: Path):
"""
run_id = run_state.run_id
_ensure_logs_dir()
# Log file paths
api_log = LOGS_DIR / f"api_{run_id}.log"
trainer_log = LOGS_DIR / f"trainer_{run_id}.log"
@@ -1097,7 +1092,6 @@ async def rl_test_inference(
}
# Create output directory for test results
_ensure_logs_dir()
test_output_dir = LOGS_DIR / "inference_tests"
test_output_dir.mkdir(exist_ok=True)

View File

@@ -572,23 +572,14 @@ class ClawHubSource(SkillSource):
logger.warning("ClawHub fetch failed for %s: could not resolve latest version", slug)
return None
# Primary method: download the skill as a ZIP bundle from /download
files = self._download_zip(slug, latest_version)
# Fallback: try the version metadata endpoint for inline/raw content
if "SKILL.md" not in files:
version_data = self._get_json(f"{self.BASE_URL}/skills/{slug}/versions/{latest_version}")
if isinstance(version_data, dict):
# Files may be nested under version_data["version"]["files"]
files = self._extract_files(version_data) or files
if "SKILL.md" not in files:
nested = version_data.get("version", {})
if isinstance(nested, dict):
files = self._extract_files(nested) or files
version_data = self._get_json(f"{self.BASE_URL}/skills/{slug}/versions/{latest_version}")
if not isinstance(version_data, dict):
return None
files = self._extract_files(version_data)
if "SKILL.md" not in files:
logger.warning(
"ClawHub fetch for %s resolved version %s but could not retrieve file content",
"ClawHub fetch for %s resolved version %s but no inline/raw file content was available",
slug,
latest_version,
)
@@ -683,65 +674,6 @@ class ClawHubSource(SkillSource):
return files
def _download_zip(self, slug: str, version: str) -> Dict[str, str]:
"""Download skill as a ZIP bundle from the /download endpoint and extract text files."""
import io
import zipfile
files: Dict[str, str] = {}
max_retries = 3
for attempt in range(max_retries):
try:
resp = httpx.get(
f"{self.BASE_URL}/download",
params={"slug": slug, "version": version},
timeout=30,
follow_redirects=True,
)
if resp.status_code == 429:
retry_after = int(resp.headers.get("retry-after", "5"))
retry_after = min(retry_after, 15) # Cap wait time
logger.debug(
"ClawHub download rate-limited for %s, retrying in %ds (attempt %d/%d)",
slug, retry_after, attempt + 1, max_retries,
)
time.sleep(retry_after)
continue
if resp.status_code != 200:
logger.debug("ClawHub ZIP download for %s v%s returned %s", slug, version, resp.status_code)
return files
with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
for info in zf.infolist():
if info.is_dir():
continue
# Sanitize path — strip leading slashes and ..
name = info.filename.lstrip("/")
if ".." in name or name.startswith("/"):
continue
# Only extract text-sized files (skip large binaries)
if info.file_size > 500_000:
logger.debug("Skipping large file in ZIP: %s (%d bytes)", name, info.file_size)
continue
try:
raw = zf.read(info.filename)
files[name] = raw.decode("utf-8")
except (UnicodeDecodeError, KeyError):
logger.debug("Skipping non-text file in ZIP: %s", name)
continue
return files
except zipfile.BadZipFile:
logger.warning("ClawHub returned invalid ZIP for %s v%s", slug, version)
return files
except httpx.HTTPError as exc:
logger.debug("ClawHub ZIP download failed for %s v%s: %s", slug, version, exc)
return files
logger.debug("ClawHub ZIP download exhausted retries for %s v%s", slug, version)
return files
def _fetch_text(self, url: str) -> Optional[str]:
try:
resp = httpx.get(url, timeout=20)

View File

@@ -26,20 +26,6 @@ const config: Config = {
locales: ['en'],
},
themes: [
[
require.resolve('@easyops-cn/docusaurus-search-local'),
/** @type {import("@easyops-cn/docusaurus-search-local").PluginOptions} */
({
hashed: true,
language: ['en'],
indexBlog: false,
docsRouteBasePath: '/',
highlightSearchTermsOnTargetPage: true,
}),
],
],
presets: [
[
'classic',

View File

@@ -10,7 +10,6 @@
"dependencies": {
"@docusaurus/core": "3.9.2",
"@docusaurus/preset-classic": "3.9.2",
"@easyops-cn/docusaurus-search-local": "^0.55.1",
"@mdx-js/react": "^3.0.0",
"clsx": "^2.0.0",
"prism-react-renderer": "^2.3.0",
@@ -4064,156 +4063,6 @@
"node": ">=20.0"
}
},
"node_modules/@easyops-cn/autocomplete.js": {
"version": "0.38.1",
"resolved": "https://registry.npmjs.org/@easyops-cn/autocomplete.js/-/autocomplete.js-0.38.1.tgz",
"integrity": "sha512-drg76jS6syilOUmVNkyo1c7ZEBPcPuK+aJA7AksM5ZIIbV57DMHCywiCr+uHyv8BE5jUTU98j/H7gVrkHrWW3Q==",
"license": "MIT",
"dependencies": {
"cssesc": "^3.0.0",
"immediate": "^3.2.3"
}
},
"node_modules/@easyops-cn/docusaurus-search-local": {
"version": "0.55.1",
"resolved": "https://registry.npmjs.org/@easyops-cn/docusaurus-search-local/-/docusaurus-search-local-0.55.1.tgz",
"integrity": "sha512-jmBKj1J+tajqNrCvECwKCQYTWwHVZDGApy8lLOYEPe+Dm0/f3Ccdw8BP5/OHNpltr7WDNY2roQXn+TWn2f1kig==",
"license": "MIT",
"dependencies": {
"@docusaurus/plugin-content-docs": "^2 || ^3",
"@docusaurus/theme-translations": "^2 || ^3",
"@docusaurus/utils": "^2 || ^3",
"@docusaurus/utils-common": "^2 || ^3",
"@docusaurus/utils-validation": "^2 || ^3",
"@easyops-cn/autocomplete.js": "^0.38.1",
"@node-rs/jieba": "^1.6.0",
"cheerio": "^1.0.0",
"clsx": "^2.1.1",
"comlink": "^4.4.2",
"debug": "^4.2.0",
"fs-extra": "^10.0.0",
"klaw-sync": "^6.0.0",
"lunr": "^2.3.9",
"lunr-languages": "^1.4.0",
"mark.js": "^8.11.1",
"tslib": "^2.4.0"
},
"engines": {
"node": ">=12"
},
"peerDependencies": {
"@docusaurus/theme-common": "^2 || ^3",
"open-ask-ai": "^0.7.3",
"react": "^16.14.0 || ^17 || ^18 || ^19",
"react-dom": "^16.14.0 || 17 || ^18 || ^19"
},
"peerDependenciesMeta": {
"open-ask-ai": {
"optional": true
}
}
},
"node_modules/@easyops-cn/docusaurus-search-local/node_modules/cheerio": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/cheerio/-/cheerio-1.2.0.tgz",
"integrity": "sha512-WDrybc/gKFpTYQutKIK6UvfcuxijIZfMfXaYm8NMsPQxSYvf+13fXUJ4rztGGbJcBQ/GF55gvrZ0Bc0bj/mqvg==",
"license": "MIT",
"dependencies": {
"cheerio-select": "^2.1.0",
"dom-serializer": "^2.0.0",
"domhandler": "^5.0.3",
"domutils": "^3.2.2",
"encoding-sniffer": "^0.2.1",
"htmlparser2": "^10.1.0",
"parse5": "^7.3.0",
"parse5-htmlparser2-tree-adapter": "^7.1.0",
"parse5-parser-stream": "^7.1.2",
"undici": "^7.19.0",
"whatwg-mimetype": "^4.0.0"
},
"engines": {
"node": ">=20.18.1"
},
"funding": {
"url": "https://github.com/cheeriojs/cheerio?sponsor=1"
}
},
"node_modules/@easyops-cn/docusaurus-search-local/node_modules/entities": {
"version": "7.0.1",
"resolved": "https://registry.npmjs.org/entities/-/entities-7.0.1.tgz",
"integrity": "sha512-TWrgLOFUQTH994YUyl1yT4uyavY5nNB5muff+RtWaqNVCAK408b5ZnnbNAUEWLTCpum9w6arT70i1XdQ4UeOPA==",
"license": "BSD-2-Clause",
"engines": {
"node": ">=0.12"
},
"funding": {
"url": "https://github.com/fb55/entities?sponsor=1"
}
},
"node_modules/@easyops-cn/docusaurus-search-local/node_modules/fs-extra": {
"version": "10.1.0",
"resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-10.1.0.tgz",
"integrity": "sha512-oRXApq54ETRj4eMiFzGnHWGy+zo5raudjuxN0b8H7s/RU2oW0Wvsx9O0ACRN/kRq9E8Vu/ReskGB5o3ji+FzHQ==",
"license": "MIT",
"dependencies": {
"graceful-fs": "^4.2.0",
"jsonfile": "^6.0.1",
"universalify": "^2.0.0"
},
"engines": {
"node": ">=12"
}
},
"node_modules/@easyops-cn/docusaurus-search-local/node_modules/htmlparser2": {
"version": "10.1.0",
"resolved": "https://registry.npmjs.org/htmlparser2/-/htmlparser2-10.1.0.tgz",
"integrity": "sha512-VTZkM9GWRAtEpveh7MSF6SjjrpNVNNVJfFup7xTY3UpFtm67foy9HDVXneLtFVt4pMz5kZtgNcvCniNFb1hlEQ==",
"funding": [
"https://github.com/fb55/htmlparser2?sponsor=1",
{
"type": "github",
"url": "https://github.com/sponsors/fb55"
}
],
"license": "MIT",
"dependencies": {
"domelementtype": "^2.3.0",
"domhandler": "^5.0.3",
"domutils": "^3.2.2",
"entities": "^7.0.1"
}
},
"node_modules/@emnapi/core": {
"version": "1.9.0",
"resolved": "https://registry.npmjs.org/@emnapi/core/-/core-1.9.0.tgz",
"integrity": "sha512-0DQ98G9ZQZOxfUcQn1waV2yS8aWdZ6kJMbYCJB3oUBecjWYO1fqJ+a1DRfPF3O5JEkwqwP1A9QEN/9mYm2Yd0w==",
"license": "MIT",
"optional": true,
"dependencies": {
"@emnapi/wasi-threads": "1.2.0",
"tslib": "^2.4.0"
}
},
"node_modules/@emnapi/runtime": {
"version": "1.9.0",
"resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.9.0.tgz",
"integrity": "sha512-QN75eB0IH2ywSpRpNddCRfQIhmJYBCJ1x5Lb3IscKAL8bMnVAKnRg8dCoXbHzVLLH7P38N2Z3mtulB7W0J0FKw==",
"license": "MIT",
"optional": true,
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/@emnapi/wasi-threads": {
"version": "1.2.0",
"resolved": "https://registry.npmjs.org/@emnapi/wasi-threads/-/wasi-threads-1.2.0.tgz",
"integrity": "sha512-N10dEJNSsUx41Z6pZsXU8FjPjpBEplgH24sfkmITrBED1/U2Esum9F3lfLrMjKHHjmi557zQn7kR9R+XWXu5Rg==",
"license": "MIT",
"optional": true,
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/@hapi/hoek": {
"version": "9.3.0",
"resolved": "https://registry.npmjs.org/@hapi/hoek/-/hoek-9.3.0.tgz",
@@ -4782,18 +4631,6 @@
"react": ">=16"
}
},
"node_modules/@napi-rs/wasm-runtime": {
"version": "0.2.12",
"resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-0.2.12.tgz",
"integrity": "sha512-ZVWUcfwY4E/yPitQJl481FjFo3K22D6qF0DuFH6Y/nbnE11GY5uguDxZMGXPQ8WQ0128MXQD7TnfHyK4oWoIJQ==",
"license": "MIT",
"optional": true,
"dependencies": {
"@emnapi/core": "^1.4.3",
"@emnapi/runtime": "^1.4.3",
"@tybys/wasm-util": "^0.10.0"
}
},
"node_modules/@noble/hashes": {
"version": "1.4.0",
"resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.4.0.tgz",
@@ -4806,259 +4643,6 @@
"url": "https://paulmillr.com/funding/"
}
},
"node_modules/@node-rs/jieba": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba/-/jieba-1.10.4.tgz",
"integrity": "sha512-GvDgi8MnBiyWd6tksojej8anIx18244NmIOc1ovEw8WKNUejcccLfyu8vj66LWSuoZuKILVtNsOy4jvg3aoxIw==",
"license": "MIT",
"engines": {
"node": ">= 10"
},
"funding": {
"type": "github",
"url": "https://github.com/sponsors/Brooooooklyn"
},
"optionalDependencies": {
"@node-rs/jieba-android-arm-eabi": "1.10.4",
"@node-rs/jieba-android-arm64": "1.10.4",
"@node-rs/jieba-darwin-arm64": "1.10.4",
"@node-rs/jieba-darwin-x64": "1.10.4",
"@node-rs/jieba-freebsd-x64": "1.10.4",
"@node-rs/jieba-linux-arm-gnueabihf": "1.10.4",
"@node-rs/jieba-linux-arm64-gnu": "1.10.4",
"@node-rs/jieba-linux-arm64-musl": "1.10.4",
"@node-rs/jieba-linux-x64-gnu": "1.10.4",
"@node-rs/jieba-linux-x64-musl": "1.10.4",
"@node-rs/jieba-wasm32-wasi": "1.10.4",
"@node-rs/jieba-win32-arm64-msvc": "1.10.4",
"@node-rs/jieba-win32-ia32-msvc": "1.10.4",
"@node-rs/jieba-win32-x64-msvc": "1.10.4"
}
},
"node_modules/@node-rs/jieba-android-arm-eabi": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-android-arm-eabi/-/jieba-android-arm-eabi-1.10.4.tgz",
"integrity": "sha512-MhyvW5N3Fwcp385d0rxbCWH42kqDBatQTyP8XbnYbju2+0BO/eTeCCLYj7Agws4pwxn2LtdldXRSKavT7WdzNA==",
"cpu": [
"arm"
],
"license": "MIT",
"optional": true,
"os": [
"android"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-android-arm64": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-android-arm64/-/jieba-android-arm64-1.10.4.tgz",
"integrity": "sha512-XyDwq5+rQ+Tk55A+FGi6PtJbzf974oqnpyCcCPzwU3QVXJCa2Rr4Lci+fx8oOpU4plT3GuD+chXMYLsXipMgJA==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"android"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-darwin-arm64": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-darwin-arm64/-/jieba-darwin-arm64-1.10.4.tgz",
"integrity": "sha512-G++RYEJ2jo0rxF9626KUy90wp06TRUjAsvY/BrIzEOX/ingQYV/HjwQzNPRR1P1o32a6/U8RGo7zEBhfdybL6w==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"darwin"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-darwin-x64": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-darwin-x64/-/jieba-darwin-x64-1.10.4.tgz",
"integrity": "sha512-MmDNeOb2TXIZCPyWCi2upQnZpPjAxw5ZGEj6R8kNsPXVFALHIKMa6ZZ15LCOkSTsKXVC17j2t4h+hSuyYb6qfQ==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"darwin"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-freebsd-x64": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-freebsd-x64/-/jieba-freebsd-x64-1.10.4.tgz",
"integrity": "sha512-/x7aVQ8nqUWhpXU92RZqd333cq639i/olNpd9Z5hdlyyV5/B65LLy+Je2B2bfs62PVVm5QXRpeBcZqaHelp/bg==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"freebsd"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-linux-arm-gnueabihf": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-linux-arm-gnueabihf/-/jieba-linux-arm-gnueabihf-1.10.4.tgz",
"integrity": "sha512-crd2M35oJBRLkoESs0O6QO3BBbhpv+tqXuKsqhIG94B1d02RVxtRIvSDwO33QurxqSdvN9IeSnVpHbDGkuXm3g==",
"cpu": [
"arm"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-linux-arm64-gnu": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-linux-arm64-gnu/-/jieba-linux-arm64-gnu-1.10.4.tgz",
"integrity": "sha512-omIzNX1psUzPcsdnUhGU6oHeOaTCuCjUgOA/v/DGkvWC1jLcnfXe4vdYbtXMh4XOCuIgS1UCcvZEc8vQLXFbXQ==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-linux-arm64-musl": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-linux-arm64-musl/-/jieba-linux-arm64-musl-1.10.4.tgz",
"integrity": "sha512-Y/tiJ1+HeS5nnmLbZOE+66LbsPOHZ/PUckAYVeLlQfpygLEpLYdlh0aPpS5uiaWMjAXYZYdFkpZHhxDmSLpwpw==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-linux-x64-gnu": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-linux-x64-gnu/-/jieba-linux-x64-gnu-1.10.4.tgz",
"integrity": "sha512-WZO8ykRJpWGE9MHuZpy1lu3nJluPoeB+fIJJn5CWZ9YTVhNDWoCF4i/7nxz1ntulINYGQ8VVuCU9LD86Mek97g==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-linux-x64-musl": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-linux-x64-musl/-/jieba-linux-x64-musl-1.10.4.tgz",
"integrity": "sha512-uBBD4S1rGKcgCyAk6VCKatEVQb6EDD5I40v/DxODi5CuZVCANi9m5oee/MQbAoaX7RydA2f0OSCE9/tcwXEwUg==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"linux"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-wasm32-wasi": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-wasm32-wasi/-/jieba-wasm32-wasi-1.10.4.tgz",
"integrity": "sha512-Y2umiKHjuIJy0uulNDz9SDYHdfq5Hmy7jY5nORO99B4pySKkcrMjpeVrmWXJLIsEKLJwcCXHxz8tjwU5/uhz0A==",
"cpu": [
"wasm32"
],
"license": "MIT",
"optional": true,
"dependencies": {
"@napi-rs/wasm-runtime": "^0.2.3"
},
"engines": {
"node": ">=14.0.0"
}
},
"node_modules/@node-rs/jieba-win32-arm64-msvc": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-win32-arm64-msvc/-/jieba-win32-arm64-msvc-1.10.4.tgz",
"integrity": "sha512-nwMtViFm4hjqhz1it/juQnxpXgqlGltCuWJ02bw70YUDMDlbyTy3grCJPpQQpueeETcALUnTxda8pZuVrLRcBA==",
"cpu": [
"arm64"
],
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-win32-ia32-msvc": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-win32-ia32-msvc/-/jieba-win32-ia32-msvc-1.10.4.tgz",
"integrity": "sha512-DCAvLx7Z+W4z5oKS+7vUowAJr0uw9JBw8x1Y23Xs/xMA4Em+OOSiaF5/tCJqZUCJ8uC4QeImmgDFiBqGNwxlyA==",
"cpu": [
"ia32"
],
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@node-rs/jieba-win32-x64-msvc": {
"version": "1.10.4",
"resolved": "https://registry.npmjs.org/@node-rs/jieba-win32-x64-msvc/-/jieba-win32-x64-msvc-1.10.4.tgz",
"integrity": "sha512-+sqemSfS1jjb+Tt7InNbNzrRh1Ua3vProVvC4BZRPg010/leCbGFFiQHpzcPRfpxAXZrzG5Y0YBTsPzN/I4yHQ==",
"cpu": [
"x64"
],
"license": "MIT",
"optional": true,
"os": [
"win32"
],
"engines": {
"node": ">= 10"
}
},
"node_modules/@nodelib/fs.scandir": {
"version": "2.1.5",
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
@@ -5608,16 +5192,6 @@
"node": ">=14.16"
}
},
"node_modules/@tybys/wasm-util": {
"version": "0.10.1",
"resolved": "https://registry.npmjs.org/@tybys/wasm-util/-/wasm-util-0.10.1.tgz",
"integrity": "sha512-9tTaPJLSiejZKx+Bmog4uSubteqTvFrVrURwkmHixBo0G4seD0zUxp98E1DzUBJxLQ3NPwXrGKDiVjwx/DpPsg==",
"license": "MIT",
"optional": true,
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/@types/body-parser": {
"version": "1.19.6",
"resolved": "https://registry.npmjs.org/@types/body-parser/-/body-parser-1.19.6.tgz",
@@ -7293,12 +6867,6 @@
"node": ">=10"
}
},
"node_modules/comlink": {
"version": "4.4.2",
"resolved": "https://registry.npmjs.org/comlink/-/comlink-4.4.2.tgz",
"integrity": "sha512-OxGdvBmJuNKSCMO4NTl1L47VRp6xn2wG4F/2hYzB6tiCb709otOxtEYCSvK80PtjODfXXZu8ds+Nw5kVCjqd2g==",
"license": "Apache-2.0"
},
"node_modules/comma-separated-tokens": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/comma-separated-tokens/-/comma-separated-tokens-2.0.3.tgz",
@@ -8503,31 +8071,6 @@
"node": ">= 0.8"
}
},
"node_modules/encoding-sniffer": {
"version": "0.2.1",
"resolved": "https://registry.npmjs.org/encoding-sniffer/-/encoding-sniffer-0.2.1.tgz",
"integrity": "sha512-5gvq20T6vfpekVtqrYQsSCFZ1wEg5+wW0/QaZMWkFr6BqD3NfKs0rLCx4rrVlSWJeZb5NBJgVLswK/w2MWU+Gw==",
"license": "MIT",
"dependencies": {
"iconv-lite": "^0.6.3",
"whatwg-encoding": "^3.1.1"
},
"funding": {
"url": "https://github.com/fb55/encoding-sniffer?sponsor=1"
}
},
"node_modules/encoding-sniffer/node_modules/iconv-lite": {
"version": "0.6.3",
"resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz",
"integrity": "sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==",
"license": "MIT",
"dependencies": {
"safer-buffer": ">= 2.1.2 < 3.0.0"
},
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/enhanced-resolve": {
"version": "5.20.0",
"resolved": "https://registry.npmjs.org/enhanced-resolve/-/enhanced-resolve-5.20.0.tgz",
@@ -10242,12 +9785,6 @@
"node": ">=16.x"
}
},
"node_modules/immediate": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/immediate/-/immediate-3.3.0.tgz",
"integrity": "sha512-HR7EVodfFUdQCTIeySw+WDRFJlPcLOJbXfwwZ7Oom6tjsvZ3bOkCDJHehQC3nxJrv7+f9XecwazynjU8e4Vw3Q==",
"license": "MIT"
},
"node_modules/import-fresh": {
"version": "3.3.1",
"resolved": "https://registry.npmjs.org/import-fresh/-/import-fresh-3.3.1.tgz",
@@ -10834,15 +10371,6 @@
"node": ">=0.10.0"
}
},
"node_modules/klaw-sync": {
"version": "6.0.0",
"resolved": "https://registry.npmjs.org/klaw-sync/-/klaw-sync-6.0.0.tgz",
"integrity": "sha512-nIeuVSzdCCs6TDPTqI8w1Yre34sSq7AkZ4B3sfOBbI2CgVSB4Du4aLQijFU2+lhAFCwt9+42Hel6lQNIv6AntQ==",
"license": "MIT",
"dependencies": {
"graceful-fs": "^4.1.11"
}
},
"node_modules/kleur": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/kleur/-/kleur-3.0.3.tgz",
@@ -11022,24 +10550,6 @@
"yallist": "^3.0.2"
}
},
"node_modules/lunr": {
"version": "2.3.9",
"resolved": "https://registry.npmjs.org/lunr/-/lunr-2.3.9.tgz",
"integrity": "sha512-zTU3DaZaF3Rt9rhN3uBMGQD3dD2/vFQqnvZCDv4dl5iOzq2IZQqTxu90r4E5J+nP70J3ilqVCrbho2eWaeW8Ow==",
"license": "MIT"
},
"node_modules/lunr-languages": {
"version": "1.14.0",
"resolved": "https://registry.npmjs.org/lunr-languages/-/lunr-languages-1.14.0.tgz",
"integrity": "sha512-hWUAb2KqM3L7J5bcrngszzISY4BxrXn/Xhbb9TTCJYEGqlR1nG67/M14sp09+PTIRklobrn57IAxcdcO/ZFyNA==",
"license": "MPL-1.1"
},
"node_modules/mark.js": {
"version": "8.11.1",
"resolved": "https://registry.npmjs.org/mark.js/-/mark.js-8.11.1.tgz",
"integrity": "sha512-1I+1qpDt4idfgLQG+BNWmrqku+7/2bi5nLf4YwF8y8zXvmfiTBY3PV3ZibfrjBueCByROpuBjLLFCajqkgYoLQ==",
"license": "MIT"
},
"node_modules/markdown-extensions": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/markdown-extensions/-/markdown-extensions-2.0.0.tgz",
@@ -14000,18 +13510,6 @@
"url": "https://github.com/inikulin/parse5?sponsor=1"
}
},
"node_modules/parse5-parser-stream": {
"version": "7.1.2",
"resolved": "https://registry.npmjs.org/parse5-parser-stream/-/parse5-parser-stream-7.1.2.tgz",
"integrity": "sha512-JyeQc9iwFLn5TbvvqACIF/VXG6abODeB3Fwmv/TGdLk2LfbWkaySGY72at4+Ty7EkPZj854u4CrICqNk2qIbow==",
"license": "MIT",
"dependencies": {
"parse5": "^7.0.0"
},
"funding": {
"url": "https://github.com/inikulin/parse5?sponsor=1"
}
},
"node_modules/parse5/node_modules/entities": {
"version": "6.0.1",
"resolved": "https://registry.npmjs.org/entities/-/entities-6.0.1.tgz",
@@ -17820,15 +17318,6 @@
"node": ">=14.17"
}
},
"node_modules/undici": {
"version": "7.23.0",
"resolved": "https://registry.npmjs.org/undici/-/undici-7.23.0.tgz",
"integrity": "sha512-HVMxHKZKi+eL2mrUZDzDkKW3XvCjynhbtpSq20xQp4ePDFeSFuAfnvM0GIwZIv8fiKHjXFQ5WjxhCt15KRNj+g==",
"license": "MIT",
"engines": {
"node": ">=20.18.1"
}
},
"node_modules/undici-types": {
"version": "7.18.2",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.18.2.tgz",
@@ -18748,40 +18237,6 @@
"node": ">=0.8.0"
}
},
"node_modules/whatwg-encoding": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/whatwg-encoding/-/whatwg-encoding-3.1.1.tgz",
"integrity": "sha512-6qN4hJdMwfYBtE3YBTTHhoeuUrDBPZmbQaxWAqSALV/MeEnR5z1xd8UKud2RAkFoPkmB+hli1TZSnyi84xz1vQ==",
"deprecated": "Use @exodus/bytes instead for a more spec-conformant and faster implementation",
"license": "MIT",
"dependencies": {
"iconv-lite": "0.6.3"
},
"engines": {
"node": ">=18"
}
},
"node_modules/whatwg-encoding/node_modules/iconv-lite": {
"version": "0.6.3",
"resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.6.3.tgz",
"integrity": "sha512-4fCk79wshMdzMp2rH06qWrJE4iolqLhCUH+OiuIgU++RB0+94NlDL81atO7GX55uUKueo0txHNtvEyI6D7WdMw==",
"license": "MIT",
"dependencies": {
"safer-buffer": ">= 2.1.2 < 3.0.0"
},
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/whatwg-mimetype": {
"version": "4.0.0",
"resolved": "https://registry.npmjs.org/whatwg-mimetype/-/whatwg-mimetype-4.0.0.tgz",
"integrity": "sha512-QaKxh0eNIi2mE9p2vEdzfagOKHCcj1pJ56EEHGQOVxp8r9/iszLUUV7v89x9O1p/T+NlTM5W7jW6+cz4Fq1YVg==",
"license": "MIT",
"engines": {
"node": ">=18"
}
},
"node_modules/which": {
"version": "2.0.2",
"resolved": "https://registry.npmjs.org/which/-/which-2.0.2.tgz",

View File

@@ -17,7 +17,6 @@
"dependencies": {
"@docusaurus/core": "3.9.2",
"@docusaurus/preset-classic": "3.9.2",
"@easyops-cn/docusaurus-search-local": "^0.55.1",
"@mdx-js/react": "^3.0.0",
"clsx": "^2.0.0",
"prism-react-renderer": "^2.3.0",