Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2ce9edcb29 |
@@ -375,7 +375,6 @@ def create_job(
|
||||
model: Optional[str] = None,
|
||||
provider: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
script: Optional[str] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Create a new cron job.
|
||||
@@ -449,8 +448,6 @@ def create_job(
|
||||
# Delivery configuration
|
||||
"deliver": deliver,
|
||||
"origin": origin, # Tracks where job was created for "origin" delivery
|
||||
# Script gate: optional bash script run before waking the agent
|
||||
"script": script,
|
||||
}
|
||||
|
||||
jobs = load_jobs()
|
||||
|
||||
@@ -12,9 +12,7 @@ import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import traceback
|
||||
|
||||
# fcntl is Unix-only; on Windows use msvcrt for file locking
|
||||
@@ -296,76 +294,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
origin = _resolve_origin(job)
|
||||
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
|
||||
|
||||
# --- Script gate: run optional pre-check script before waking the agent ---
|
||||
script_source = job.get("script")
|
||||
if script_source:
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".sh", delete=False
|
||||
) as tmp:
|
||||
tmp.write(script_source)
|
||||
tmp_path = tmp.name
|
||||
try:
|
||||
script_result = subprocess.run(
|
||||
["bash", tmp_path],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=30,
|
||||
)
|
||||
finally:
|
||||
try:
|
||||
os.unlink(tmp_path)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
# Parse the last non-empty line of stdout as JSON
|
||||
stdout_lines = [
|
||||
line for line in script_result.stdout.splitlines() if line.strip()
|
||||
]
|
||||
if stdout_lines:
|
||||
last_line = stdout_lines[-1].strip()
|
||||
try:
|
||||
gate = json.loads(last_line)
|
||||
if isinstance(gate, dict):
|
||||
wake = gate.get("wakeAgent", True)
|
||||
if not wake:
|
||||
output_doc = (
|
||||
f"# Cron Job: {job_name}\n\n"
|
||||
f"**Job ID:** {job_id}\n"
|
||||
f"**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}\n"
|
||||
f"**Schedule:** {job.get('schedule_display', 'N/A')}\n\n"
|
||||
f"## Script Gate\n\nAgent skipped by script gate.\n"
|
||||
)
|
||||
logger.info(
|
||||
"Job '%s': script gate returned wakeAgent=false, skipping agent",
|
||||
job_name,
|
||||
)
|
||||
return True, output_doc, "Script gate: agent skipped", None
|
||||
# wakeAgent is true — check for data to prepend
|
||||
data = gate.get("data")
|
||||
if data is not None:
|
||||
prompt = (
|
||||
f"Script pre-check data:\n{json.dumps(data)}\n\n{prompt}"
|
||||
)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
logger.warning(
|
||||
"Job '%s': script gate output not valid JSON, proceeding normally: %s",
|
||||
job_name,
|
||||
last_line[:200],
|
||||
)
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.warning(
|
||||
"Job '%s': script gate timed out after 30s, proceeding normally",
|
||||
job_name,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Job '%s': script gate error (%s), proceeding normally",
|
||||
job_name,
|
||||
e,
|
||||
)
|
||||
# --- End script gate ---
|
||||
|
||||
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
|
||||
logger.info("Prompt: %s", prompt[:100])
|
||||
|
||||
|
||||
+32
-3
@@ -21,6 +21,7 @@ Public API (signatures preserved from the original 2,400-line version):
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import asyncio
|
||||
import logging
|
||||
import threading
|
||||
@@ -365,6 +366,33 @@ _AGENT_LOOP_TOOLS = {"todo", "memory", "session_search", "delegate_task"}
|
||||
_READ_SEARCH_TOOLS = {"read_file", "search_files"}
|
||||
|
||||
|
||||
def _sanitize_tool_error(error_msg: str) -> str:
|
||||
"""Sanitize tool error messages before sending to the LLM.
|
||||
|
||||
- Strips XML/JSON boundary markers that could confuse the model
|
||||
- Truncates to 2000 chars max
|
||||
- Wraps in a clear error format so the LLM knows it's an error
|
||||
"""
|
||||
sanitized = error_msg
|
||||
# Strip XML-like tags that could confuse the LLM (role / framing tags)
|
||||
sanitized = re.sub(
|
||||
r'</?(?:tool_call|function_call|result|response|output|input|system|assistant|user)>',
|
||||
'', sanitized,
|
||||
)
|
||||
# Strip markdown code fences (opening and closing)
|
||||
sanitized = re.sub(r'^\s*```(?:json|xml)?\s*', '', sanitized)
|
||||
sanitized = re.sub(r'\s*```\s*$', '', sanitized)
|
||||
# Remove CDATA sections
|
||||
sanitized = re.sub(r'<!\[CDATA\[.*?\]\]>', '', sanitized, flags=re.DOTALL)
|
||||
|
||||
# Truncate very long error messages
|
||||
if len(sanitized) > 2000:
|
||||
sanitized = sanitized[:1997] + '...'
|
||||
|
||||
# Wrap in clear error format
|
||||
return f"[TOOL_ERROR] {sanitized}"
|
||||
|
||||
|
||||
def handle_function_call(
|
||||
function_name: str,
|
||||
function_args: Dict[str, Any],
|
||||
@@ -438,9 +466,10 @@ def handle_function_call(
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Error executing {function_name}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
return json.dumps({"error": error_msg}, ensure_ascii=False)
|
||||
raw_error = f"Error executing {function_name}: {str(e)}"
|
||||
logger.error(raw_error)
|
||||
sanitized = _sanitize_tool_error(raw_error)
|
||||
return json.dumps({"error": sanitized}, ensure_ascii=False)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
|
||||
+79
-3
@@ -6270,6 +6270,7 @@ class AIAgent:
|
||||
codex_ack_continuations = 0
|
||||
length_continue_retries = 0
|
||||
truncated_response_prefix = ""
|
||||
truncated_tool_call_count = 0
|
||||
compression_attempts = 0
|
||||
|
||||
# Clear any stale interrupt state at start
|
||||
@@ -6434,6 +6435,11 @@ class AIAgent:
|
||||
while retry_count < max_retries:
|
||||
try:
|
||||
api_kwargs = self._build_api_kwargs(api_messages)
|
||||
# Feature: Temporarily disable tools after repeated truncations
|
||||
if getattr(self, '_tools_temporarily_disabled', False):
|
||||
api_kwargs['tools'] = None
|
||||
self._tools_temporarily_disabled = False
|
||||
self._vprint(f"{self.log_prefix}ℹ️ Tools temporarily disabled for this call")
|
||||
if self.api_mode == "codex_responses":
|
||||
api_kwargs = self._preflight_codex_api_kwargs(api_kwargs, allow_stream=False)
|
||||
|
||||
@@ -6697,6 +6703,46 @@ class AIAgent:
|
||||
|
||||
if self.api_mode == "chat_completions":
|
||||
assistant_message = response.choices[0].message
|
||||
if assistant_message.tool_calls:
|
||||
# Feature: Discard truncated tool calls (Ironclaw #1632)
|
||||
# When finish_reason=length with tool_calls, the calls
|
||||
# are likely truncated (incomplete JSON). Discard them.
|
||||
truncated_tool_call_count += 1
|
||||
tc_count = len(assistant_message.tool_calls)
|
||||
self._vprint(
|
||||
f"{self.log_prefix}⚠️ Discarding {tc_count} truncated tool call(s) "
|
||||
f"(finish_reason='length', consecutive={truncated_tool_call_count})",
|
||||
force=True,
|
||||
)
|
||||
# Save any text content that preceded the truncated calls
|
||||
partial_content = assistant_message.content or ""
|
||||
if partial_content:
|
||||
truncated_response_prefix += partial_content
|
||||
# Build message WITHOUT tool_calls
|
||||
assistant_message.tool_calls = None
|
||||
interim_msg = self._build_assistant_message(assistant_message, finish_reason)
|
||||
messages.append(interim_msg)
|
||||
|
||||
truncation_nudge = (
|
||||
'Your previous response was truncated due to context length limits. '
|
||||
'The tool calls were discarded. Please summarize your progress so '
|
||||
'far and continue with a shorter response.'
|
||||
)
|
||||
messages.append({"role": "user", "content": truncation_nudge})
|
||||
|
||||
# After 3 consecutive truncations, temporarily disable tools
|
||||
if truncated_tool_call_count >= 3:
|
||||
self._vprint(
|
||||
f"{self.log_prefix}⚠️ 3 consecutive truncations with tool calls — "
|
||||
f"temporarily disabling tools for next call",
|
||||
force=True,
|
||||
)
|
||||
self._tools_temporarily_disabled = True
|
||||
|
||||
self._session_messages = messages
|
||||
self._save_session_log(messages)
|
||||
continue
|
||||
|
||||
if not assistant_message.tool_calls:
|
||||
length_continue_retries += 1
|
||||
interim_msg = self._build_assistant_message(assistant_message, finish_reason)
|
||||
@@ -7518,6 +7564,8 @@ class AIAgent:
|
||||
|
||||
# Check for tool calls
|
||||
if assistant_message.tool_calls:
|
||||
# Reset truncated tool call counter on successful (non-truncated) tool calls
|
||||
truncated_tool_call_count = 0
|
||||
if not self.quiet_mode:
|
||||
self._vprint(f"{self.log_prefix}🔧 Processing {len(assistant_message.tool_calls)} tool call(s)...")
|
||||
|
||||
@@ -7793,11 +7841,39 @@ class AIAgent:
|
||||
content_preview = final_response[:80] + "..." if len(final_response) > 80 else final_response
|
||||
self._vprint(f"{self.log_prefix} Content: '{content_preview}'")
|
||||
|
||||
if self._empty_content_retries < 3:
|
||||
self._vprint(f"{self.log_prefix}🔄 Retrying API call ({self._empty_content_retries}/3)...")
|
||||
if self._empty_content_retries < 2:
|
||||
self._vprint(f"{self.log_prefix}🔄 Retrying API call ({self._empty_content_retries}/2)...")
|
||||
# Feature: Empty response recovery (Ironclaw #1677 + #1720)
|
||||
# On first empty retry, check for prior meaningful output
|
||||
if self._empty_content_retries == 1:
|
||||
_has_prior_output = any(
|
||||
isinstance(m, dict)
|
||||
and m.get("role") == "assistant"
|
||||
and m.get("content")
|
||||
and self._has_content_after_think_block(m["content"])
|
||||
for m in messages
|
||||
)
|
||||
if _has_prior_output:
|
||||
# Model already produced output earlier; treat as completion
|
||||
self._vprint(f"{self.log_prefix}ℹ️ Prior meaningful output exists — treating empty response as completion")
|
||||
for m in reversed(messages):
|
||||
if (isinstance(m, dict) and m.get("role") == "assistant"
|
||||
and m.get("content") and self._has_content_after_think_block(m["content"])):
|
||||
final_response = self._strip_think_blocks(m["content"]).strip()
|
||||
break
|
||||
if final_response:
|
||||
self._empty_content_retries = 0
|
||||
break
|
||||
else:
|
||||
# No prior output — inject a nudge to help the model
|
||||
nudge_msg = {
|
||||
"role": "user",
|
||||
"content": "Your previous response was empty. Please continue with the task.",
|
||||
}
|
||||
messages.append(nudge_msg)
|
||||
continue
|
||||
else:
|
||||
self._vprint(f"{self.log_prefix}❌ Max retries (3) for empty content exceeded.", force=True)
|
||||
self._vprint(f"{self.log_prefix}❌ Max retries (2) for empty content exceeded.", force=True)
|
||||
self._empty_content_retries = 0
|
||||
|
||||
# If a prior tool_calls turn had real content, salvage it:
|
||||
|
||||
@@ -1,429 +0,0 @@
|
||||
"""Tests for the cron job script gate feature.
|
||||
|
||||
The script gate allows cron jobs to run an optional bash script before waking
|
||||
the agent. The script's last stdout line is parsed as JSON:
|
||||
- {"wakeAgent": false} → skip the agent entirely
|
||||
- {"wakeAgent": true} → proceed normally
|
||||
- {"wakeAgent": true, "data":…} → prepend data to the prompt
|
||||
- errors / invalid JSON → proceed normally (don't block)
|
||||
"""
|
||||
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure project root is importable
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
|
||||
|
||||
from cron.scheduler import run_job
|
||||
|
||||
|
||||
def _make_job(script=None, prompt="Test prompt", job_id="test123", name="test-job"):
|
||||
"""Build a minimal job dict for testing."""
|
||||
job = {
|
||||
"id": job_id,
|
||||
"name": name,
|
||||
"prompt": prompt,
|
||||
"schedule_display": "every 5m",
|
||||
"enabled": True,
|
||||
"state": "scheduled",
|
||||
"skills": [],
|
||||
}
|
||||
if script is not None:
|
||||
job["script"] = script
|
||||
return job
|
||||
|
||||
|
||||
# We need to mock out the heavy agent machinery so tests stay fast.
|
||||
# The script gate runs BEFORE the agent is created, so we can detect
|
||||
# whether the agent was created at all.
|
||||
|
||||
_AGENT_RUN_SENTINEL = "agent-ran-ok"
|
||||
|
||||
|
||||
class _FakeAgent:
|
||||
"""Lightweight stand-in for AIAgent."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.kwargs = kwargs
|
||||
|
||||
def run_conversation(self, prompt):
|
||||
return {"final_response": _AGENT_RUN_SENTINEL}
|
||||
|
||||
|
||||
def _patch_agent():
|
||||
"""Return a context manager that replaces AIAgent with _FakeAgent."""
|
||||
return patch("cron.scheduler.AIAgent", _FakeAgent)
|
||||
|
||||
|
||||
def _patch_deps():
|
||||
"""Patch all heavy imports that run_job pulls in so tests don't need real config."""
|
||||
# SessionDB
|
||||
mock_session_db = MagicMock()
|
||||
mock_session_db.return_value = MagicMock()
|
||||
|
||||
patches = [
|
||||
_patch_agent(),
|
||||
patch("cron.scheduler.SessionDB", mock_session_db, create=True),
|
||||
# dotenv
|
||||
patch("cron.scheduler.load_dotenv", create=True),
|
||||
# config
|
||||
patch("cron.scheduler.resolve_runtime_provider", return_value={
|
||||
"api_key": "fake", "base_url": None, "provider": None,
|
||||
"api_mode": None, "command": None, "args": [],
|
||||
}, create=True),
|
||||
patch("cron.scheduler.resolve_turn_route", return_value={
|
||||
"model": "test-model",
|
||||
"runtime": {
|
||||
"api_key": "fake", "base_url": None, "provider": None,
|
||||
"api_mode": None, "command": None, "args": [],
|
||||
},
|
||||
}, create=True),
|
||||
]
|
||||
return patches
|
||||
|
||||
|
||||
def _run_with_patches(job):
|
||||
"""Run a job with all heavy deps mocked out, return the 4-tuple result."""
|
||||
# We'll mock at a higher level: just mock the parts after the script gate
|
||||
# Since there are many transitive imports, let's mock run_job's internals
|
||||
# by monkeypatching the AIAgent and other imports inside run_job.
|
||||
|
||||
# Simpler approach: directly test the script gate logic by extracting it,
|
||||
# or mock at the subprocess level and let the real function flow.
|
||||
# Actually let's just mock the AIAgent import inside run_job.
|
||||
|
||||
with patch("run_agent.AIAgent", _FakeAgent):
|
||||
with patch("cron.scheduler._hermes_home", Path("/tmp/hermes-test")):
|
||||
# Mock the heavy imports that happen inside run_job's try block
|
||||
with patch.dict("os.environ", {
|
||||
"HERMES_MODEL": "test-model",
|
||||
}):
|
||||
with patch("cron.scheduler._build_job_prompt") as mock_build:
|
||||
# Let _build_job_prompt return the raw prompt so we can
|
||||
# inspect what gets modified by the script gate.
|
||||
mock_build.side_effect = lambda j: j.get("prompt", "")
|
||||
|
||||
# We need to handle the internal imports in run_job
|
||||
# The cleanest approach: mock the entire agent creation path
|
||||
mock_agent_instance = MagicMock()
|
||||
mock_agent_instance.run_conversation.return_value = {
|
||||
"final_response": _AGENT_RUN_SENTINEL
|
||||
}
|
||||
|
||||
# Patch all the things run_job imports internally
|
||||
with patch("cron.scheduler.AIAgent", return_value=mock_agent_instance, create=True):
|
||||
try:
|
||||
result = run_job(job)
|
||||
except Exception:
|
||||
# If internal imports fail, the script gate still
|
||||
# should have run. For wakeAgent=false tests the
|
||||
# early return happens before any agent code.
|
||||
raise
|
||||
return result, mock_agent_instance
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Actual tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestScriptGateSkipsAgent:
|
||||
"""Script returning wakeAgent=false should skip the agent entirely."""
|
||||
|
||||
def test_wake_agent_false_returns_early(self):
|
||||
job = _make_job(script='echo \'{"wakeAgent": false}\'')
|
||||
# The script gate returns before AIAgent is even imported,
|
||||
# so we only need minimal mocking.
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
# Mock SessionDB to avoid real DB
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
success, output, response, error = run_job(job)
|
||||
|
||||
assert success is True
|
||||
assert "Script gate: agent skipped" in response
|
||||
assert error is None
|
||||
assert "Script Gate" in output
|
||||
|
||||
def test_wake_agent_false_with_extra_stdout(self):
|
||||
"""Script may print other lines; only last non-empty counts."""
|
||||
job = _make_job(script='echo "checking..."\necho ""\necho \'{"wakeAgent": false}\'')
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
success, output, response, error = run_job(job)
|
||||
|
||||
assert success is True
|
||||
assert "Script gate: agent skipped" in response
|
||||
|
||||
|
||||
class TestScriptGateProceeds:
|
||||
"""Script returning wakeAgent=true should let the agent run."""
|
||||
|
||||
def test_wake_agent_true_runs_agent(self):
|
||||
job = _make_job(script='echo \'{"wakeAgent": true}\'')
|
||||
try:
|
||||
result, mock_agent = _run_with_patches(job)
|
||||
success, output, response, error = result
|
||||
# Agent should have been called
|
||||
mock_agent.run_conversation.assert_called_once()
|
||||
assert success is True
|
||||
except Exception:
|
||||
# If import fails due to missing deps, that's OK — the key thing
|
||||
# is that the script gate didn't return early. We verify by
|
||||
# checking it doesn't return the skip message.
|
||||
pass
|
||||
|
||||
|
||||
class TestScriptGateDataPrepend:
|
||||
"""Script returning wakeAgent=true with data should prepend to prompt."""
|
||||
|
||||
def test_data_prepended_to_prompt(self):
|
||||
data = {"changed_files": ["a.py", "b.py"], "count": 2}
|
||||
script = f'echo \'{{"wakeAgent": true, "data": {json.dumps(data)}}}\''
|
||||
job = _make_job(script=script, prompt="Analyze changes")
|
||||
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
# Mock the AIAgent so we can capture the prompt passed to it
|
||||
captured_prompts = []
|
||||
|
||||
class CapturingAgent:
|
||||
def __init__(self, **kwargs):
|
||||
pass
|
||||
def run_conversation(self, prompt):
|
||||
captured_prompts.append(prompt)
|
||||
return {"final_response": "done"}
|
||||
|
||||
# We need to mock all the internal imports of run_job
|
||||
import importlib
|
||||
with patch("dotenv.load_dotenv", create=True):
|
||||
with patch("builtins.__import__", wraps=__builtins__.__import__ if hasattr(__builtins__, '__import__') else __import__):
|
||||
# Actually, let's use a more targeted approach
|
||||
pass
|
||||
|
||||
# Better approach: test the script gate logic directly with subprocess
|
||||
# and verify the prompt transformation
|
||||
script_code = f'echo \'{{"wakeAgent": true, "data": {json.dumps(data)}}}\''
|
||||
result = subprocess.run(
|
||||
["bash", "-c", script_code],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
stdout_lines = [l for l in result.stdout.splitlines() if l.strip()]
|
||||
last_line = stdout_lines[-1].strip()
|
||||
gate = json.loads(last_line)
|
||||
|
||||
assert gate["wakeAgent"] is True
|
||||
assert gate["data"] == data
|
||||
|
||||
# Now verify the prompt transformation logic
|
||||
prompt = "Analyze changes"
|
||||
gate_data = gate.get("data")
|
||||
if gate_data is not None:
|
||||
prompt = f"Script pre-check data:\n{json.dumps(gate_data)}\n\n{prompt}"
|
||||
|
||||
assert prompt.startswith("Script pre-check data:")
|
||||
assert '"changed_files"' in prompt
|
||||
assert prompt.endswith("Analyze changes")
|
||||
|
||||
|
||||
class TestScriptGateTimeout:
|
||||
"""Script timing out should not block — agent proceeds normally."""
|
||||
|
||||
def test_timeout_proceeds(self):
|
||||
# Use a script that sleeps longer than the timeout
|
||||
job = _make_job(script="sleep 60")
|
||||
|
||||
# Mock subprocess.run to raise TimeoutExpired
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
with patch("cron.scheduler.subprocess.run",
|
||||
side_effect=subprocess.TimeoutExpired(cmd="bash", timeout=30)):
|
||||
# The function should proceed past the script gate.
|
||||
# It will fail on the agent imports, but NOT on the script gate.
|
||||
try:
|
||||
result = run_job(job)
|
||||
# If we get here, check it wasn't a script-gate skip
|
||||
success, output, response, error = result
|
||||
assert "Script gate: agent skipped" not in response
|
||||
except Exception:
|
||||
# Expected: internal imports may fail in test env.
|
||||
# The important thing is TimeoutExpired didn't propagate.
|
||||
pass
|
||||
|
||||
|
||||
class TestScriptGateInvalidJSON:
|
||||
"""Script with non-JSON output should not block — agent proceeds."""
|
||||
|
||||
def test_invalid_json_proceeds(self):
|
||||
job = _make_job(script='echo "this is not json"')
|
||||
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
try:
|
||||
result = run_job(job)
|
||||
success, output, response, error = result
|
||||
assert "Script gate: agent skipped" not in response
|
||||
except Exception:
|
||||
# Agent creation may fail in test env, but script gate
|
||||
# should not have blocked.
|
||||
pass
|
||||
|
||||
def test_empty_stdout_proceeds(self):
|
||||
job = _make_job(script='true') # produces no output
|
||||
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
try:
|
||||
result = run_job(job)
|
||||
success, output, response, error = result
|
||||
assert "Script gate: agent skipped" not in response
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class TestNoScriptField:
|
||||
"""Jobs without a script field should behave normally."""
|
||||
|
||||
def test_no_script_normal(self):
|
||||
job = _make_job() # no script
|
||||
assert "script" not in job
|
||||
|
||||
try:
|
||||
result, mock_agent = _run_with_patches(job)
|
||||
success, output, response, error = result
|
||||
mock_agent.run_conversation.assert_called_once()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def test_none_script_normal(self):
|
||||
job = _make_job(script=None)
|
||||
# script=None should be treated same as missing
|
||||
assert job.get("script") is None
|
||||
|
||||
try:
|
||||
result, mock_agent = _run_with_patches(job)
|
||||
success, output, response, error = result
|
||||
mock_agent.run_conversation.assert_called_once()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class TestScriptGateError:
|
||||
"""Script errors (non-zero exit) should not block the agent."""
|
||||
|
||||
def test_nonzero_exit_proceeds(self):
|
||||
job = _make_job(script='exit 1')
|
||||
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
try:
|
||||
result = run_job(job)
|
||||
success, output, response, error = result
|
||||
# Non-zero exit doesn't produce valid JSON, so agent proceeds
|
||||
assert "Script gate: agent skipped" not in response
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def test_nonzero_exit_with_json_still_works(self):
|
||||
"""A script can exit non-zero but still output valid JSON."""
|
||||
job = _make_job(script='echo \'{"wakeAgent": false}\'\nexit 1')
|
||||
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
# subprocess.run doesn't raise on non-zero exit (no check=True),
|
||||
# so the JSON should still be parsed
|
||||
success, output, response, error = run_job(job)
|
||||
assert success is True
|
||||
assert "Script gate: agent skipped" in response
|
||||
|
||||
def test_script_exception_proceeds(self):
|
||||
"""If subprocess.run itself raises an unexpected error, proceed."""
|
||||
job = _make_job(script="echo hello")
|
||||
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
with patch("cron.scheduler.subprocess.run",
|
||||
side_effect=OSError("No bash")):
|
||||
try:
|
||||
result = run_job(job)
|
||||
success, output, response, error = result
|
||||
assert "Script gate: agent skipped" not in response
|
||||
except Exception:
|
||||
# The OSError should have been caught by the script gate
|
||||
# and not propagated. If we get here, something else failed.
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Integration-style test: actually run bash and verify full flow
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestScriptGateIntegration:
|
||||
"""End-to-end tests that actually execute bash scripts."""
|
||||
|
||||
def test_full_skip_flow(self):
|
||||
"""Complete flow: script says skip, verify early return."""
|
||||
job = _make_job(
|
||||
script='echo "performing check..."\necho \'{"wakeAgent": false}\'',
|
||||
prompt="This should never reach the agent",
|
||||
)
|
||||
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
|
||||
with patch("cron.scheduler.SessionDB", create=True):
|
||||
success, output, response, error = run_job(job)
|
||||
|
||||
assert success is True
|
||||
assert response == "Script gate: agent skipped"
|
||||
assert error is None
|
||||
assert "test-job" in output
|
||||
|
||||
def test_full_data_prepend_flow(self):
|
||||
"""Complete flow: script provides data, verify it reaches the prompt."""
|
||||
data = {"status": "changed", "items": [1, 2, 3]}
|
||||
script = f"""
|
||||
echo "Running pre-check..."
|
||||
echo '{json.dumps({"wakeAgent": True, "data": data})}'
|
||||
"""
|
||||
job = _make_job(script=script, prompt="Process the data")
|
||||
|
||||
# We can't easily run the full agent, but we can verify the prompt
|
||||
# gets modified by capturing what _build_job_prompt returns and then
|
||||
# checking the prompt that reaches the agent.
|
||||
#
|
||||
# Instead, test the script execution and JSON parsing directly:
|
||||
result = subprocess.run(
|
||||
["bash", "-c", script],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
lines = [l for l in result.stdout.splitlines() if l.strip()]
|
||||
gate = json.loads(lines[-1].strip())
|
||||
|
||||
assert gate["wakeAgent"] is True
|
||||
assert gate["data"] == data
|
||||
|
||||
def test_multiline_script(self):
|
||||
"""Multi-line script with conditionals."""
|
||||
script = """#!/bin/bash
|
||||
CHANGED=true
|
||||
if [ "$CHANGED" = "true" ]; then
|
||||
echo '{"wakeAgent": true, "data": {"reason": "files changed"}}'
|
||||
else
|
||||
echo '{"wakeAgent": false}'
|
||||
fi
|
||||
"""
|
||||
job = _make_job(script=script)
|
||||
|
||||
# Verify bash executes it correctly
|
||||
result = subprocess.run(
|
||||
["bash", "-c", script],
|
||||
capture_output=True, text=True, timeout=10,
|
||||
)
|
||||
lines = [l for l in result.stdout.splitlines() if l.strip()]
|
||||
gate = json.loads(lines[-1].strip())
|
||||
|
||||
assert gate["wakeAgent"] is True
|
||||
assert gate["data"]["reason"] == "files changed"
|
||||
@@ -0,0 +1,252 @@
|
||||
"""Tests for agent resilience features inspired by Ironclaw PRs.
|
||||
|
||||
Feature 1: Discard truncated tool calls on finish_reason=length (#1632)
|
||||
Feature 2: Empty response recovery (#1677 + #1720)
|
||||
Feature 3: Sanitize tool error results (#1639)
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure repo root is importable
|
||||
_repo_root = Path(__file__).resolve().parent.parent
|
||||
if str(_repo_root) not in sys.path:
|
||||
sys.path.insert(0, str(_repo_root))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _mock_tool_call(name="test_tool", args='{"key": "value"}', tc_id="tc_1"):
|
||||
return SimpleNamespace(
|
||||
id=tc_id,
|
||||
function=SimpleNamespace(name=name, arguments=args),
|
||||
type="function",
|
||||
)
|
||||
|
||||
|
||||
def _mock_response(content="Hello", finish_reason="stop", tool_calls=None, usage=None):
|
||||
msg = SimpleNamespace(
|
||||
content=content,
|
||||
tool_calls=tool_calls,
|
||||
reasoning_content=None,
|
||||
reasoning=None,
|
||||
)
|
||||
choice = SimpleNamespace(message=msg, finish_reason=finish_reason)
|
||||
resp = SimpleNamespace(choices=[choice], model="test/model")
|
||||
resp.usage = SimpleNamespace(**usage) if usage else None
|
||||
return resp
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Feature 3: Sanitize tool error results
|
||||
# =========================================================================
|
||||
|
||||
class TestSanitizeToolError:
|
||||
"""Test _sanitize_tool_error helper function in model_tools.py."""
|
||||
|
||||
def test_import(self):
|
||||
"""Verify the sanitize function can be imported."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
assert callable(_sanitize_tool_error)
|
||||
|
||||
def test_truncation(self):
|
||||
"""Error messages longer than 2000 chars are truncated."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
long_msg = "x" * 5000
|
||||
result = _sanitize_tool_error(long_msg)
|
||||
# Account for the [TOOL_ERROR] prefix
|
||||
assert len(result) <= 2000 + len("[TOOL_ERROR] ")
|
||||
assert result.endswith("...")
|
||||
|
||||
def test_xml_tag_stripping(self):
|
||||
"""XML-like boundary tags are stripped from errors."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
error = "<tool_call>Error: file not found</tool_call>"
|
||||
result = _sanitize_tool_error(error)
|
||||
assert "<tool_call>" not in result
|
||||
assert "</tool_call>" not in result
|
||||
assert "file not found" in result
|
||||
|
||||
def test_system_tag_stripping(self):
|
||||
"""System/assistant/user tags are stripped."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
error = "<system>Permission denied</system>"
|
||||
result = _sanitize_tool_error(error)
|
||||
assert "<system>" not in result
|
||||
assert "Permission denied" in result
|
||||
|
||||
def test_code_fence_stripping(self):
|
||||
"""Markdown code fences are stripped."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
error = "```json\n{\"error\": \"bad\"}\n```"
|
||||
result = _sanitize_tool_error(error)
|
||||
assert "```" not in result
|
||||
|
||||
def test_cdata_stripping(self):
|
||||
"""CDATA sections are stripped."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
error = "Error: <![CDATA[some internal data]]> happened"
|
||||
result = _sanitize_tool_error(error)
|
||||
assert "CDATA" not in result
|
||||
assert "happened" in result
|
||||
|
||||
def test_error_format_prefix(self):
|
||||
"""Error is wrapped with [TOOL_ERROR] prefix."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
result = _sanitize_tool_error("something went wrong")
|
||||
assert result.startswith("[TOOL_ERROR]")
|
||||
assert "something went wrong" in result
|
||||
|
||||
def test_short_error_preserved(self):
|
||||
"""Short, clean errors are preserved intact (with prefix)."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
result = _sanitize_tool_error("File not found: /tmp/test.txt")
|
||||
assert result == "[TOOL_ERROR] File not found: /tmp/test.txt"
|
||||
|
||||
def test_handle_function_call_uses_sanitizer(self):
|
||||
"""handle_function_call sanitizes error messages from exceptions."""
|
||||
from model_tools import handle_function_call, _sanitize_tool_error
|
||||
# The registry returns its own error for unknown tools (not via the
|
||||
# except block). Verify the sanitizer is called in the except path
|
||||
# by directly testing what would happen.
|
||||
raw_error = "Error executing bad_tool: <system>Internal traceback</system>"
|
||||
sanitized = _sanitize_tool_error(raw_error)
|
||||
result_json = json.dumps({"error": sanitized}, ensure_ascii=False)
|
||||
parsed = json.loads(result_json)
|
||||
assert "[TOOL_ERROR]" in parsed["error"]
|
||||
assert "<system>" not in parsed["error"]
|
||||
|
||||
def test_mixed_tags_and_long_error(self):
|
||||
"""Complex error with tags AND length > 2000."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
error = "<result>" + ("a" * 3000) + "</result>"
|
||||
result = _sanitize_tool_error(error)
|
||||
assert "<result>" not in result
|
||||
assert "</result>" not in result
|
||||
assert len(result) <= 2020 # prefix + 2000 + ...
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Feature 1: Discard truncated tool calls on finish_reason=length
|
||||
# =========================================================================
|
||||
|
||||
class TestTruncatedToolCallDiscard:
|
||||
"""Test that truncated tool calls (finish_reason=length) are discarded."""
|
||||
|
||||
def test_truncated_tool_calls_message_content(self):
|
||||
"""Verify the truncation nudge message text is correct."""
|
||||
expected_nudge = (
|
||||
'Your previous response was truncated due to context length limits. '
|
||||
'The tool calls were discarded. Please summarize your progress so '
|
||||
'far and continue with a shorter response.'
|
||||
)
|
||||
# This is the message that should be injected into the conversation
|
||||
assert "truncated" in expected_nudge.lower()
|
||||
assert "discarded" in expected_nudge.lower()
|
||||
|
||||
def test_tools_temporarily_disabled_attribute(self):
|
||||
"""Verify the _tools_temporarily_disabled attribute pattern works."""
|
||||
# Test the attribute access pattern used in the implementation
|
||||
obj = SimpleNamespace()
|
||||
assert getattr(obj, '_tools_temporarily_disabled', False) is False
|
||||
obj._tools_temporarily_disabled = True
|
||||
assert getattr(obj, '_tools_temporarily_disabled', False) is True
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Feature 2: Empty response recovery
|
||||
# =========================================================================
|
||||
|
||||
class TestEmptyResponseRecovery:
|
||||
"""Test empty response recovery behavior."""
|
||||
|
||||
def test_empty_response_nudge_text(self):
|
||||
"""Verify the nudge message for empty responses."""
|
||||
nudge = "Your previous response was empty. Please continue with the task."
|
||||
assert "empty" in nudge.lower()
|
||||
assert "continue" in nudge.lower()
|
||||
|
||||
def test_prior_meaningful_output_detection(self):
|
||||
"""Test logic for detecting prior meaningful output in messages."""
|
||||
messages = [
|
||||
{"role": "user", "content": "Hello"},
|
||||
{"role": "assistant", "content": "Here is a detailed response about your question."},
|
||||
{"role": "user", "content": "Thanks, continue"},
|
||||
]
|
||||
# Check that we can find prior assistant output
|
||||
has_prior = any(
|
||||
isinstance(m, dict)
|
||||
and m.get("role") == "assistant"
|
||||
and m.get("content")
|
||||
and len(m["content"].strip()) > 0
|
||||
for m in messages
|
||||
)
|
||||
assert has_prior is True
|
||||
|
||||
def test_no_prior_meaningful_output(self):
|
||||
"""Test when no prior meaningful assistant output exists."""
|
||||
messages = [
|
||||
{"role": "user", "content": "Hello"},
|
||||
]
|
||||
has_prior = any(
|
||||
isinstance(m, dict)
|
||||
and m.get("role") == "assistant"
|
||||
and m.get("content")
|
||||
and len(m["content"].strip()) > 0
|
||||
for m in messages
|
||||
)
|
||||
assert has_prior is False
|
||||
|
||||
def test_think_block_only_not_meaningful(self):
|
||||
"""Responses with only think blocks should not count as meaningful."""
|
||||
messages = [
|
||||
{"role": "assistant", "content": "<think>Internal reasoning only</think>"},
|
||||
]
|
||||
# The agent uses _has_content_after_think_block to check this
|
||||
# For our test, verify the pattern: content that's only a think block
|
||||
content = messages[0]["content"]
|
||||
stripped = re.sub(
|
||||
r'<(?:REASONING_SCRATCHPAD|think|reasoning)>.*?</(?:REASONING_SCRATCHPAD|think|reasoning)>',
|
||||
'', content, flags=re.DOTALL
|
||||
).strip()
|
||||
assert stripped == "" # No meaningful content after stripping think blocks
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Integration-style tests for sanitize_tool_error in handle_function_call
|
||||
# =========================================================================
|
||||
|
||||
class TestHandleFunctionCallSanitization:
|
||||
"""Test that handle_function_call properly sanitizes errors."""
|
||||
|
||||
def test_registry_dispatch_error_sanitized(self):
|
||||
"""When registry.dispatch raises, the error should be sanitized."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
|
||||
# Simulate what happens in the except block
|
||||
error = Exception("Connection refused: <system>Internal error</system> " + "x" * 3000)
|
||||
raw_error = f"Error executing test_tool: {str(error)}"
|
||||
sanitized = _sanitize_tool_error(raw_error)
|
||||
|
||||
result_json = json.dumps({"error": sanitized}, ensure_ascii=False)
|
||||
parsed = json.loads(result_json)
|
||||
|
||||
assert "[TOOL_ERROR]" in parsed["error"]
|
||||
assert "<system>" not in parsed["error"]
|
||||
# Truncated
|
||||
assert len(parsed["error"]) <= 2020
|
||||
|
||||
def test_normal_error_readable(self):
|
||||
"""Normal short errors should remain readable."""
|
||||
from model_tools import _sanitize_tool_error
|
||||
result = _sanitize_tool_error("Error executing write_file: Permission denied")
|
||||
assert "Permission denied" in result
|
||||
assert result.startswith("[TOOL_ERROR]")
|
||||
@@ -135,7 +135,6 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"state": job.get("state", "scheduled" if job.get("enabled", True) else "paused"),
|
||||
"paused_at": job.get("paused_at"),
|
||||
"paused_reason": job.get("paused_reason"),
|
||||
"script": job.get("script"),
|
||||
}
|
||||
|
||||
|
||||
@@ -154,7 +153,6 @@ def cronjob(
|
||||
provider: Optional[str] = None,
|
||||
base_url: Optional[str] = None,
|
||||
reason: Optional[str] = None,
|
||||
script: Optional[str] = None,
|
||||
task_id: str = None,
|
||||
) -> str:
|
||||
"""Unified cron job management tool."""
|
||||
@@ -185,7 +183,6 @@ def cronjob(
|
||||
model=_normalize_optional_job_value(model),
|
||||
provider=_normalize_optional_job_value(provider),
|
||||
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
|
||||
script=script,
|
||||
)
|
||||
return json.dumps(
|
||||
{
|
||||
@@ -268,8 +265,6 @@ def cronjob(
|
||||
updates["provider"] = _normalize_optional_job_value(provider)
|
||||
if base_url is not None:
|
||||
updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True)
|
||||
if script is not None:
|
||||
updates["script"] = script if script else None
|
||||
if repeat is not None:
|
||||
# Normalize: treat 0 or negative as None (infinite)
|
||||
normalized_repeat = None if repeat <= 0 else repeat
|
||||
@@ -407,10 +402,6 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
|
||||
"reason": {
|
||||
"type": "string",
|
||||
"description": "Optional pause reason"
|
||||
},
|
||||
"script": {
|
||||
"type": "string",
|
||||
"description": "Optional bash script to run before waking the agent. Must output JSON on its last line: {\"wakeAgent\": boolean, \"data\"?: any}. If wakeAgent is false, the agent is skipped entirely. Useful for frequent schedules where you only want the agent to run when something changed."
|
||||
}
|
||||
},
|
||||
"required": ["action"]
|
||||
@@ -460,7 +451,6 @@ registry.register(
|
||||
provider=args.get("provider"),
|
||||
base_url=args.get("base_url"),
|
||||
reason=args.get("reason"),
|
||||
script=args.get("script"),
|
||||
task_id=kw.get("task_id"),
|
||||
),
|
||||
check_fn=check_cronjob_requirements,
|
||||
|
||||
Reference in New Issue
Block a user