Compare commits

..

10 Commits

Author SHA1 Message Date
hjc-puro
31c733383b add tracking for cluster failurse 2025-11-15 00:01:19 -05:00
hjc-puro
0c618482c4 add logging of prefix of tool call and tool response 2025-11-07 14:43:44 -05:00
hjc-puro
2d8f6c46f1 log first 20 chars 2025-11-07 14:08:06 -05:00
teknium
c27787f09f fix gitignore again 2025-11-05 06:43:03 +00:00
teknium
d90fcd4e2b update gitignore 2025-11-05 06:43:03 +00:00
Teknium
69fd0ca9aa Merge pull request #7 from NousResearch/test
some cleanups
2025-11-04 19:54:49 -08:00
Teknium
4135cf4682 Merge branch 'main' into test 2025-11-04 19:54:40 -08:00
teknium
c82741c3d8 some cleanups 2025-11-05 03:47:17 +00:00
Teknium
9573b2ac2d Merge pull request #6 from NousResearch/fix-leakage
Fix VM instance sharing across tasks
2025-11-04 02:15:32 -08:00
Teknium
ab5c9fc37b Merge pull request #5 from NousResearch/update-snapshot
Update snapshot
2025-11-02 21:30:08 -08:00
17 changed files with 1372 additions and 196 deletions

6
.gitignore vendored
View File

