Compare commits

..

1 Commits

Author SHA1 Message Date
Teknium 2ce9edcb29 feat: agent resilience — handle truncated tool calls, empty responses, tool error sanitization
Three resilience features ported from Ironclaw:

1. Discard incomplete tool calls (ironclaw#1632)
   When finish_reason='length' and tool calls are present, they're likely
   incomplete. Discard them, inject a summarize notice. After 3 consecutive
   occurrences, temporarily disable tools.

2. Empty response recovery (ironclaw#1677 + #1720)
   When the LLM returns empty (no content, no tool calls):
   - If meaningful output exists earlier, treat as completion
   - Otherwise nudge once, then fail gracefully
   Max 2 consecutive empties before giving up.

3. Sanitize tool error results (ironclaw#1639)
   Strip XML boundary markers, CDATA sections, and code fences from error
   messages before sending to LLM. Cap at 2000 chars. Prevents
   injection attacks via crafted tool error messages.

18 new tests.
2026-03-29 17:59:02 -07:00
7 changed files with 363 additions and 520 deletions
-3
View File
@@ -375,7 +375,6 @@ def create_job(
model: Optional[str] = None,
provider: Optional[str] = None,
base_url: Optional[str] = None,
script: Optional[str] = None,
) -> Dict[str, Any]:
"""
Create a new cron job.
@@ -449,8 +448,6 @@ def create_job(
# Delivery configuration
"deliver": deliver,
"origin": origin, # Tracks where job was created for "origin" delivery
# Script gate: optional bash script run before waking the agent
"script": script,
}
jobs = load_jobs()
-72
View File
@@ -12,9 +12,7 @@ import asyncio
import json
import logging
import os
import subprocess
import sys
import tempfile
import traceback
# fcntl is Unix-only; on Windows use msvcrt for file locking
@@ -296,76 +294,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
# --- Script gate: run optional pre-check script before waking the agent ---
script_source = job.get("script")
if script_source:
try:
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False
) as tmp:
tmp.write(script_source)
tmp_path = tmp.name
try:
script_result = subprocess.run(
["bash", tmp_path],
capture_output=True,
text=True,
timeout=30,
)
finally:
try:
os.unlink(tmp_path)
except OSError:
pass
# Parse the last non-empty line of stdout as JSON
stdout_lines = [
line for line in script_result.stdout.splitlines() if line.strip()
]
if stdout_lines:
last_line = stdout_lines[-1].strip()
try:
gate = json.loads(last_line)
if isinstance(gate, dict):
wake = gate.get("wakeAgent", True)
if not wake:
output_doc = (
f"# Cron Job: {job_name}\n\n"
f"**Job ID:** {job_id}\n"
f"**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}\n"
f"**Schedule:** {job.get('schedule_display', 'N/A')}\n\n"
f"## Script Gate\n\nAgent skipped by script gate.\n"
)
logger.info(
"Job '%s': script gate returned wakeAgent=false, skipping agent",
job_name,
)
return True, output_doc, "Script gate: agent skipped", None
# wakeAgent is true — check for data to prepend
data = gate.get("data")
if data is not None:
prompt = (
f"Script pre-check data:\n{json.dumps(data)}\n\n{prompt}"
)
except (json.JSONDecodeError, ValueError):
logger.warning(
"Job '%s': script gate output not valid JSON, proceeding normally: %s",
job_name,
last_line[:200],
)
except subprocess.TimeoutExpired:
logger.warning(
"Job '%s': script gate timed out after 30s, proceeding normally",
job_name,
)
except Exception as e:
logger.warning(
"Job '%s': script gate error (%s), proceeding normally",
job_name,
e,
)
# --- End script gate ---
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
logger.info("Prompt: %s", prompt[:100])
+32 -3
View File
@@ -21,6 +21,7 @@ Public API (signatures preserved from the original 2,400-line version):
"""
import json
import re
import asyncio
import logging
import threading
@@ -365,6 +366,33 @@ _AGENT_LOOP_TOOLS = {"todo", "memory", "session_search", "delegate_task"}
_READ_SEARCH_TOOLS = {"read_file", "search_files"}
def _sanitize_tool_error(error_msg: str) -> str:
"""Sanitize tool error messages before sending to the LLM.
- Strips XML/JSON boundary markers that could confuse the model
- Truncates to 2000 chars max
- Wraps in a clear error format so the LLM knows it's an error
"""
sanitized = error_msg
# Strip XML-like tags that could confuse the LLM (role / framing tags)
sanitized = re.sub(
r'</?(?:tool_call|function_call|result|response|output|input|system|assistant|user)>',
'', sanitized,
)
# Strip markdown code fences (opening and closing)
sanitized = re.sub(r'^\s*```(?:json|xml)?\s*', '', sanitized)
sanitized = re.sub(r'\s*```\s*$', '', sanitized)
# Remove CDATA sections
sanitized = re.sub(r'<!\[CDATA\[.*?\]\]>', '', sanitized, flags=re.DOTALL)
# Truncate very long error messages
if len(sanitized) > 2000:
sanitized = sanitized[:1997] + '...'
# Wrap in clear error format
return f"[TOOL_ERROR] {sanitized}"
def handle_function_call(
function_name: str,
function_args: Dict[str, Any],
@@ -438,9 +466,10 @@ def handle_function_call(
return result
except Exception as e:
error_msg = f"Error executing {function_name}: {str(e)}"
logger.error(error_msg)
return json.dumps({"error": error_msg}, ensure_ascii=False)
raw_error = f"Error executing {function_name}: {str(e)}"
logger.error(raw_error)
sanitized = _sanitize_tool_error(raw_error)
return json.dumps({"error": sanitized}, ensure_ascii=False)
# =============================================================================
+79 -3
View File
@@ -6270,6 +6270,7 @@ class AIAgent:
codex_ack_continuations = 0
length_continue_retries = 0
truncated_response_prefix = ""
truncated_tool_call_count = 0
compression_attempts = 0
# Clear any stale interrupt state at start
@@ -6434,6 +6435,11 @@ class AIAgent:
while retry_count < max_retries:
try:
api_kwargs = self._build_api_kwargs(api_messages)
# Feature: Temporarily disable tools after repeated truncations
if getattr(self, '_tools_temporarily_disabled', False):
api_kwargs['tools'] = None
self._tools_temporarily_disabled = False
self._vprint(f"{self.log_prefix}️ Tools temporarily disabled for this call")
if self.api_mode == "codex_responses":
api_kwargs = self._preflight_codex_api_kwargs(api_kwargs, allow_stream=False)
@@ -6697,6 +6703,46 @@ class AIAgent:
if self.api_mode == "chat_completions":
assistant_message = response.choices[0].message
if assistant_message.tool_calls:
# Feature: Discard truncated tool calls (Ironclaw #1632)
# When finish_reason=length with tool_calls, the calls
# are likely truncated (incomplete JSON). Discard them.
truncated_tool_call_count += 1
tc_count = len(assistant_message.tool_calls)
self._vprint(
f"{self.log_prefix}⚠️ Discarding {tc_count} truncated tool call(s) "
f"(finish_reason='length', consecutive={truncated_tool_call_count})",
force=True,
)
# Save any text content that preceded the truncated calls
partial_content = assistant_message.content or ""
if partial_content:
truncated_response_prefix += partial_content
# Build message WITHOUT tool_calls
assistant_message.tool_calls = None
interim_msg = self._build_assistant_message(assistant_message, finish_reason)
messages.append(interim_msg)
truncation_nudge = (
'Your previous response was truncated due to context length limits. '
'The tool calls were discarded. Please summarize your progress so '
'far and continue with a shorter response.'
)
messages.append({"role": "user", "content": truncation_nudge})
# After 3 consecutive truncations, temporarily disable tools
if truncated_tool_call_count >= 3:
self._vprint(
f"{self.log_prefix}⚠️ 3 consecutive truncations with tool calls — "
f"temporarily disabling tools for next call",
force=True,
)
self._tools_temporarily_disabled = True
self._session_messages = messages
self._save_session_log(messages)
continue
if not assistant_message.tool_calls:
length_continue_retries += 1
interim_msg = self._build_assistant_message(assistant_message, finish_reason)
@@ -7518,6 +7564,8 @@ class AIAgent:
# Check for tool calls
if assistant_message.tool_calls:
# Reset truncated tool call counter on successful (non-truncated) tool calls
truncated_tool_call_count = 0
if not self.quiet_mode:
self._vprint(f"{self.log_prefix}🔧 Processing {len(assistant_message.tool_calls)} tool call(s)...")
@@ -7793,11 +7841,39 @@ class AIAgent:
content_preview = final_response[:80] + "..." if len(final_response) > 80 else final_response
self._vprint(f"{self.log_prefix} Content: '{content_preview}'")
if self._empty_content_retries < 3:
self._vprint(f"{self.log_prefix}🔄 Retrying API call ({self._empty_content_retries}/3)...")
if self._empty_content_retries < 2:
self._vprint(f"{self.log_prefix}🔄 Retrying API call ({self._empty_content_retries}/2)...")
# Feature: Empty response recovery (Ironclaw #1677 + #1720)
# On first empty retry, check for prior meaningful output
if self._empty_content_retries == 1:
_has_prior_output = any(
isinstance(m, dict)
and m.get("role") == "assistant"
and m.get("content")
and self._has_content_after_think_block(m["content"])
for m in messages
)
if _has_prior_output:
# Model already produced output earlier; treat as completion
self._vprint(f"{self.log_prefix}️ Prior meaningful output exists — treating empty response as completion")
for m in reversed(messages):
if (isinstance(m, dict) and m.get("role") == "assistant"
and m.get("content") and self._has_content_after_think_block(m["content"])):
final_response = self._strip_think_blocks(m["content"]).strip()
break
if final_response:
self._empty_content_retries = 0
break
else:
# No prior output — inject a nudge to help the model
nudge_msg = {
"role": "user",
"content": "Your previous response was empty. Please continue with the task.",
}
messages.append(nudge_msg)
continue
else:
self._vprint(f"{self.log_prefix}❌ Max retries (3) for empty content exceeded.", force=True)
self._vprint(f"{self.log_prefix}❌ Max retries (2) for empty content exceeded.", force=True)
self._empty_content_retries = 0
# If a prior tool_calls turn had real content, salvage it:
-429
View File
@@ -1,429 +0,0 @@
"""Tests for the cron job script gate feature.
The script gate allows cron jobs to run an optional bash script before waking
the agent. The script's last stdout line is parsed as JSON:
- {"wakeAgent": false} → skip the agent entirely
- {"wakeAgent": true} → proceed normally
- {"wakeAgent": true, "data":…} → prepend data to the prompt
- errors / invalid JSON → proceed normally (don't block)
"""
import json
import subprocess
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Ensure project root is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))
from cron.scheduler import run_job
def _make_job(script=None, prompt="Test prompt", job_id="test123", name="test-job"):
"""Build a minimal job dict for testing."""
job = {
"id": job_id,
"name": name,
"prompt": prompt,
"schedule_display": "every 5m",
"enabled": True,
"state": "scheduled",
"skills": [],
}
if script is not None:
job["script"] = script
return job
# We need to mock out the heavy agent machinery so tests stay fast.
# The script gate runs BEFORE the agent is created, so we can detect
# whether the agent was created at all.
_AGENT_RUN_SENTINEL = "agent-ran-ok"
class _FakeAgent:
"""Lightweight stand-in for AIAgent."""
def __init__(self, **kwargs):
self.kwargs = kwargs
def run_conversation(self, prompt):
return {"final_response": _AGENT_RUN_SENTINEL}
def _patch_agent():
"""Return a context manager that replaces AIAgent with _FakeAgent."""
return patch("cron.scheduler.AIAgent", _FakeAgent)
def _patch_deps():
"""Patch all heavy imports that run_job pulls in so tests don't need real config."""
# SessionDB
mock_session_db = MagicMock()
mock_session_db.return_value = MagicMock()
patches = [
_patch_agent(),
patch("cron.scheduler.SessionDB", mock_session_db, create=True),
# dotenv
patch("cron.scheduler.load_dotenv", create=True),
# config
patch("cron.scheduler.resolve_runtime_provider", return_value={
"api_key": "fake", "base_url": None, "provider": None,
"api_mode": None, "command": None, "args": [],
}, create=True),
patch("cron.scheduler.resolve_turn_route", return_value={
"model": "test-model",
"runtime": {
"api_key": "fake", "base_url": None, "provider": None,
"api_mode": None, "command": None, "args": [],
},
}, create=True),
]
return patches
def _run_with_patches(job):
"""Run a job with all heavy deps mocked out, return the 4-tuple result."""
# We'll mock at a higher level: just mock the parts after the script gate
# Since there are many transitive imports, let's mock run_job's internals
# by monkeypatching the AIAgent and other imports inside run_job.
# Simpler approach: directly test the script gate logic by extracting it,
# or mock at the subprocess level and let the real function flow.
# Actually let's just mock the AIAgent import inside run_job.
with patch("run_agent.AIAgent", _FakeAgent):
with patch("cron.scheduler._hermes_home", Path("/tmp/hermes-test")):
# Mock the heavy imports that happen inside run_job's try block
with patch.dict("os.environ", {
"HERMES_MODEL": "test-model",
}):
with patch("cron.scheduler._build_job_prompt") as mock_build:
# Let _build_job_prompt return the raw prompt so we can
# inspect what gets modified by the script gate.
mock_build.side_effect = lambda j: j.get("prompt", "")
# We need to handle the internal imports in run_job
# The cleanest approach: mock the entire agent creation path
mock_agent_instance = MagicMock()
mock_agent_instance.run_conversation.return_value = {
"final_response": _AGENT_RUN_SENTINEL
}
# Patch all the things run_job imports internally
with patch("cron.scheduler.AIAgent", return_value=mock_agent_instance, create=True):
try:
result = run_job(job)
except Exception:
# If internal imports fail, the script gate still
# should have run. For wakeAgent=false tests the
# early return happens before any agent code.
raise
return result, mock_agent_instance
# ---------------------------------------------------------------------------
# Actual tests
# ---------------------------------------------------------------------------
class TestScriptGateSkipsAgent:
"""Script returning wakeAgent=false should skip the agent entirely."""
def test_wake_agent_false_returns_early(self):
job = _make_job(script='echo \'{"wakeAgent": false}\'')
# The script gate returns before AIAgent is even imported,
# so we only need minimal mocking.
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
# Mock SessionDB to avoid real DB
with patch("cron.scheduler.SessionDB", create=True):
success, output, response, error = run_job(job)
assert success is True
assert "Script gate: agent skipped" in response
assert error is None
assert "Script Gate" in output
def test_wake_agent_false_with_extra_stdout(self):
"""Script may print other lines; only last non-empty counts."""
job = _make_job(script='echo "checking..."\necho ""\necho \'{"wakeAgent": false}\'')
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
success, output, response, error = run_job(job)
assert success is True
assert "Script gate: agent skipped" in response
class TestScriptGateProceeds:
"""Script returning wakeAgent=true should let the agent run."""
def test_wake_agent_true_runs_agent(self):
job = _make_job(script='echo \'{"wakeAgent": true}\'')
try:
result, mock_agent = _run_with_patches(job)
success, output, response, error = result
# Agent should have been called
mock_agent.run_conversation.assert_called_once()
assert success is True
except Exception:
# If import fails due to missing deps, that's OK — the key thing
# is that the script gate didn't return early. We verify by
# checking it doesn't return the skip message.
pass
class TestScriptGateDataPrepend:
"""Script returning wakeAgent=true with data should prepend to prompt."""
def test_data_prepended_to_prompt(self):
data = {"changed_files": ["a.py", "b.py"], "count": 2}
script = f'echo \'{{"wakeAgent": true, "data": {json.dumps(data)}}}\''
job = _make_job(script=script, prompt="Analyze changes")
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
# Mock the AIAgent so we can capture the prompt passed to it
captured_prompts = []
class CapturingAgent:
def __init__(self, **kwargs):
pass
def run_conversation(self, prompt):
captured_prompts.append(prompt)
return {"final_response": "done"}
# We need to mock all the internal imports of run_job
import importlib
with patch("dotenv.load_dotenv", create=True):
with patch("builtins.__import__", wraps=__builtins__.__import__ if hasattr(__builtins__, '__import__') else __import__):
# Actually, let's use a more targeted approach
pass
# Better approach: test the script gate logic directly with subprocess
# and verify the prompt transformation
script_code = f'echo \'{{"wakeAgent": true, "data": {json.dumps(data)}}}\''
result = subprocess.run(
["bash", "-c", script_code],
capture_output=True, text=True, timeout=10,
)
stdout_lines = [l for l in result.stdout.splitlines() if l.strip()]
last_line = stdout_lines[-1].strip()
gate = json.loads(last_line)
assert gate["wakeAgent"] is True
assert gate["data"] == data
# Now verify the prompt transformation logic
prompt = "Analyze changes"
gate_data = gate.get("data")
if gate_data is not None:
prompt = f"Script pre-check data:\n{json.dumps(gate_data)}\n\n{prompt}"
assert prompt.startswith("Script pre-check data:")
assert '"changed_files"' in prompt
assert prompt.endswith("Analyze changes")
class TestScriptGateTimeout:
"""Script timing out should not block — agent proceeds normally."""
def test_timeout_proceeds(self):
# Use a script that sleeps longer than the timeout
job = _make_job(script="sleep 60")
# Mock subprocess.run to raise TimeoutExpired
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
with patch("cron.scheduler.subprocess.run",
side_effect=subprocess.TimeoutExpired(cmd="bash", timeout=30)):
# The function should proceed past the script gate.
# It will fail on the agent imports, but NOT on the script gate.
try:
result = run_job(job)
# If we get here, check it wasn't a script-gate skip
success, output, response, error = result
assert "Script gate: agent skipped" not in response
except Exception:
# Expected: internal imports may fail in test env.
# The important thing is TimeoutExpired didn't propagate.
pass
class TestScriptGateInvalidJSON:
"""Script with non-JSON output should not block — agent proceeds."""
def test_invalid_json_proceeds(self):
job = _make_job(script='echo "this is not json"')
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
try:
result = run_job(job)
success, output, response, error = result
assert "Script gate: agent skipped" not in response
except Exception:
# Agent creation may fail in test env, but script gate
# should not have blocked.
pass
def test_empty_stdout_proceeds(self):
job = _make_job(script='true') # produces no output
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
try:
result = run_job(job)
success, output, response, error = result
assert "Script gate: agent skipped" not in response
except Exception:
pass
class TestNoScriptField:
"""Jobs without a script field should behave normally."""
def test_no_script_normal(self):
job = _make_job() # no script
assert "script" not in job
try:
result, mock_agent = _run_with_patches(job)
success, output, response, error = result
mock_agent.run_conversation.assert_called_once()
except Exception:
pass
def test_none_script_normal(self):
job = _make_job(script=None)
# script=None should be treated same as missing
assert job.get("script") is None
try:
result, mock_agent = _run_with_patches(job)
success, output, response, error = result
mock_agent.run_conversation.assert_called_once()
except Exception:
pass
class TestScriptGateError:
"""Script errors (non-zero exit) should not block the agent."""
def test_nonzero_exit_proceeds(self):
job = _make_job(script='exit 1')
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
try:
result = run_job(job)
success, output, response, error = result
# Non-zero exit doesn't produce valid JSON, so agent proceeds
assert "Script gate: agent skipped" not in response
except Exception:
pass
def test_nonzero_exit_with_json_still_works(self):
"""A script can exit non-zero but still output valid JSON."""
job = _make_job(script='echo \'{"wakeAgent": false}\'\nexit 1')
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
# subprocess.run doesn't raise on non-zero exit (no check=True),
# so the JSON should still be parsed
success, output, response, error = run_job(job)
assert success is True
assert "Script gate: agent skipped" in response
def test_script_exception_proceeds(self):
"""If subprocess.run itself raises an unexpected error, proceed."""
job = _make_job(script="echo hello")
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
with patch("cron.scheduler.subprocess.run",
side_effect=OSError("No bash")):
try:
result = run_job(job)
success, output, response, error = result
assert "Script gate: agent skipped" not in response
except Exception:
# The OSError should have been caught by the script gate
# and not propagated. If we get here, something else failed.
pass
# ---------------------------------------------------------------------------
# Integration-style test: actually run bash and verify full flow
# ---------------------------------------------------------------------------
class TestScriptGateIntegration:
"""End-to-end tests that actually execute bash scripts."""
def test_full_skip_flow(self):
"""Complete flow: script says skip, verify early return."""
job = _make_job(
script='echo "performing check..."\necho \'{"wakeAgent": false}\'',
prompt="This should never reach the agent",
)
with patch("cron.scheduler._build_job_prompt", side_effect=lambda j: j.get("prompt", "")):
with patch("cron.scheduler.SessionDB", create=True):
success, output, response, error = run_job(job)
assert success is True
assert response == "Script gate: agent skipped"
assert error is None
assert "test-job" in output
def test_full_data_prepend_flow(self):
"""Complete flow: script provides data, verify it reaches the prompt."""
data = {"status": "changed", "items": [1, 2, 3]}
script = f"""
echo "Running pre-check..."
echo '{json.dumps({"wakeAgent": True, "data": data})}'
"""
job = _make_job(script=script, prompt="Process the data")
# We can't easily run the full agent, but we can verify the prompt
# gets modified by capturing what _build_job_prompt returns and then
# checking the prompt that reaches the agent.
#
# Instead, test the script execution and JSON parsing directly:
result = subprocess.run(
["bash", "-c", script],
capture_output=True, text=True, timeout=10,
)
lines = [l for l in result.stdout.splitlines() if l.strip()]
gate = json.loads(lines[-1].strip())
assert gate["wakeAgent"] is True
assert gate["data"] == data
def test_multiline_script(self):
"""Multi-line script with conditionals."""
script = """#!/bin/bash
CHANGED=true
if [ "$CHANGED" = "true" ]; then
echo '{"wakeAgent": true, "data": {"reason": "files changed"}}'
else
echo '{"wakeAgent": false}'
fi
"""
job = _make_job(script=script)
# Verify bash executes it correctly
result = subprocess.run(
["bash", "-c", script],
capture_output=True, text=True, timeout=10,
)
lines = [l for l in result.stdout.splitlines() if l.strip()]
gate = json.loads(lines[-1].strip())
assert gate["wakeAgent"] is True
assert gate["data"]["reason"] == "files changed"
+252
View File
@@ -0,0 +1,252 @@
"""Tests for agent resilience features inspired by Ironclaw PRs.
Feature 1: Discard truncated tool calls on finish_reason=length (#1632)
Feature 2: Empty response recovery (#1677 + #1720)
Feature 3: Sanitize tool error results (#1639)
"""
import json
import re
import sys
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
# Ensure repo root is importable
_repo_root = Path(__file__).resolve().parent.parent
if str(_repo_root) not in sys.path:
sys.path.insert(0, str(_repo_root))
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_tool_call(name="test_tool", args='{"key": "value"}', tc_id="tc_1"):
return SimpleNamespace(
id=tc_id,
function=SimpleNamespace(name=name, arguments=args),
type="function",
)
def _mock_response(content="Hello", finish_reason="stop", tool_calls=None, usage=None):
msg = SimpleNamespace(
content=content,
tool_calls=tool_calls,
reasoning_content=None,
reasoning=None,
)
choice = SimpleNamespace(message=msg, finish_reason=finish_reason)
resp = SimpleNamespace(choices=[choice], model="test/model")
resp.usage = SimpleNamespace(**usage) if usage else None
return resp
# =========================================================================
# Feature 3: Sanitize tool error results
# =========================================================================
class TestSanitizeToolError:
"""Test _sanitize_tool_error helper function in model_tools.py."""
def test_import(self):
"""Verify the sanitize function can be imported."""
from model_tools import _sanitize_tool_error
assert callable(_sanitize_tool_error)
def test_truncation(self):
"""Error messages longer than 2000 chars are truncated."""
from model_tools import _sanitize_tool_error
long_msg = "x" * 5000
result = _sanitize_tool_error(long_msg)
# Account for the [TOOL_ERROR] prefix
assert len(result) <= 2000 + len("[TOOL_ERROR] ")
assert result.endswith("...")
def test_xml_tag_stripping(self):
"""XML-like boundary tags are stripped from errors."""
from model_tools import _sanitize_tool_error
error = "<tool_call>Error: file not found</tool_call>"
result = _sanitize_tool_error(error)
assert "<tool_call>" not in result
assert "</tool_call>" not in result
assert "file not found" in result
def test_system_tag_stripping(self):
"""System/assistant/user tags are stripped."""
from model_tools import _sanitize_tool_error
error = "<system>Permission denied</system>"
result = _sanitize_tool_error(error)
assert "<system>" not in result
assert "Permission denied" in result
def test_code_fence_stripping(self):
"""Markdown code fences are stripped."""
from model_tools import _sanitize_tool_error
error = "```json\n{\"error\": \"bad\"}\n```"
result = _sanitize_tool_error(error)
assert "```" not in result
def test_cdata_stripping(self):
"""CDATA sections are stripped."""
from model_tools import _sanitize_tool_error
error = "Error: <![CDATA[some internal data]]> happened"
result = _sanitize_tool_error(error)
assert "CDATA" not in result
assert "happened" in result
def test_error_format_prefix(self):
"""Error is wrapped with [TOOL_ERROR] prefix."""
from model_tools import _sanitize_tool_error
result = _sanitize_tool_error("something went wrong")
assert result.startswith("[TOOL_ERROR]")
assert "something went wrong" in result
def test_short_error_preserved(self):
"""Short, clean errors are preserved intact (with prefix)."""
from model_tools import _sanitize_tool_error
result = _sanitize_tool_error("File not found: /tmp/test.txt")
assert result == "[TOOL_ERROR] File not found: /tmp/test.txt"
def test_handle_function_call_uses_sanitizer(self):
"""handle_function_call sanitizes error messages from exceptions."""
from model_tools import handle_function_call, _sanitize_tool_error
# The registry returns its own error for unknown tools (not via the
# except block). Verify the sanitizer is called in the except path
# by directly testing what would happen.
raw_error = "Error executing bad_tool: <system>Internal traceback</system>"
sanitized = _sanitize_tool_error(raw_error)
result_json = json.dumps({"error": sanitized}, ensure_ascii=False)
parsed = json.loads(result_json)
assert "[TOOL_ERROR]" in parsed["error"]
assert "<system>" not in parsed["error"]
def test_mixed_tags_and_long_error(self):
"""Complex error with tags AND length > 2000."""
from model_tools import _sanitize_tool_error
error = "<result>" + ("a" * 3000) + "</result>"
result = _sanitize_tool_error(error)
assert "<result>" not in result
assert "</result>" not in result
assert len(result) <= 2020 # prefix + 2000 + ...
# =========================================================================
# Feature 1: Discard truncated tool calls on finish_reason=length
# =========================================================================
class TestTruncatedToolCallDiscard:
"""Test that truncated tool calls (finish_reason=length) are discarded."""
def test_truncated_tool_calls_message_content(self):
"""Verify the truncation nudge message text is correct."""
expected_nudge = (
'Your previous response was truncated due to context length limits. '
'The tool calls were discarded. Please summarize your progress so '
'far and continue with a shorter response.'
)
# This is the message that should be injected into the conversation
assert "truncated" in expected_nudge.lower()
assert "discarded" in expected_nudge.lower()
def test_tools_temporarily_disabled_attribute(self):
"""Verify the _tools_temporarily_disabled attribute pattern works."""
# Test the attribute access pattern used in the implementation
obj = SimpleNamespace()
assert getattr(obj, '_tools_temporarily_disabled', False) is False
obj._tools_temporarily_disabled = True
assert getattr(obj, '_tools_temporarily_disabled', False) is True
# =========================================================================
# Feature 2: Empty response recovery
# =========================================================================
class TestEmptyResponseRecovery:
"""Test empty response recovery behavior."""
def test_empty_response_nudge_text(self):
"""Verify the nudge message for empty responses."""
nudge = "Your previous response was empty. Please continue with the task."
assert "empty" in nudge.lower()
assert "continue" in nudge.lower()
def test_prior_meaningful_output_detection(self):
"""Test logic for detecting prior meaningful output in messages."""
messages = [
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Here is a detailed response about your question."},
{"role": "user", "content": "Thanks, continue"},
]
# Check that we can find prior assistant output
has_prior = any(
isinstance(m, dict)
and m.get("role") == "assistant"
and m.get("content")
and len(m["content"].strip()) > 0
for m in messages
)
assert has_prior is True
def test_no_prior_meaningful_output(self):
"""Test when no prior meaningful assistant output exists."""
messages = [
{"role": "user", "content": "Hello"},
]
has_prior = any(
isinstance(m, dict)
and m.get("role") == "assistant"
and m.get("content")
and len(m["content"].strip()) > 0
for m in messages
)
assert has_prior is False
def test_think_block_only_not_meaningful(self):
"""Responses with only think blocks should not count as meaningful."""
messages = [
{"role": "assistant", "content": "<think>Internal reasoning only</think>"},
]
# The agent uses _has_content_after_think_block to check this
# For our test, verify the pattern: content that's only a think block
content = messages[0]["content"]
stripped = re.sub(
r'<(?:REASONING_SCRATCHPAD|think|reasoning)>.*?</(?:REASONING_SCRATCHPAD|think|reasoning)>',
'', content, flags=re.DOTALL
).strip()
assert stripped == "" # No meaningful content after stripping think blocks
# =========================================================================
# Integration-style tests for sanitize_tool_error in handle_function_call
# =========================================================================
class TestHandleFunctionCallSanitization:
"""Test that handle_function_call properly sanitizes errors."""
def test_registry_dispatch_error_sanitized(self):
"""When registry.dispatch raises, the error should be sanitized."""
from model_tools import _sanitize_tool_error
# Simulate what happens in the except block
error = Exception("Connection refused: <system>Internal error</system> " + "x" * 3000)
raw_error = f"Error executing test_tool: {str(error)}"
sanitized = _sanitize_tool_error(raw_error)
result_json = json.dumps({"error": sanitized}, ensure_ascii=False)
parsed = json.loads(result_json)
assert "[TOOL_ERROR]" in parsed["error"]
assert "<system>" not in parsed["error"]
# Truncated
assert len(parsed["error"]) <= 2020
def test_normal_error_readable(self):
"""Normal short errors should remain readable."""
from model_tools import _sanitize_tool_error
result = _sanitize_tool_error("Error executing write_file: Permission denied")
assert "Permission denied" in result
assert result.startswith("[TOOL_ERROR]")
-10
View File
@@ -135,7 +135,6 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"state": job.get("state", "scheduled" if job.get("enabled", True) else "paused"),
"paused_at": job.get("paused_at"),
"paused_reason": job.get("paused_reason"),
"script": job.get("script"),
}
@@ -154,7 +153,6 @@ def cronjob(
provider: Optional[str] = None,
base_url: Optional[str] = None,
reason: Optional[str] = None,
script: Optional[str] = None,
task_id: str = None,
) -> str:
"""Unified cron job management tool."""
@@ -185,7 +183,6 @@ def cronjob(
model=_normalize_optional_job_value(model),
provider=_normalize_optional_job_value(provider),
base_url=_normalize_optional_job_value(base_url, strip_trailing_slash=True),
script=script,
)
return json.dumps(
{
@@ -268,8 +265,6 @@ def cronjob(
updates["provider"] = _normalize_optional_job_value(provider)
if base_url is not None:
updates["base_url"] = _normalize_optional_job_value(base_url, strip_trailing_slash=True)
if script is not None:
updates["script"] = script if script else None
if repeat is not None:
# Normalize: treat 0 or negative as None (infinite)
normalized_repeat = None if repeat <= 0 else repeat
@@ -407,10 +402,6 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
"reason": {
"type": "string",
"description": "Optional pause reason"
},
"script": {
"type": "string",
"description": "Optional bash script to run before waking the agent. Must output JSON on its last line: {\"wakeAgent\": boolean, \"data\"?: any}. If wakeAgent is false, the agent is skipped entirely. Useful for frequent schedules where you only want the agent to run when something changed."
}
},
"required": ["action"]
@@ -460,7 +451,6 @@ registry.register(
provider=args.get("provider"),
base_url=args.get("base_url"),
reason=args.get("reason"),
script=args.get("script"),
task_id=kw.get("task_id"),
),
check_fn=check_cronjob_requirements,