@@ -20,4 +20,8 @@ logs/
data/
.pytest_cache/
tmp/
temp_vision_images/
temp_vision_images/
hermes-*/*
examples/
tests/quick_test_dataset.jsonl
tests/sample_dataset.jsonl

View File

@@ -9,15 +9,21 @@ across multiple prompts from a dataset. It includes:
- Checkpointing for fault tolerance and resumption
- Trajectory saving in the proper format (from/value pairs)
- Tool usage statistics aggregation across all batches
- Cluster failure detection and graceful shutdown (morph, firecrawl, API errors)
- Configurable failure thresholds with automatic data consolidation
Usage:
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run
# Resume an interrupted run
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run --resume
# Use a specific toolset distribution
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run --distribution=image_gen
# Configure tool failure thresholds
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run \\
--max_tool_failures=20 --max_tool_failure_rate=0.3
"""
import json
@@ -29,22 +35,94 @@ from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from multiprocessing import Pool, Manager, Lock
import traceback
import re
import fire
from run_agent import AIAgent
from toolset_distributions import (
get_distribution,
list_distributions,
get_distribution,
list_distributions,
sample_toolsets_from_distribution,
validate_distribution
)
from safe_print import safe_print
# Global configuration for worker processes
_WORKER_CONFIG = {}
def _extract_tool_errors_from_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Extract tool errors from message history with tool names.
Args:
messages (List[Dict]): Message history
Returns:
List[Dict]: List of tool errors with tool name, error message, and context
"""
tool_errors = []
tool_calls_map = {} # Map tool_call_id to tool name
for msg in messages:
# Track tool calls from assistant messages
if msg["role"] == "assistant" and "tool_calls" in msg and msg["tool_calls"]:
for tool_call in msg["tool_calls"]:
tool_name = tool_call["function"]["name"]
tool_call_id = tool_call["id"]
tool_calls_map[tool_call_id] = tool_name
# Check tool responses for errors
elif msg["role"] == "tool":
tool_call_id = msg.get("tool_call_id", "")
content = msg.get("content", "")
# Determine if tool call had an error
has_error = False
error_msg = None
try:
content_json = json.loads(content) if isinstance(content, str) else content
if isinstance(content_json, dict):
# Check if error field exists AND has a non-null value
if "error" in content_json and content_json["error"] is not None:
has_error = True
error_msg = str(content_json["error"])
# Special handling for terminal tool responses
if "content" in content_json and isinstance(content_json["content"], dict):
inner_content = content_json["content"]
if inner_content.get("error") is not None or inner_content.get("exit_code", 0) != 0:
has_error = True
error_msg = inner_content.get("error") or f"Exit code: {inner_content.get('exit_code')}"
# Check for "success": false pattern
if content_json.get("success") is False:
has_error = True
if not error_msg:
error_msg = str(content_json.get("message", content_json.get("error", "Unknown error")))
except:
# If not JSON, check if content explicitly states an error
if content.strip().lower().startswith("error:"):
has_error = True
error_msg = content.strip()
# Record error if found
if has_error and tool_call_id in tool_calls_map:
tool_name = tool_calls_map[tool_call_id]
tool_errors.append({
"tool_name": tool_name,
"error_message": error_msg or "Unknown error",
"full_content": content[:500] # Keep first 500 chars of full response
})
return tool_errors
def _extract_tool_stats(messages: List[Dict[str, Any]]) -> Dict[str, Dict[str, int]]:
"""
Extract tool usage statistics from message history.
@@ -164,27 +242,32 @@ def _process_single_prompt(
enabled_toolsets=selected_toolsets,
save_trajectories=False, # We handle saving ourselves
verbose_logging=config.get("verbose", False),
ephemeral_system_prompt=config.get("ephemeral_system_prompt")
ephemeral_system_prompt=config.get("ephemeral_system_prompt"),
log_prefix_chars=config.get("log_prefix_chars", 100)
)
# Run the agent with task_id to ensure each task gets its own isolated VM
result = agent.run_conversation(prompt, task_id=f"task_{prompt_index}")
# Extract tool usage statistics
tool_stats = _extract_tool_stats(result["messages"])
# Extract tool errors from conversation
tool_errors = _extract_tool_errors_from_messages(result["messages"])
# Convert to trajectory format (using existing method)
trajectory = agent._convert_to_trajectory_format(
result["messages"],
prompt,
result["completed"]
)
return {
"success": True,
"prompt_index": prompt_index,
"trajectory": trajectory,
"tool_stats": tool_stats,
"tool_errors": tool_errors,
"completed": result["completed"],
"api_calls": result["api_calls"],
"toolsets_used": selected_toolsets,
@@ -196,14 +279,18 @@ def _process_single_prompt(
}
except Exception as e:
print(f"❌ Error processing prompt {prompt_index}: {e}")
error_msg = str(e)
tb = traceback.format_exc()
safe_print(f"[bold red]❌ Error processing prompt {prompt_index}:[/bold red] {error_msg}")
if config.get("verbose"):
traceback.print_exc()
safe_print(tb)
return {
"success": False,
"prompt_index": prompt_index,
"error": str(e),
"error": error_msg,
"traceback": tb,
"tool_errors": [],
"trajectory": None,
"tool_stats": {},
"toolsets_used": [],
@@ -253,7 +340,9 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
# Initialize aggregated stats for this batch
batch_tool_stats = {}
completed_in_batch = []
all_tool_errors = [] # Track all tool errors in this batch
exception_errors = [] # Track top-level exceptions
# Process each prompt sequentially in this batch
for prompt_index, prompt_data in prompts_to_process:
# Process the prompt
@@ -263,7 +352,26 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
batch_num,
config
)
# Track tool errors from the conversation
if result.get("tool_errors"):
for tool_error in result["tool_errors"]:
all_tool_errors.append({
"prompt_index": prompt_index,
"tool_name": tool_error["tool_name"],
"error_message": tool_error["error_message"],
"full_content": tool_error.get("full_content", "")
})
# Track top-level exceptions (not tool errors)
if not result["success"]:
exception_errors.append({
"prompt_index": prompt_index,
"error": result.get("error", "Unknown error"),
"traceback": result.get("traceback", "")
})
safe_print(f"[bold red]❌ Exception in prompt {prompt_index}:[/bold red] {result.get('error', '')[:100]}")
# Save trajectory if successful
if result["success"] and result["trajectory"]:
trajectory_entry = {
@@ -274,7 +382,7 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
"api_calls": result["api_calls"],
"toolsets_used": result["toolsets_used"]
}
# Append to batch output file
with open(batch_output_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(trajectory_entry, ensure_ascii=False) + "\n")
@@ -302,7 +410,9 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
"processed": len(prompts_to_process),
"skipped": len(batch_data) - len(prompts_to_process),
"tool_stats": batch_tool_stats,
"completed_prompts": completed_in_batch
"completed_prompts": completed_in_batch,
"tool_errors": all_tool_errors,
"exception_errors": exception_errors
}
@@ -323,11 +433,15 @@ class BatchRunner:
model: str = "claude-opus-4-20250514",
num_workers: int = 4,
verbose: bool = False,
ephemeral_system_prompt: str = None
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
max_tool_failures: int = 10,
max_tool_failure_rate: float = 0.5,
keep_recent_errors: int = 5,
):
"""
Initialize the batch runner.
Args:
dataset_file (str): Path to the dataset JSONL file with 'prompt' field
batch_size (int): Number of prompts per batch
@@ -340,6 +454,10 @@ class BatchRunner:
num_workers (int): Number of parallel workers
verbose (bool): Enable verbose logging
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 20)
max_tool_failures (int): Maximum number of tool failures before stopping (default: 10)
max_tool_failure_rate (float): Maximum tool failure rate (0.0-1.0) before stopping (default: 0.5)
keep_recent_errors (int): Number of recent errors to keep per tool (default: 5)
"""
self.dataset_file = Path(dataset_file)
self.batch_size = batch_size
@@ -352,6 +470,10 @@ class BatchRunner:
self.num_workers = num_workers
self.verbose = verbose
self.ephemeral_system_prompt = ephemeral_system_prompt
self.log_prefix_chars = log_prefix_chars
self.max_tool_failures = max_tool_failures
self.max_tool_failure_rate = max_tool_failure_rate
self.keep_recent_errors = keep_recent_errors
# Validate distribution
if not validate_distribution(distribution):
@@ -373,17 +495,21 @@ class BatchRunner:
# Create batches
self.batches = self._create_batches()
print(f"📊 Batch Runner Initialized")
print(f" Dataset: {self.dataset_file} ({len(self.dataset)} prompts)")
print(f" Batch size: {self.batch_size}")
print(f" Total batches: {len(self.batches)}")
print(f" Run name: {self.run_name}")
print(f" Distribution: {self.distribution}")
print(f" Output directory: {self.output_dir}")
print(f" Workers: {self.num_workers}")
safe_print("[bold cyan]📊 Batch Runner Initialized[/bold cyan]")
safe_print(f" Dataset: {self.dataset_file} ({len(self.dataset)} prompts)")
safe_print(f" Batch size: {self.batch_size}")
safe_print(f" Total batches: {len(self.batches)}")
safe_print(f" Run name: {self.run_name}")
safe_print(f" Distribution: {self.distribution}")
safe_print(f" Output directory: {self.output_dir}")
safe_print(f" Workers: {self.num_workers}")
safe_print(f" [yellow]Tool failure limits:[/yellow]")
safe_print(f" Max failures: {self.max_tool_failures}")
safe_print(f" Max failure rate: {self.max_tool_failure_rate:.1%}")
safe_print(f" Keep recent errors: {self.keep_recent_errors}")
if self.ephemeral_system_prompt:
prompt_preview = self.ephemeral_system_prompt[:60] + "..." if len(self.ephemeral_system_prompt) > 60 else self.ephemeral_system_prompt
print(f" 🔒 Ephemeral system prompt: '{prompt_preview}'")
safe_print(f" 🔒 Ephemeral system prompt: '{prompt_preview}'")
def _load_dataset(self) -> List[Dict[str, Any]]:
"""
@@ -461,20 +587,83 @@ class BatchRunner:
def _save_checkpoint(self, checkpoint_data: Dict[str, Any], lock: Optional[Lock] = None):
"""
Save checkpoint data.
Args:
checkpoint_data (Dict): Checkpoint data to save
lock (Lock): Optional lock for thread-safe access
"""
checkpoint_data["last_updated"] = datetime.now().isoformat()
if lock:
with lock:
with open(self.checkpoint_file, 'w', encoding='utf-8') as f:
json.dump(checkpoint_data, f, indent=2)
json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)
else:
with open(self.checkpoint_file, 'w', encoding='utf-8') as f:
json.dump(checkpoint_data, f, indent=2)
json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)
def _consolidate_data(self, num_batches: int, tool_stats: Dict[str, Dict[str, int]],
start_time: float, tool_errors_by_tool: Dict[str, List[Dict]],
exception_errors: List[Dict], early_exit: bool = False, exit_reason: str = None):
"""
Consolidate batch data into trajectories.jsonl and save statistics.
Args:
num_batches (int): Number of batches processed
tool_stats (Dict): Aggregated tool statistics
start_time (float): Start time of the run
tool_errors_by_tool (Dict): Tool errors grouped by tool name with k most recent
exception_errors (List): Top-level exceptions
early_exit (bool): Whether this is an early exit
exit_reason (str): Reason for early exit
"""
# Combine all batch files into a single trajectories.jsonl file
combined_file = self.output_dir / "trajectories.jsonl"
safe_print(f"\n[cyan]📦 Combining batch files into {combined_file.name}...[/cyan]")
entries_written = 0
with open(combined_file, 'w', encoding='utf-8') as outfile:
for batch_num in range(num_batches):
batch_file = self.output_dir / f"batch_{batch_num}.jsonl"
if batch_file.exists():
with open(batch_file, 'r', encoding='utf-8') as infile:
for line in infile:
outfile.write(line)
entries_written += 1
safe_print(f"[green]✅ Combined {num_batches} batch files into trajectories.jsonl ({entries_written} entries)[/green]")
# Calculate success rates for tool stats
for tool_name in tool_stats:
stats = tool_stats[tool_name]
total_calls = stats["success"] + stats["failure"]
if total_calls > 0:
stats["success_rate"] = round(stats["success"] / total_calls * 100, 2)
stats["failure_rate"] = round(stats["failure"] / total_calls * 100, 2)
else:
stats["success_rate"] = 0.0
stats["failure_rate"] = 0.0
# Save final statistics
final_stats = {
"run_name": self.run_name,
"distribution": self.distribution,
"total_prompts": len(self.dataset),
"total_batches": len(self.batches),
"batches_processed": num_batches,
"batch_size": self.batch_size,
"model": self.model,
"completed_at": datetime.now().isoformat(),
"duration_seconds": round(time.time() - start_time, 2),
"early_exit": early_exit,
"exit_reason": exit_reason,
"tool_errors": tool_errors_by_tool,
"exception_errors": exception_errors[:self.keep_recent_errors], # Keep k most recent
"tool_statistics": tool_stats
}
with open(self.stats_file, 'w', encoding='utf-8') as f:
json.dump(final_stats, f, indent=2, ensure_ascii=False)
def run(self, resume: bool = False):
@@ -507,7 +696,8 @@ class BatchRunner:
"base_url": self.base_url,
"api_key": self.api_key,
"verbose": self.verbose,
"ephemeral_system_prompt": self.ephemeral_system_prompt
"ephemeral_system_prompt": self.ephemeral_system_prompt,
"log_prefix_chars": self.log_prefix_chars
}
# Get completed prompts set
@@ -515,9 +705,16 @@ class BatchRunner:
# Aggregate statistics across all batches
total_tool_stats = {}
tool_errors_by_tool = {} # {tool_name: [list of k most recent errors]}
all_exception_errors = []
all_completed_prompts = list(completed_prompts_set)
total_processed = len(completed_prompts_set)
total_tool_errors = 0
early_exit = False
exit_reason = None
start_time = time.time()
# Process batches in parallel
with Pool(processes=self.num_workers) as pool:
# Create tasks for each batch
@@ -531,84 +728,147 @@ class BatchRunner:
)
for batch_num, batch_data in enumerate(self.batches)
]
# Use map to process batches in parallel
results = pool.map(_process_batch_worker, tasks)
# Aggregate all batch statistics and update checkpoint
all_completed_prompts = list(completed_prompts_set)
for batch_result in results:
# Add newly completed prompts
all_completed_prompts.extend(batch_result.get("completed_prompts", []))
# Aggregate tool stats
for tool_name, stats in batch_result.get("tool_stats", {}).items():
if tool_name not in total_tool_stats:
total_tool_stats[tool_name] = {
"count": 0,
"success": 0,
"failure": 0
}
total_tool_stats[tool_name]["count"] += stats["count"]
total_tool_stats[tool_name]["success"] += stats["success"]
total_tool_stats[tool_name]["failure"] += stats["failure"]
# Process batches and check tool failure threshold after each batch
for batch_num, task in enumerate(tasks):
# Process single batch
result = pool.apply(_process_batch_worker, (task,))
# Update statistics
all_completed_prompts.extend(result.get("completed_prompts", []))
total_processed += result.get("processed", 0)
# Aggregate tool stats
for tool_name, stats in result.get("tool_stats", {}).items():
if tool_name not in total_tool_stats:
total_tool_stats[tool_name] = {
"count": 0,
"success": 0,
"failure": 0
}
total_tool_stats[tool_name]["count"] += stats["count"]
total_tool_stats[tool_name]["success"] += stats["success"]
total_tool_stats[tool_name]["failure"] += stats["failure"]
# Aggregate tool errors (keep k most recent per tool)
for tool_error in result.get("tool_errors", []):
tool_name = tool_error["tool_name"]
if tool_name not in tool_errors_by_tool:
tool_errors_by_tool[tool_name] = []
# Add error and keep only k most recent
tool_errors_by_tool[tool_name].append(tool_error)
if len(tool_errors_by_tool[tool_name]) > self.keep_recent_errors:
tool_errors_by_tool[tool_name] = tool_errors_by_tool[tool_name][-self.keep_recent_errors:]
total_tool_errors += 1
# Track exception errors
all_exception_errors.extend(result.get("exception_errors", []))
# Check tool failure thresholds
if total_processed > 0:
tool_failure_rate = total_tool_errors / total_processed
# Check absolute count threshold
if total_tool_errors >= self.max_tool_failures:
early_exit = True
exit_reason = f"Exceeded maximum tool failures ({total_tool_errors}/{self.max_tool_failures})"
safe_print(f"\n[bold red]🛑 STOPPING: {exit_reason}[/bold red]")
break
# Check rate threshold
if tool_failure_rate >= self.max_tool_failure_rate:
early_exit = True
exit_reason = f"Exceeded tool failure rate ({tool_failure_rate:.2%} >= {self.max_tool_failure_rate:.2%})"
safe_print(f"\n[bold red]🛑 STOPPING: {exit_reason}[/bold red]")
break
# Update checkpoint after each batch
checkpoint_data["completed_prompts"] = all_completed_prompts
self._save_checkpoint(checkpoint_data)
# Save final checkpoint
checkpoint_data["completed_prompts"] = all_completed_prompts
self._save_checkpoint(checkpoint_data)
# Calculate success rates
for tool_name in total_tool_stats:
stats = total_tool_stats[tool_name]
total_calls = stats["success"] + stats["failure"]
if total_calls > 0:
stats["success_rate"] = round(stats["success"] / total_calls * 100, 2)
stats["failure_rate"] = round(stats["failure"] / total_calls * 100, 2)
else:
stats["success_rate"] = 0.0
stats["failure_rate"] = 0.0
# Combine all batch files into a single trajectories.jsonl file
combined_file = self.output_dir / "trajectories.jsonl"
print(f"\n📦 Combining batch files into {combined_file.name}...")
with open(combined_file, 'w', encoding='utf-8') as outfile:
for batch_num in range(len(self.batches)):
batch_file = self.output_dir / f"batch_{batch_num}.jsonl"
if batch_file.exists():
with open(batch_file, 'r', encoding='utf-8') as infile:
for line in infile:
outfile.write(line)
print(f"✅ Combined {len(self.batches)} batch files into trajectories.jsonl")
# Save final statistics
final_stats = {
"run_name": self.run_name,
"distribution": self.distribution,
"total_prompts": len(self.dataset),
"total_batches": len(self.batches),
"batch_size": self.batch_size,
"model": self.model,
"completed_at": datetime.now().isoformat(),
"duration_seconds": round(time.time() - start_time, 2),
"tool_statistics": total_tool_stats
}
with open(self.stats_file, 'w', encoding='utf-8') as f:
json.dump(final_stats, f, indent=2)
# Consolidate data and save statistics
num_batches_processed = batch_num + 1 if early_exit else len(self.batches)
self._consolidate_data(
num_batches_processed,
total_tool_stats,
start_time,
tool_errors_by_tool,
all_exception_errors,
early_exit,
exit_reason
)
# Print summary
print("\n" + "=" * 70)
print("📊 BATCH PROCESSING COMPLETE")
print("=" * 70)
print(f"✅ Total prompts processed: {len(self.dataset)}")
print(f"✅ Total batches: {len(self.batches)}")
print(f"⏱️ Total duration: {round(time.time() - start_time, 2)}s")
print(f"\n📈 Tool Usage Statistics:")
print("-" * 70)
safe_print("\n" + "=" * 70)
if early_exit:
safe_print("[bold yellow]⚠️ BATCH PROCESSING STOPPED EARLY[/bold yellow]")
safe_print(f"[yellow]Reason: {exit_reason}[/yellow]")
else:
safe_print("[bold green]📊 BATCH PROCESSING COMPLETE[/bold green]")
safe_print("=" * 70)
safe_print(f"✅ Total prompts processed: {total_processed}")
safe_print(f"✅ Batches completed: {num_batches_processed}/{len(self.batches)}")
safe_print(f"⏱️ Total duration: {round(time.time() - start_time, 2)}s")
# Tool error summary
if tool_errors_by_tool:
total_errors = sum(len(errors) for errors in tool_errors_by_tool.values())
safe_print(f"\n[bold red]🚨 Tool Errors: {total_tool_errors} total ({len(tool_errors_by_tool)} tools)[/bold red]")
safe_print("[red]-[/red]" * 70)
# Sort tools by error count
sorted_tools = sorted(
tool_errors_by_tool.items(),
key=lambda x: len(x[1]),
reverse=True
)
for tool_name, errors in sorted_tools:
# Count unique error messages
unique_errors = {}
for error in errors:
error_msg = error["error_message"][:100] # Truncate for grouping
if error_msg not in unique_errors:
unique_errors[error_msg] = []
unique_errors[error_msg].append(error)
safe_print(f"\n [red]{tool_name}:[/red] {len(errors)} errors ({len(unique_errors)} unique)")
# Show up to 3 most recent unique error types
for idx, (error_msg, instances) in enumerate(list(unique_errors.items())[:3]):
error_preview = error_msg if len(error_msg) <= 100 else error_msg[:97] + "..."
safe_print(f" [{idx+1}] [dim]{error_preview}[/dim] (x{len(instances)})")
# Show one example with prompt index
example = instances[-1] # Most recent
safe_print(f" [dim]Prompt {example['prompt_index']}[/dim]")
if len(unique_errors) > 3:
safe_print(f" [dim]... and {len(unique_errors) - 3} more error types[/dim]")
tool_failure_rate = total_tool_errors / total_processed if total_processed > 0 else 0
safe_print(f"\n [red]Tool failure rate: {tool_failure_rate:.2%}[/red]")
# Exception errors
if all_exception_errors:
safe_print(f"\n[bold red]💥 Top-level Exceptions: {len(all_exception_errors)}[/bold red]")
safe_print("[red]-[/red]" * 70)
for error in all_exception_errors[:self.keep_recent_errors]:
error_preview = error["error"][:100]
if len(error["error"]) > 100:
error_preview += "..."
safe_print(f" Prompt {error['prompt_index']}: [dim]{error_preview}[/dim]")
safe_print(f"\n[cyan]📈 Tool Usage Statistics:[/cyan]")
safe_print("-" * 70)
if total_tool_stats:
# Sort by count descending
sorted_tools = sorted(
@@ -616,25 +876,30 @@ class BatchRunner:
key=lambda x: x[1]["count"],
reverse=True
)
print(f"{'Tool Name':<25} {'Count':<10} {'Success':<10} {'Failure':<10} {'Success Rate':<12}")
print("-" * 70)
safe_print(f"{'Tool Name':<25} {'Count':<10} {'Success':<10} {'Failure':<10} {'Success Rate':<12}")
safe_print("-" * 70)
for tool_name, stats in sorted_tools:
print(
safe_print(
f"{tool_name:<25} "
f"{stats['count']:<10} "
f"{stats['success']:<10} "
f"{stats['failure']:<10} "
f"{stats['success_rate']:.1f}%"
f"{stats.get('success_rate', 0):.1f}%"
)
else:
print("No tool calls were made during this run.")
print(f"\n💾 Results saved to: {self.output_dir}")
print(f" - Trajectories: trajectories.jsonl (combined)")
print(f" - Individual batches: batch_*.jsonl (for debugging)")
print(f" - Statistics: {self.stats_file.name}")
print(f" - Checkpoint: {self.checkpoint_file.name}")
safe_print("No tool calls were made during this run.")
safe_print(f"\n[cyan]💾 Results saved to:[/cyan] {self.output_dir}")
safe_print(f" - Trajectories: trajectories.jsonl (combined)")
safe_print(f" - Individual batches: batch_*.jsonl (for debugging)")
safe_print(f" - Statistics: {self.stats_file.name}")
safe_print(f" - Checkpoint: {self.checkpoint_file.name}")
if early_exit:
safe_print(f"\n[bold yellow] Run was stopped early due to tool failures.[/bold yellow]")
safe_print(f"[yellow] Check {self.stats_file.name} for detailed error information including tracebacks.[/yellow]")
safe_print(f"[yellow] You can resume this run later with --resume flag.[/yellow]")
def main(
@@ -650,11 +915,15 @@ def main(
resume: bool = False,
verbose: bool = False,
list_distributions: bool = False,
ephemeral_system_prompt: str = None
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
max_tool_failures: int = 10,
max_tool_failure_rate: float = 0.5,
keep_recent_errors: int = 5,
):
"""
Run batch processing of agent prompts from a dataset.
Args:
dataset_file (str): Path to JSONL file with 'prompt' field in each entry
batch_size (int): Number of prompts per batch
@@ -669,7 +938,11 @@ def main(
verbose (bool): Enable verbose logging (default: False)
list_distributions (bool): List available toolset distributions and exit
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 20)
max_tool_failures (int): Maximum number of tool failures before stopping (default: 10)
max_tool_failure_rate (float): Maximum tool failure rate (0.0-1.0) before stopping (default: 0.5)
keep_recent_errors (int): Number of recent errors to keep per tool for reporting (default: 5)
Examples:
# Basic usage
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run
@@ -683,7 +956,11 @@ def main(
# With ephemeral system prompt (not saved to dataset)
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run \\
--ephemeral_system_prompt="You are a helpful assistant focused on image generation."
# With custom tool failure thresholds
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run \\
--max_tool_failures=20 --max_tool_failure_rate=0.3 --keep_recent_errors=10
# List available distributions
python batch_runner.py --list_distributions
"""
@@ -729,9 +1006,13 @@ def main(
model=model,
num_workers=num_workers,
verbose=verbose,
ephemeral_system_prompt=ephemeral_system_prompt
ephemeral_system_prompt=ephemeral_system_prompt,
log_prefix_chars=log_prefix_chars,
max_tool_failures=max_tool_failures,
max_tool_failure_rate=max_tool_failure_rate,
keep_recent_errors=keep_recent_errors
)
runner.run(resume=resume)
except Exception as e:

View File

@@ -478,7 +478,7 @@ def handle_web_function_call(function_name: str, function_args: Dict[str, Any])
return asyncio.run(web_crawl_tool(url, instructions, "basic"))
else:
return json.dumps({"error": f"Unknown web function: {function_name}"})
return json.dumps({"error": f"Unknown web function: {function_name}"}, ensure_ascii=False)
def handle_terminal_function_call(function_name: str, function_args: Dict[str, Any], task_id: Optional[str] = None) -> str:
"""
@@ -502,7 +502,7 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A
return terminal_tool(command, input_keys, None, background, idle_threshold, timeout, task_id)
else:
return json.dumps({"error": f"Unknown terminal function: {function_name}"})
return json.dumps({"error": f"Unknown terminal function: {function_name}"}, ensure_ascii=False)
def handle_vision_function_call(function_name: str, function_args: Dict[str, Any]) -> str:
@@ -526,7 +526,7 @@ def handle_vision_function_call(function_name: str, function_args: Dict[str, Any
return asyncio.run(vision_analyze_tool(image_url, full_prompt, "gemini-2.5-flash"))
else:
return json.dumps({"error": f"Unknown vision function: {function_name}"})
return json.dumps({"error": f"Unknown vision function: {function_name}"}, ensure_ascii=False)
def handle_moa_function_call(function_name: str, function_args: Dict[str, Any]) -> str:
@@ -544,13 +544,13 @@ def handle_moa_function_call(function_name: str, function_args: Dict[str, Any])
user_prompt = function_args.get("user_prompt", "")
if not user_prompt:
return json.dumps({"error": "user_prompt is required for MoA processing"})
return json.dumps({"error": "user_prompt is required for MoA processing"}, ensure_ascii=False)
# Run async function in event loop
return asyncio.run(mixture_of_agents_tool(user_prompt=user_prompt))
else:
return json.dumps({"error": f"Unknown MoA function: {function_name}"})
return json.dumps({"error": f"Unknown MoA function: {function_name}"}, ensure_ascii=False)
def handle_image_function_call(function_name: str, function_args: Dict[str, Any]) -> str:
@@ -568,7 +568,7 @@ def handle_image_function_call(function_name: str, function_args: Dict[str, Any]
prompt = function_args.get("prompt", "")
if not prompt:
return json.dumps({"success": False, "image": None})
return json.dumps({"success": False, "image": None}, ensure_ascii=False)
image_size = function_args.get("image_size", "landscape_16_9")
@@ -612,7 +612,7 @@ def handle_image_function_call(function_name: str, function_args: Dict[str, Any]
return result
else:
return json.dumps({"error": f"Unknown image generation function: {function_name}"})
return json.dumps({"error": f"Unknown image generation function: {function_name}"}, ensure_ascii=False)
def handle_function_call(function_name: str, function_args: Dict[str, Any], task_id: Optional[str] = None) -> str:
@@ -658,12 +658,13 @@ def handle_function_call(function_name: str, function_args: Dict[str, Any], task
else:
error_msg = f"Unknown function: {function_name}"
print(f"{error_msg}")
return json.dumps({"error": error_msg})
return json.dumps({"error": error_msg}, ensure_ascii=False)
except Exception as e:
error_msg = f"Error executing {function_name}: {str(e)}"
print(f"{error_msg}")
return json.dumps({"error": error_msg})
return json.dumps({"error": error_msg}, ensure_ascii=False)
def get_available_toolsets() -> Dict[str, Dict[str, Any]]:
"""

View File

@@ -65,7 +65,8 @@ class AIAgent:
disabled_toolsets: List[str] = None,
save_trajectories: bool = False,
verbose_logging: bool = False,
ephemeral_system_prompt: str = None
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
):
"""
Initialize the AI Agent.
@@ -81,6 +82,7 @@ class AIAgent:
save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False)
verbose_logging (bool): Enable verbose logging for debugging (default: False)
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 20)
"""
self.model = model
self.max_iterations = max_iterations
@@ -88,6 +90,7 @@ class AIAgent:
self.save_trajectories = save_trajectories
self.verbose_logging = verbose_logging
self.ephemeral_system_prompt = ephemeral_system_prompt
self.log_prefix_chars = log_prefix_chars
# Store toolset filtering options
self.enabled_toolsets = enabled_toolsets
@@ -190,7 +193,7 @@ class AIAgent:
}
formatted_tools.append(formatted_tool)
return json.dumps(formatted_tools)
return json.dumps(formatted_tools, ensure_ascii=False)
def _convert_to_trajectory_format(self, messages: List[Dict[str, Any]], user_query: str, completed: bool) -> List[Dict[str, Any]]:
"""
@@ -251,7 +254,7 @@ class AIAgent:
"name": tool_call["function"]["name"],
"arguments": json.loads(tool_call["function"]["arguments"]) if isinstance(tool_call["function"]["arguments"], str) else tool_call["function"]["arguments"]
}
content += f"<tool_call>\n{json.dumps(tool_call_json)}\n</tool_call>\n"
content += f"<tool_call>\n{json.dumps(tool_call_json, ensure_ascii=False)}\n</tool_call>\n"
trajectory.append({
"from": "gpt",
@@ -278,7 +281,7 @@ class AIAgent:
"tool_call_id": tool_msg.get("tool_call_id", ""),
"name": msg["tool_calls"][len(tool_responses)]["function"]["name"] if len(tool_responses) < len(msg["tool_calls"]) else "unknown",
"content": tool_content
})
}, ensure_ascii=False)
tool_response += "\n</tool_response>"
tool_responses.append(tool_response)
j += 1
@@ -474,7 +477,10 @@ class AIAgent:
print(f"❌ Invalid JSON in tool call arguments: {e}")
function_args = {}
print(f" 📞 Tool {i}: {function_name}({list(function_args.keys())})")
# Preview tool call arguments
args_str = json.dumps(function_args, ensure_ascii=False)
args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
print(f" 📞 Tool {i}: {function_name}({list(function_args.keys())}) - {args_preview}")
tool_start_time = time.time()
@@ -483,19 +489,21 @@ class AIAgent:
tool_duration = time.time() - tool_start_time
result_preview = function_result[:200] if len(function_result) > 200 else function_result
if self.verbose_logging:
logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
logging.debug(f"Tool result preview: {result_preview}...")
# Add tool result to conversation
messages.append({
"role": "tool",
"content": function_result,
"tool_call_id": tool_call.id
})
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s")
# Preview tool response
response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
print(f" ✅ Tool {i} completed in {tool_duration:.2f}s - {response_preview}")
# Delay between tool calls
if self.tool_delay > 0 and i < len(assistant_message.tool_calls):
@@ -577,7 +585,7 @@ class AIAgent:
def main(
query: str = None,
model: str = "claude-opus-4-20250514",
model: str = "claude-opus-4-20250514",
api_key: str = None,
base_url: str = "https://api.anthropic.com/v1/",
max_turns: int = 10,
@@ -585,25 +593,27 @@ def main(
disabled_toolsets: str = None,
list_tools: bool = False,
save_trajectories: bool = False,
verbose: bool = False
verbose: bool = False,
log_prefix_chars: int = 20
):
"""
Main function for running the agent directly.
Args:
query (str): Natural language query for the agent. Defaults to Python 3.13 example.
model (str): Model name to use. Defaults to claude-opus-4-20250514.
api_key (str): API key for authentication. Uses ANTHROPIC_API_KEY env var if not provided.
base_url (str): Base URL for the model API. Defaults to https://api.anthropic.com/v1/
max_turns (int): Maximum number of API call iterations. Defaults to 10.
enabled_toolsets (str): Comma-separated list of toolsets to enable. Supports predefined
toolsets (e.g., "research", "development", "safe").
enabled_toolsets (str): Comma-separated list of toolsets to enable. Supports predefined
toolsets (e.g., "research", "development", "safe").
Multiple toolsets can be combined: "web,vision"
disabled_toolsets (str): Comma-separated list of toolsets to disable (e.g., "terminal")
list_tools (bool): Just list available tools and exit
save_trajectories (bool): Save conversation trajectories to JSONL files. Defaults to False.
verbose (bool): Enable verbose logging for debugging. Defaults to False.
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses. Defaults to 20.
Toolset Examples:
- "research": Web search, extract, crawl + vision tools
"""
@@ -720,7 +730,8 @@ def main(
enabled_toolsets=enabled_toolsets_list,
disabled_toolsets=disabled_toolsets_list,
save_trajectories=save_trajectories,
verbose_logging=verbose
verbose_logging=verbose,
log_prefix_chars=log_prefix_chars
)
except RuntimeError as e:
print(f"❌ Failed to initialize agent: {e}")

View File

@@ -1,7 +1,7 @@
python batch_runner.py \
--dataset_file="hermes-agent-megascience-data/hermes_agent_megascience_eval.jsonl" \
--batch_size=10 \
--run_name="megascience_eval_glm4-6-fixedterminal" \
--run_name="megascience_eval_glm4-6-fixedterminal-2" \
--distribution="science" \
--model="z-ai/glm-4.6" \
--base_url="https://openrouter.ai/api/v1" \
@@ -9,4 +9,4 @@ python batch_runner.py \
--num_workers=5 \
--max_turns=30 \
--verbose \
--ephemeral_system_prompt="You have access to a variety of tools to help you solve scientific, math, and technology problems presented to you. You can use them in sequence and build off of the results of prior tools you've used results. Always use a tool if it can provide additional context, verify formulas, double check concepts and recent studies and understanding, doing all calculations, etc. You should only be confident in your own reasoning, knowledge, or calculations if you've exhaustively used all tools available to you to that can help you verify or validate your work."
--ephemeral_system_prompt="You have access to a variety of tools to help you solve scientific, math, and technology problems presented to you. You can use them in sequence and build off of the results of prior tools you've used results. Always use a tool if it can provide additional context, verify formulas, double check concepts and recent studies and understanding, doing all calculations, etc. You should only be confident in your own reasoning, knowledge, or calculations if you've exhaustively used all tools available to you to that can help you verify or validate your work. Always pip install any packages you need to use the python scripts you want to run."

20
safe_print.py Normal file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
"""Simple safe print that tries rich, falls back to regular print."""
try:
from rich import print as rich_print
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
def safe_print(*args, **kwargs):
"""Try rich.print, fall back to regular print if it fails."""
if RICH_AVAILABLE:
try:
rich_print(*args, **kwargs)
return
except Exception:
pass
# Fallback to regular print
print(*args, **kwargs)

View File

@@ -24,7 +24,7 @@ def create_test_dataset():
with open(test_file, 'w') as f:
for prompt in prompts:
f.write(json.dumps(prompt) + "\n")
f.write(json.dumps(prompt, ensure_ascii=False) + "\n")
print(f"✅ Created test dataset: {test_file}")
return test_file

View File

@@ -0,0 +1,424 @@
#!/usr/bin/env python3
"""
Test script to verify checkpoint behavior in batch_runner.py
This script simulates batch processing with intentional failures to test:
1. Whether checkpoints are saved incrementally during processing
2. Whether resume functionality works correctly after interruption
3. Whether data integrity is maintained across checkpoint cycles
Usage:
# Test current implementation
python tests/test_checkpoint_resumption.py --test_current
# Test after fix is applied
python tests/test_checkpoint_resumption.py --test_fixed
# Run full comparison
python tests/test_checkpoint_resumption.py --compare
"""
import json
import os
import shutil
import sys
import time
import signal
from pathlib import Path
from typing import List, Dict, Any
import traceback
# Add parent directory to path to import batch_runner
sys.path.insert(0, str(Path(__file__).parent.parent))
def create_test_dataset(num_prompts: int = 20) -> Path:
"""Create a small test dataset for checkpoint testing."""
test_data_dir = Path("tests/test_data")
test_data_dir.mkdir(parents=True, exist_ok=True)
dataset_file = test_data_dir / "checkpoint_test_dataset.jsonl"
with open(dataset_file, 'w', encoding='utf-8') as f:
for i in range(num_prompts):
entry = {
"prompt": f"Test prompt {i}: What is 2+2? Just answer briefly.",
"test_id": i
}
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
print(f"✅ Created test dataset: {dataset_file} ({num_prompts} prompts)")
return dataset_file
def monitor_checkpoint_during_run(checkpoint_file: Path, duration: int = 30) -> List[Dict[str, Any]]:
"""
Monitor checkpoint file during a batch run to see when it gets updated.
Args:
checkpoint_file: Path to checkpoint file to monitor
duration: How long to monitor (seconds)
Returns:
List of checkpoint snapshots with timestamps
"""
snapshots = []
start_time = time.time()
last_mtime = None
print(f"\n🔍 Monitoring checkpoint file: {checkpoint_file}")
print(f" Duration: {duration}s")
print("-" * 70)
while time.time() - start_time < duration:
if checkpoint_file.exists():
current_mtime = checkpoint_file.stat().st_mtime
# Check if file was modified
if last_mtime is None or current_mtime != last_mtime:
elapsed = time.time() - start_time
try:
with open(checkpoint_file, 'r') as f:
checkpoint_data = json.load(f)
snapshot = {
"elapsed_seconds": round(elapsed, 2),
"completed_count": len(checkpoint_data.get("completed_prompts", [])),
"completed_prompts": checkpoint_data.get("completed_prompts", [])[:5], # First 5 for display
"timestamp": checkpoint_data.get("last_updated")
}
snapshots.append(snapshot)
print(f"[{elapsed:6.2f}s] Checkpoint updated: {snapshot['completed_count']} prompts completed")
except Exception as e:
print(f"[{elapsed:6.2f}s] Error reading checkpoint: {e}")
last_mtime = current_mtime
else:
if len(snapshots) == 0:
print(f"[{time.time() - start_time:6.2f}s] Checkpoint file not yet created...")
time.sleep(0.5) # Check every 0.5 seconds
return snapshots
def test_current_implementation():
"""Test the current checkpoint implementation."""
print("\n" + "=" * 70)
print("TEST 1: Current Implementation - Checkpoint Timing")
print("=" * 70)
print("\n📝 Testing whether checkpoints are saved incrementally during run...")
# Setup
dataset_file = create_test_dataset(num_prompts=12)
run_name = "checkpoint_test_current"
output_dir = Path("data") / run_name
# Clean up any existing test data
if output_dir.exists():
shutil.rmtree(output_dir)
# Import here to avoid issues if module changes
from batch_runner import BatchRunner
checkpoint_file = output_dir / "checkpoint.json"
# Start monitoring in a separate process would be ideal, but for simplicity
# we'll just check before and after
print(f"\n▶️ Starting batch run...")
print(f" Dataset: {dataset_file}")
print(f" Batch size: 3 (4 batches total)")
print(f" Workers: 2")
print(f" Expected behavior: If incremental, checkpoint should update during run")
start_time = time.time()
try:
runner = BatchRunner(
dataset_file=str(dataset_file),
batch_size=3,
run_name=run_name,
distribution="default",
max_iterations=3, # Keep it short
model="claude-opus-4-20250514",
num_workers=2,
verbose=False
)
# Run with monitoring
import threading
snapshots = []
def monitor():
nonlocal snapshots
snapshots = monitor_checkpoint_during_run(checkpoint_file, duration=60)
monitor_thread = threading.Thread(target=monitor, daemon=True)
monitor_thread.start()
runner.run(resume=False)
monitor_thread.join(timeout=2)
except Exception as e:
print(f"❌ Error during run: {e}")
traceback.print_exc()
return False
elapsed = time.time() - start_time
# Analyze results
print("\n" + "=" * 70)
print("📊 TEST RESULTS")
print("=" * 70)
print(f"Total run time: {elapsed:.2f}s")
print(f"Checkpoint updates observed: {len(snapshots)}")
if len(snapshots) == 0:
print("\n❌ ISSUE: No checkpoint updates observed during run")
print(" This suggests checkpoints are only saved at the end")
return False
elif len(snapshots) == 1:
print("\n⚠️ WARNING: Only 1 checkpoint update (likely at the end)")
print(" This confirms the bug - no incremental checkpointing")
return False
else:
print(f"\n✅ GOOD: Multiple checkpoint updates ({len(snapshots)}) observed")
print(" Checkpointing appears to be incremental")
# Show timeline
print("\n📈 Checkpoint Timeline:")
for i, snapshot in enumerate(snapshots, 1):
print(f" {i}. [{snapshot['elapsed_seconds']:6.2f}s] "
f"{snapshot['completed_count']} prompts completed")
return True
def test_interruption_and_resume():
"""Test that resume actually works after interruption."""
print("\n" + "=" * 70)
print("TEST 2: Interruption and Resume")
print("=" * 70)
print("\n📝 Testing whether resume works after manual interruption...")
# Setup
dataset_file = create_test_dataset(num_prompts=15)
run_name = "checkpoint_test_resume"
output_dir = Path("data") / run_name
# Clean up any existing test data
if output_dir.exists():
shutil.rmtree(output_dir)
from batch_runner import BatchRunner
checkpoint_file = output_dir / "checkpoint.json"
print(f"\n▶️ Starting first run (will process 5 prompts, then simulate interruption)...")
try:
# Create a modified dataset with only first 5 prompts for initial run
temp_dataset = Path("tests/test_data/checkpoint_test_resume_partial.jsonl")
with open(dataset_file, 'r') as f:
lines = f.readlines()[:5]
with open(temp_dataset, 'w') as f:
f.writelines(lines)
runner = BatchRunner(
dataset_file=str(temp_dataset),
batch_size=2,
run_name=run_name,
distribution="default",
max_iterations=3,
model="claude-opus-4-20250514",
num_workers=1,
verbose=False
)
runner.run(resume=False)
# Check checkpoint after first run
if not checkpoint_file.exists():
print("❌ ERROR: Checkpoint file not created after first run")
return False
with open(checkpoint_file, 'r') as f:
checkpoint_data = json.load(f)
initial_completed = len(checkpoint_data.get("completed_prompts", []))
print(f"✅ First run completed: {initial_completed} prompts saved to checkpoint")
# Now try to resume with full dataset
print(f"\n▶️ Starting resume run with full dataset (15 prompts)...")
runner2 = BatchRunner(
dataset_file=str(dataset_file),
batch_size=2,
run_name=run_name,
distribution="default",
max_iterations=3,
model="claude-opus-4-20250514",
num_workers=1,
verbose=False
)
runner2.run(resume=True)
# Check final checkpoint
with open(checkpoint_file, 'r') as f:
final_checkpoint = json.load(f)
final_completed = len(final_checkpoint.get("completed_prompts", []))
print("\n" + "=" * 70)
print("📊 TEST RESULTS")
print("=" * 70)
print(f"Initial completed: {initial_completed}")
print(f"Final completed: {final_completed}")
print(f"Expected: 15")
if final_completed == 15:
print("\n✅ PASS: Resume successfully completed all prompts")
return True
else:
print(f"\n❌ FAIL: Expected 15 completed, got {final_completed}")
return False
except Exception as e:
print(f"❌ Error during test: {e}")
traceback.print_exc()
return False
def test_simulated_crash():
"""Test behavior when process crashes mid-execution."""
print("\n" + "=" * 70)
print("TEST 3: Simulated Crash During Execution")
print("=" * 70)
print("\n📝 This test would require running in a subprocess and killing it...")
print(" Skipping for safety - manual testing recommended")
return None
def print_test_plan():
"""Print the detailed test and fix plan."""
print("\n" + "=" * 70)
print("CHECKPOINT FIX - DETAILED PLAN")
print("=" * 70)
print("""
📋 PROBLEM SUMMARY
------------------
Current implementation uses pool.map() which blocks until ALL batches complete.
Checkpoint is only saved after all batches finish (line 558-559).
If process crashes during batch processing:
- All progress is lost
- Resume does nothing (no incremental checkpoint was saved)
📋 PROPOSED SOLUTION
--------------------
Replace pool.map() with pool.imap_unordered() to get results as they complete.
Save checkpoint after EACH batch completes using a multiprocessing Lock.
Key changes:
1. Use Manager().Lock() for thread-safe checkpoint writes
2. Replace pool.map() with pool.imap_unordered()
3. Update checkpoint after each batch result
4. Maintain backward compatibility with existing checkpoints
📋 IMPLEMENTATION STEPS
-----------------------
1. Add Manager and Lock initialization before Pool creation
2. Pass shared checkpoint data and lock to workers (via Manager)
3. Replace pool.map() with pool.imap_unordered()
4. In result loop: save checkpoint after each batch
5. Add error handling for checkpoint write failures
📋 RISKS & MITIGATIONS
----------------------
Risk: Checkpoint file corruption if two processes write simultaneously
→ Mitigation: Use multiprocessing.Lock() for exclusive access
Risk: Performance impact from frequent checkpoint writes
→ Mitigation: Checkpoint writes are fast (small JSON), negligible impact
Risk: Breaking existing runs that are already checkpointed
→ Mitigation: Maintain checkpoint format, only change timing
Risk: Bugs in multiprocessing lock/manager code
→ Mitigation: Thorough testing with this test script
📋 TESTING STRATEGY
-------------------
1. Run test_current_implementation() - Confirm bug exists
2. Apply fix to batch_runner.py
3. Run test_current_implementation() again - Should see incremental updates
4. Run test_interruption_and_resume() - Verify resume works
5. Manual test: Start run, kill process mid-batch, resume
📋 ROLLBACK PLAN
----------------
If issues arise:
1. Git revert the changes
2. Original code is working (just missing incremental checkpoint)
3. No data corruption risk - checkpoints are write-only
""")
def main(
test_current: bool = False,
test_resume: bool = False,
test_crash: bool = False,
compare: bool = False,
show_plan: bool = False
):
"""
Run checkpoint behavior tests.
Args:
test_current: Test current implementation checkpoint timing
test_resume: Test interruption and resume functionality
test_crash: Test simulated crash scenario (manual)
compare: Run all tests and compare
show_plan: Show detailed fix plan
"""
if show_plan or (not any([test_current, test_resume, test_crash, compare])):
print_test_plan()
return
results = {}
if test_current or compare:
results['current'] = test_current_implementation()
if test_resume or compare:
results['resume'] = test_interruption_and_resume()
if test_crash or compare:
results['crash'] = test_simulated_crash()
# Summary
if results:
print("\n" + "=" * 70)
print("OVERALL TEST SUMMARY")
print("=" * 70)
for test_name, result in results.items():
if result is None:
status = "⏭️ SKIPPED"
elif result:
status = "✅ PASS"
else:
status = "❌ FAIL"
print(f"{status} - {test_name}")
if __name__ == "__main__":
import fire
fire.Fire(main)

176
tests/test_nous_api_limits.py Executable file
View File

@@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
Test script to diagnose Nous API 400 errors with gemini-2.5-flash model.
This tests various content lengths and parameters to identify what causes failures.
"""
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Initialize the Nous API client
nous_client = AsyncOpenAI(
api_key=os.getenv("NOUS_API_KEY"),
base_url="https://inference-api.nousresearch.com/v1"
)
MODEL = "gemini-2.5-flash"
async def test_api_call(test_name: str, content_length: int, **kwargs):
"""Test an API call with specific parameters."""
print(f"\n{'='*60}")
print(f"Test: {test_name}")
print(f"Content length: {content_length:,} characters")
print(f"Additional params: {kwargs}")
print(f"{'='*60}")
# Generate test content
content = "A" * content_length
system_prompt = """You are an expert content analyst. Your job is to process web content and create a comprehensive yet concise summary that preserves all important information while dramatically reducing bulk.
Create a well-structured markdown summary that includes:
1. Key excerpts (quotes, code snippets, important facts) in their original format
2. Comprehensive summary of all other important information
3. Proper markdown formatting with headers, bullets, and emphasis
Your goal is to preserve ALL important information while reducing length. Never lose key facts, figures, insights, or actionable information. Make it scannable and well-organized."""
user_prompt = f"""Please process this web content and create a comprehensive markdown summary:
CONTENT TO PROCESS:
{content}
Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights."""
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
**kwargs
)
result = response.choices[0].message.content
print(f"✅ SUCCESS")
print(f" Response length: {len(result)} characters")
print(f" Model used: {response.model}")
print(f" Usage: {response.usage}")
return True
except Exception as e:
print(f"❌ FAILED: {str(e)}")
return False
async def main():
"""Run all tests."""
print("Testing Nous API with gemini-2.5-flash model")
print(f"API Key present: {'Yes' if os.getenv('NOUS_API_KEY') else 'No'}")
results = {}
# Test 1: Small content (should always work)
results['small'] = await test_api_call(
"Small content (5,000 chars)",
5000,
temperature=0.1,
max_tokens=4000
)
await asyncio.sleep(1)
# Test 2: Medium content (around what was failing)
results['medium'] = await test_api_call(
"Medium content (20,000 chars)",
20000,
temperature=0.1,
max_tokens=4000
)
await asyncio.sleep(1)
# Test 3: Large content (79,625 chars like the error)
results['large'] = await test_api_call(
"Large content (79,625 chars)",
79625,
temperature=0.1,
max_tokens=4000
)
await asyncio.sleep(1)
# Test 4: Very large content (100k chars)
results['very_large'] = await test_api_call(
"Very large content (100,000 chars)",
100000,
temperature=0.1,
max_tokens=4000
)
await asyncio.sleep(1)
# Test 5: Same as working case but different max_tokens
results['diff_max_tokens'] = await test_api_call(
"Medium content with higher max_tokens",
20000,
temperature=0.1,
max_tokens=8000
)
await asyncio.sleep(1)
# Test 6: No max_tokens specified
results['no_max_tokens'] = await test_api_call(
"Medium content without max_tokens",
20000,
temperature=0.1
)
await asyncio.sleep(1)
# Test 7: With actual web content (mixed characters)
mixed_content = """
This is a test of web content with various characters:
- Unicode: 你好世界 🌍
- Special chars: <>&"'
- Numbers: 123456789
- Markdown: **bold** _italic_ `code`
- URLs: https://example.com
""" * 1000 # Repeat to make it ~79k chars
print(f"\n{'='*60}")
print(f"Test: Mixed content (real-world scenario)")
print(f"Content length: {len(mixed_content):,} characters")
print(f"{'='*60}")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": "Summarize this content."},
{"role": "user", "content": mixed_content}
],
temperature=0.1,
max_tokens=4000
)
print(f"✅ SUCCESS")
results['mixed_content'] = True
except Exception as e:
print(f"❌ FAILED: {str(e)}")
results['mixed_content'] = False
# Summary
print(f"\n{'='*60}")
print("SUMMARY OF RESULTS:")
print(f"{'='*60}")
for test, passed in results.items():
status = "✅ PASS" if passed else "❌ FAIL"
print(f"{test:20s}: {status}")
passed = sum(results.values())
total = len(results)
print(f"\nTotal: {passed}/{total} tests passed")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Test to understand the pattern of failures - it's not about content length!
"""
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv
load_dotenv()
nous_client = AsyncOpenAI(
api_key=os.getenv("NOUS_API_KEY"),
base_url="https://inference-api.nousresearch.com/v1"
)
MODEL = "gemini-2.5-flash"
async def quick_test(description: str, content: str, **kwargs):
"""Quick API test."""
print(f"\n{description} ({len(content):,} chars)...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": "Summarize this."},
{"role": "user", "content": content}
],
**kwargs
)
print(f"✅ SUCCESS")
return True
except Exception as e:
print(f"❌ FAILED: {str(e)[:80]}")
return False
async def main():
print("Testing different content types and parameters...")
# Theory 1: Repeated characters trigger validation
print("\n" + "="*60)
print("THEORY 1: Repeated characters")
print("="*60)
await quick_test("Repeated 'A's (5k)", "A" * 5000, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("Repeated 'A's (79k)", "A" * 79625, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("Varied text (5k)", "Test content. " * 400, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("Varied text (79k)", "Test content with variety. " * 3000, temperature=0.1, max_tokens=4000)
# Theory 2: max_tokens parameter
print("\n" + "="*60)
print("THEORY 2: max_tokens parameter")
print("="*60)
content = "Test " * 4000 # 20k chars
await quick_test("max_tokens=4000", content, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("max_tokens=8000", content, temperature=0.1, max_tokens=8000)
await asyncio.sleep(0.5)
await quick_test("max_tokens=2000", content, temperature=0.1, max_tokens=2000)
await asyncio.sleep(0.5)
await quick_test("No max_tokens", content, temperature=0.1)
# Theory 3: Temperature parameter
print("\n" + "="*60)
print("THEORY 3: Temperature parameter")
print("="*60)
content = "Test " * 4000
await quick_test("temperature=0.1", content, temperature=0.1, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("temperature=0.0", content, temperature=0.0, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("temperature=0.5", content, temperature=0.5, max_tokens=4000)
await asyncio.sleep(0.5)
await quick_test("No temperature", content, max_tokens=4000)
# Theory 4: System prompt impact
print("\n" + "="*60)
print("THEORY 4: System prompt length")
print("="*60)
short_system = "Summarize this."
long_system = """You are an expert content analyst. Your job is to process web content and create a comprehensive yet concise summary that preserves all important information while dramatically reducing bulk.
Create a well-structured markdown summary that includes:
1. Key excerpts (quotes, code snippets, important facts) in their original format
2. Comprehensive summary of all other important information
3. Proper markdown formatting with headers, bullets, and emphasis
Your goal is to preserve ALL important information while reducing length."""
content = "A" * 5000
print(f"\nShort system prompt...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": short_system},
{"role": "user", "content": content}
],
temperature=0.1,
max_tokens=4000
)
print(f"✅ SUCCESS")
except Exception as e:
print(f"❌ FAILED")
await asyncio.sleep(0.5)
print(f"Long system prompt...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": long_system},
{"role": "user", "content": content}
],
temperature=0.1,
max_tokens=4000
)
print(f"✅ SUCCESS")
except Exception as e:
print(f"❌ FAILED")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,109 @@
#!/usr/bin/env python3
"""
Test to confirm: temperature < 0.3 causes failures on Nous API
"""
import asyncio
import os
from openai import AsyncOpenAI
from dotenv import load_dotenv
load_dotenv()
nous_client = AsyncOpenAI(
api_key=os.getenv("NOUS_API_KEY"),
base_url="https://inference-api.nousresearch.com/v1"
)
MODEL = "gemini-2.5-flash"
async def test_temp(temp_value):
"""Test a specific temperature value."""
content = "Test content. " * 1000 # 14k chars
print(f"Testing temperature={temp_value}...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": "Summarize this content."},
{"role": "user", "content": content}
],
temperature=temp_value,
max_tokens=4000
)
print(f"✅ SUCCESS")
return True
except Exception as e:
print(f"❌ FAILED")
return False
async def main():
print("Testing temperature threshold for Nous API...")
print("="*60)
temps = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 1.0]
for temp in temps:
await test_temp(temp)
await asyncio.sleep(0.5)
print("="*60)
print("\nNow testing with ACTUAL web_tools.py content and parameters:")
print("="*60)
# Simulate the actual web_tools.py call
system_prompt = """You are an expert content analyst. Your job is to process web content and create a comprehensive yet concise summary that preserves all important information while dramatically reducing bulk.
Create a well-structured markdown summary that includes:
1. Key excerpts (quotes, code snippets, important facts) in their original format
2. Comprehensive summary of all other important information
3. Proper markdown formatting with headers, bullets, and emphasis
Your goal is to preserve ALL important information while reducing length. Never lose key facts, figures, insights, or actionable information. Make it scannable and well-organized."""
content = "Sample web page content. " * 3000 # ~75k chars like the real failures
user_prompt = f"""Please process this web content and create a comprehensive markdown summary:
CONTENT TO PROCESS:
{content}
Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights."""
print(f"\nActual web_tools call (temp=0.1, {len(content):,} chars)...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1,
max_tokens=4000
)
print(f"✅ SUCCESS")
except:
print(f"❌ FAILED")
await asyncio.sleep(0.5)
print(f"Same call but with temp=0.3...", end=" ")
try:
response = await nous_client.chat.completions.create(
model=MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.3,
max_tokens=4000
)
print(f"✅ SUCCESS")
except:
print(f"❌ FAILED")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -583,7 +583,7 @@ class WebToolsTester:
try:
with open(filename, 'w') as f:
json.dump(results, f, indent=2)
json.dump(results, f, indent=2, ensure_ascii=False)
print_info(f"Test results saved to: {filename}")
except Exception as e:
print_warning(f"Failed to save results: {e}")

View File

@@ -414,7 +414,7 @@ async def image_generate_tool(
_log_debug_call("image_generate_tool", debug_call_data)
_save_debug_log()
return json.dumps(response_data, indent=2)
return json.dumps(response_data, indent=2, ensure_ascii=False)
except Exception as e:
generation_time = (datetime.datetime.now() - start_time).total_seconds()
@@ -432,7 +432,7 @@ async def image_generate_tool(
_log_debug_call("image_generate_tool", debug_call_data)
_save_debug_log()
return json.dumps(response_data, indent=2)
return json.dumps(response_data, indent=2, ensure_ascii=False)
def check_fal_api_key() -> bool:

View File

@@ -410,7 +410,7 @@ async def mixture_of_agents_tool(
_log_debug_call("mixture_of_agents_tool", debug_call_data)
_save_debug_log()
return json.dumps(result, indent=2)
return json.dumps(result, indent=2, ensure_ascii=False)
except Exception as e:
error_msg = f"Error in MoA processing: {str(e)}"
@@ -436,7 +436,7 @@ async def mixture_of_agents_tool(
_log_debug_call("mixture_of_agents_tool", debug_call_data)
_save_debug_log()
return json.dumps(result, indent=2)
return json.dumps(result, indent=2, ensure_ascii=False)
def check_nous_api_key() -> bool:

View File

@@ -272,8 +272,10 @@ def terminal_tool(
"output": "",
"screen": "",
"exit_code": -1,
"error": f"Terminal tool is disabled due to import error: {import_error}"
})
"error": f"Terminal tool is disabled due to import error: {import_error}",
"status": "disabled"
}, ensure_ascii=False)
# Get configuration from environment
vm_lifetime_seconds = int(os.getenv("HECATE_VM_LIFETIME_SECONDS", "300"))
@@ -287,8 +289,9 @@ def terminal_tool(
"output": "",
"screen": "",
"exit_code": -1,
"error": "MORPH_API_KEY environment variable not set"
})
"error": "MORPH_API_KEY environment variable not set",
"status": "disabled"
}, ensure_ascii=False)
# Use task_id to isolate VMs between concurrent tasks
# If no task_id provided, use "default" for backward compatibility
@@ -364,15 +367,16 @@ def terminal_tool(
"error": result.get("error")
}
return json.dumps(formatted_result)
return json.dumps(formatted_result, ensure_ascii=False)
except Exception as e:
return json.dumps({
"output": "",
"screen": "",
"exit_code": -1,
"error": f"Failed to execute terminal command: {str(e)}"
})
"error": f"Failed to execute terminal command: {str(e)}",
"status": "error"
}, ensure_ascii=False)
def check_hecate_requirements() -> bool:
"""

View File

@@ -346,7 +346,7 @@ async def vision_analyze_tool(
_log_debug_call("vision_analyze_tool", debug_call_data)
_save_debug_log()
return json.dumps(result, indent=2)
return json.dumps(result, indent=2, ensure_ascii=False)
except Exception as e:
error_msg = f"Error analyzing image: {str(e)}"
@@ -362,7 +362,7 @@ async def vision_analyze_tool(
_log_debug_call("vision_analyze_tool", debug_call_data)
_save_debug_log()
return json.dumps(result, indent=2)
return json.dumps(result, indent=2, ensure_ascii=False)
finally:
# Clean up temporary image file

View File

@@ -183,16 +183,33 @@ Your goal is to preserve ALL important information while reducing length. Never
Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights."""
# Call the LLM asynchronously
response = await nous_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1, # Low temperature for consistent extraction
max_tokens=4000 # Generous limit for comprehensive processing
)
# Call the LLM asynchronously with retry logic for flaky API
max_retries = 3
retry_delay = 2 # Start with 2 seconds
last_error = None
for attempt in range(max_retries):
try:
response = await nous_client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.1, # Low temperature for consistent extraction
max_tokens=4000 # Generous limit for comprehensive processing
)
break # Success, exit retry loop
except Exception as api_error:
last_error = api_error
if attempt < max_retries - 1:
print(f"⚠️ LLM API call failed (attempt {attempt + 1}/{max_retries}): {str(api_error)[:100]}")
print(f" Retrying in {retry_delay}s...")
await asyncio.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff: 2s, 4s, 8s
else:
# All retries exhausted
raise last_error
# Get the markdown response directly
processed_content = response.choices[0].message.content.strip()
@@ -344,7 +361,7 @@ def web_search_tool(query: str, limit: int = 5) -> str:
debug_call_data["results_count"] = results_count
# Convert to JSON
result_json = json.dumps(response_data, indent=2)
result_json = json.dumps(response_data, indent=2, ensure_ascii=False)
debug_call_data["final_response_size"] = len(result_json)
@@ -362,7 +379,7 @@ def web_search_tool(query: str, limit: int = 5) -> str:
_log_debug_call("web_search_tool", debug_call_data)
_save_debug_log()
return json.dumps({"error": error_msg})
return json.dumps({"error": error_msg}, ensure_ascii=False)
async def web_extract_tool(
@@ -575,18 +592,20 @@ async def web_extract_tool(
"title": r.get("title", ""),
"content": r.get("content", ""),
"error": r.get("error"),
**({"llm_model": model} if use_llm_processing else {})
}
for r in response.get("results", [])
]
trimmed_response = {"results": trimmed_results}
# Include model name used for summarization when LLM processing was requested
if use_llm_processing:
trimmed_response["llm_model"] = model
if trimmed_response.get("results") == []:
result_json = json.dumps({"error": "Content was inaccessible or not found"}, ensure_ascii=False)
cleaned_result = clean_base64_images(result_json)
result_json = json.dumps(trimmed_response, indent=2)
# Clean base64 images from extracted content
cleaned_result = clean_base64_images(result_json)
else:
result_json = json.dumps(trimmed_response, indent=2, ensure_ascii=False)
cleaned_result = clean_base64_images(result_json)
debug_call_data["final_response_size"] = len(cleaned_result)
debug_call_data["processing_applied"].append("base64_image_removal")
@@ -605,7 +624,7 @@ async def web_extract_tool(
_log_debug_call("web_extract_tool", debug_call_data)
_save_debug_log()
return json.dumps({"error": error_msg})
return json.dumps({"error": error_msg}, ensure_ascii=False)
async def web_crawl_tool(
@@ -851,17 +870,13 @@ async def web_crawl_tool(
{
"title": r.get("title", ""),
"content": r.get("content", ""),
"error": r.get("error"),
**({"llm_model": model} if use_llm_processing else {})
"error": r.get("error")
}
for r in response.get("results", [])
]
trimmed_response = {"results": trimmed_results}
# Include model name used for summarization when LLM processing was requested
if use_llm_processing:
trimmed_response["llm_model"] = model
result_json = json.dumps(trimmed_response, indent=2)
result_json = json.dumps(trimmed_response, indent=2, ensure_ascii=False)
# Clean base64 images from crawled content
cleaned_result = clean_base64_images(result_json)
@@ -882,7 +897,7 @@ async def web_crawl_tool(
_log_debug_call("web_crawl_tool", debug_call_data)
_save_debug_log()
return json.dumps({"error": error_msg})
return json.dumps({"error": error_msg}, ensure_ascii=False)
# Convenience function to check if API key is available