Compare commits

..

1 Commits

Author SHA1 Message Date
hjc-puro
31c733383b add tracking for cluster failurse 2025-11-15 00:01:19 -05:00
14 changed files with 468 additions and 766 deletions

5
.gitignore vendored
View File

@@ -25,8 +25,3 @@ hermes-*/*
examples/
tests/quick_test_dataset.jsonl
tests/sample_dataset.jsonl
run_datagen_kimik2-thinking.sh
run_datagen_megascience_glm4-6.sh
run_datagen_sonnet.sh
source-data/*
run_datagen_megascience_glm4-6.sh

View File

@@ -1,55 +0,0 @@
# Agents
Agents can be viewed as an FSM using an LLM to generate inputs into the system that operates over a DAG.
What this really means is that the agent is just a function without memory that uses text inputs and outputs in a
defined order.
```python
def my_agent(*args, **kwargs) -> str:
# do whatever you want!
return "Hi I'm an agent!"
```
Now obviously, that's like saying water's wet, but we're going to be using that definition to inform our design of the
library, namely, that we should *not* store agent state outside the function call.
## The Agent Class
So we don't have state, why are we using a class?
Well, we want to initialize things, we want to have some configuration, and we want to have some helper functions.
Preferably all in a single place.
```python
class BaseAgent:
def agent_primitives(self) -> list[BaseAgent]:
# Returns a list of Agents that are utilized by this agent to generate inputs
# We use agent primitives here instead of subagents because these are going to be part
# of the message graph, not a subagent tool call.
raise NotImplementedError
def tools(self) -> list[BaseTool]:
# Returns a list of tools that the agent needs to run
raise NotImplementedError
def run(self, config, *args, **kwargs) -> ConversationGraph:
llm = get_llm(config)
tools = self.tools()
for agent in self.agent_primitives():
tools.extend(agent.tools())
tools = remove_duplicates(tools)
tools = initialize_tools(tools, config)
return self(llm, tools, config, *args, **kwargs)
@staticmethod
def __call__(self, llm, tools, config, *args, **kwargs) -> ConversationGraph:
# Returns a ConversationGraph that can be parsed to get the output of the agent
# Use w/e args/kwargs you want, as long as llm/tools/config are satisfied.
raise NotImplementedError
```
Doesn't seem too bad (I hope), it is a bit annoying that we don't initialize everything in the constructor, but
hopefully we all kinda like it :)

View File

@@ -1,14 +0,0 @@
# LLM Client
A quick wrapper over openai apis
## Responsibilities
- Transform "normal" chat/completions requests into graphs
- Translate graphs into LLM requests
- Keep a history of graphs parsed by it
- On Policy Data
- Deduplicating graphs, so we don't keep previous history as separate graphs
## How to use
Exactly the same as the openai api! Just with the additional support of graph inputs and outputs.

View File

@@ -1,114 +0,0 @@
# Message Graph
```mermaid
graph TD
%% Message nodes
SystemMsg["📋 System Message<br/>Role: System<br/>Content: Messages are nodes in a graph"]
UserMsg["👤 User Message<br/>Role: User<br/>Content: But messages aren't the only thing in the graph"]
subgraph PrevMessages["Previous Messages"]
PrevSystemMsg["📋 System Message<br/>Role: System<br/>Content: Edits are kept in the graph as context"]
PrevUserMsg["👤 User Message<br/>Role: User<br/>Content: So we can ensure they're immutable while keeping them editable"]
end
%% Chat Response as a subgraph
subgraph ChatResponseBox["💬 Chat Response"]
ChatMetadata["📊 Metadata<br/>Temp: 1.0<br/>..."]
ChatResponseText["📝 Response<br/>Hello, Here's a subagent call: &lt;tool&gt;subagent&lt;/tool&gt;"]
ChatContent["Content: Hello, Here's a subagent call..."]
end
%% Tool Response as a subgraph
subgraph ToolResponseBox["🔧 Tool Response"]
subgraph ToolMetadata["📊 Tool Metadata"]
ToolMetadataLength["Length: 3"]
subgraph ToolChat["💭 Subagent Chat"]
SubagentSystem["📋 System<br/>Content: Subagent call received"]
SubagentUser["👤 User<br/>Content: Process this request"]
SubagentAssistant["🤖 Assistant<br/>Content: Processing..."]
SubagentSystem --> SubagentUser
SubagentUser --> SubagentAssistant
end
end
ToolContent["Content: Subagent call output"]
end
%% Graph flow connections
SystemMsg --> UserMsg
PrevSystemMsg --> PrevUserMsg
PrevMessages -.-> UserMsg
UserMsg --> ChatResponseBox
ChatResponseBox --> ToolResponseBox
class SystemMsg,UserMsg messageNode
class ChatResponseBox responseNode
class ToolResponseBox responseNode
class ChatMetadata,ChatResponseText,ChatContent,ToolMetadata,ToolChat,ToolContent,ToolMetadataLength metadataNode
```
Messages should be a graph (DAG, specifically) of immutable elements.
## Why immutable elements?
We want to train on policy
- This means the context cannot change after we call a response.
## Why a graph?
Nodes and connections are a natural way to represent the flow of information in an agent conversation.
## Will this be annoying to deal with?
It shouldn't be! While there will be internal stuff that may look ???, for the interface, it should be as simple as your
normal context window edits, so `message_history[2]['content'] = my_edit`, but internally we'll deal with the recordkeeping
and how this ends up parsing into on policy training data, if requested.
## Edges
Edges are the connections between nodes, and there are two types we are concerned with:
- **Sequential edges**: These represent the flow of conversation, connecting messages in the order they were sent. For example, a user message followed by an assistant response.
- **Parallel edges**: These represent versioning, e.g. edit history, context squishing, etc.
We, however, are only concerned about parallel edges when we break the prefix, and ignore any other parallel edges.
## So what does this look like in practice?
```python
import copy
class MessageGraph:
def __init__(self):
self.messages = []
self.prev_graph = None
def append(self, message):
self.messages.append(message)
def __getitem__(self, index):
return self.messages[index]
def __setitem__(self, key, value):
# check if an assistant message is after this indx
needs_new_graph = False
first_idx = -1
for i in range(key, len(self.messages)):
if (i == key) and (value['role'] == 'assistant') and (value['content'] == self.messages[i]['content']):
# no op
return
needs_new_graph = needs_new_graph or (self.messages[i]['role'] == 'assistant')
if needs_new_graph and first_idx == -1:
first_idx = i
if needs_new_graph:
self.prev_graph = copy.deepcopy(self)
self.messages[key] = value
def __len__(self):
return len(self.messages)
def __eq__(self, other):
return "\n\n".join(f"{msg['role']}: {msg['content']}" for msg in self) == "\n\n".join(
f"{msg['role']}: {msg['content']}" for msg in other)
# in use
messages = MessageGraph()
messages.append({'role': 'system', 'content': 'Hello, I am a system message'})
messages[0] = {'role': 'user', 'content': 'Hello, I am a user message'}
```

View File

@@ -1,16 +0,0 @@
# Tools
Not much on this, yet. Tools are just a stateful wrapper around a function, so we can do things like:
- Keep a docker container running
- Keep a game online
```python
class BaseTool:
def definitions(self) -> List[Dict[str, Any]]:
# OpenAI API compatible definitions
raise NotImplementedError
def __call__(self, *args, **kwargs) -> Dict[str, Any]:
# Returns at minimum {'role': 'tool', 'content': '...'}
raise NotImplementedError
```

View File

@@ -9,15 +9,21 @@ across multiple prompts from a dataset. It includes:
- Checkpointing for fault tolerance and resumption
- Trajectory saving in the proper format (from/value pairs)
- Tool usage statistics aggregation across all batches
- Cluster failure detection and graceful shutdown (morph, firecrawl, API errors)
- Configurable failure thresholds with automatic data consolidation
Usage:
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run
# Resume an interrupted run
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run --resume
# Use a specific toolset distribution
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run --distribution=image_gen
# Configure tool failure thresholds
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run \\
--max_tool_failures=20 --max_tool_failure_rate=0.3
"""
import json
@@ -29,22 +35,94 @@ from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from multiprocessing import Pool, Manager, Lock
import traceback
import re
import fire
from run_agent import AIAgent
from toolset_distributions import (
get_distribution,
list_distributions,
get_distribution,
list_distributions,
sample_toolsets_from_distribution,
validate_distribution
)
from safe_print import safe_print
# Global configuration for worker processes
_WORKER_CONFIG = {}
def _extract_tool_errors_from_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Extract tool errors from message history with tool names.
Args:
messages (List[Dict]): Message history
Returns:
List[Dict]: List of tool errors with tool name, error message, and context
"""
tool_errors = []
tool_calls_map = {} # Map tool_call_id to tool name
for msg in messages:
# Track tool calls from assistant messages
if msg["role"] == "assistant" and "tool_calls" in msg and msg["tool_calls"]:
for tool_call in msg["tool_calls"]:
tool_name = tool_call["function"]["name"]
tool_call_id = tool_call["id"]
tool_calls_map[tool_call_id] = tool_name
# Check tool responses for errors
elif msg["role"] == "tool":
tool_call_id = msg.get("tool_call_id", "")
content = msg.get("content", "")
# Determine if tool call had an error
has_error = False
error_msg = None
try:
content_json = json.loads(content) if isinstance(content, str) else content
if isinstance(content_json, dict):
# Check if error field exists AND has a non-null value
if "error" in content_json and content_json["error"] is not None:
has_error = True
error_msg = str(content_json["error"])
# Special handling for terminal tool responses
if "content" in content_json and isinstance(content_json["content"], dict):
inner_content = content_json["content"]
if inner_content.get("error") is not None or inner_content.get("exit_code", 0) != 0:
has_error = True
error_msg = inner_content.get("error") or f"Exit code: {inner_content.get('exit_code')}"
# Check for "success": false pattern
if content_json.get("success") is False:
has_error = True
if not error_msg:
error_msg = str(content_json.get("message", content_json.get("error", "Unknown error")))
except:
# If not JSON, check if content explicitly states an error
if content.strip().lower().startswith("error:"):
has_error = True
error_msg = content.strip()
# Record error if found
if has_error and tool_call_id in tool_calls_map:
tool_name = tool_calls_map[tool_call_id]
tool_errors.append({
"tool_name": tool_name,
"error_message": error_msg or "Unknown error",
"full_content": content[:500] # Keep first 500 chars of full response
})
return tool_errors
def _extract_tool_stats(messages: List[Dict[str, Any]]) -> Dict[str, Dict[str, int]]:
"""
Extract tool usage statistics from message history.
@@ -98,9 +176,10 @@ def _extract_tool_stats(messages: List[Dict[str, Any]]) -> Dict[str, Dict[str, i
# Terminal wraps its response in a "content" field
if "content" in content_json and isinstance(content_json["content"], dict):
inner_content = content_json["content"]
# Check for actual error (non-null error field)
# Note: non-zero exit codes are not failures - the model can self-correct
if inner_content.get("error") is not None:
# Check for actual error (non-null error field or non-zero exit code)
has_error = (inner_content.get("error") is not None or
inner_content.get("exit_code", 0) != 0)
if has_error:
is_success = False
# Check for "success": false pattern used by some tools
@@ -169,22 +248,26 @@ def _process_single_prompt(
# Run the agent with task_id to ensure each task gets its own isolated VM
result = agent.run_conversation(prompt, task_id=f"task_{prompt_index}")
# Extract tool usage statistics
tool_stats = _extract_tool_stats(result["messages"])
# Extract tool errors from conversation
tool_errors = _extract_tool_errors_from_messages(result["messages"])
# Convert to trajectory format (using existing method)
trajectory = agent._convert_to_trajectory_format(
result["messages"],
prompt,
result["completed"]
)
return {
"success": True,
"prompt_index": prompt_index,
"trajectory": trajectory,
"tool_stats": tool_stats,
"tool_errors": tool_errors,
"completed": result["completed"],
"api_calls": result["api_calls"],
"toolsets_used": selected_toolsets,
@@ -196,14 +279,18 @@ def _process_single_prompt(
}
except Exception as e:
print(f"❌ Error processing prompt {prompt_index}: {e}")
error_msg = str(e)
tb = traceback.format_exc()
safe_print(f"[bold red]❌ Error processing prompt {prompt_index}:[/bold red] {error_msg}")
if config.get("verbose"):
traceback.print_exc()
safe_print(tb)
return {
"success": False,
"prompt_index": prompt_index,
"error": str(e),
"error": error_msg,
"traceback": tb,
"tool_errors": [],
"trajectory": None,
"tool_stats": {},
"toolsets_used": [],
@@ -253,7 +340,9 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
# Initialize aggregated stats for this batch
batch_tool_stats = {}
completed_in_batch = []
all_tool_errors = [] # Track all tool errors in this batch
exception_errors = [] # Track top-level exceptions
# Process each prompt sequentially in this batch
for prompt_index, prompt_data in prompts_to_process:
# Process the prompt
@@ -263,7 +352,26 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
batch_num,
config
)
# Track tool errors from the conversation
if result.get("tool_errors"):
for tool_error in result["tool_errors"]:
all_tool_errors.append({
"prompt_index": prompt_index,
"tool_name": tool_error["tool_name"],
"error_message": tool_error["error_message"],
"full_content": tool_error.get("full_content", "")
})
# Track top-level exceptions (not tool errors)
if not result["success"]:
exception_errors.append({
"prompt_index": prompt_index,
"error": result.get("error", "Unknown error"),
"traceback": result.get("traceback", "")
})
safe_print(f"[bold red]❌ Exception in prompt {prompt_index}:[/bold red] {result.get('error', '')[:100]}")
# Save trajectory if successful
if result["success"] and result["trajectory"]:
trajectory_entry = {
@@ -274,7 +382,7 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
"api_calls": result["api_calls"],
"toolsets_used": result["toolsets_used"]
}
# Append to batch output file
with open(batch_output_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(trajectory_entry, ensure_ascii=False) + "\n")
@@ -302,7 +410,9 @@ def _process_batch_worker(args: Tuple) -> Dict[str, Any]:
"processed": len(prompts_to_process),
"skipped": len(batch_data) - len(prompts_to_process),
"tool_stats": batch_tool_stats,
"completed_prompts": completed_in_batch
"completed_prompts": completed_in_batch,
"tool_errors": all_tool_errors,
"exception_errors": exception_errors
}
@@ -325,6 +435,9 @@ class BatchRunner:
verbose: bool = False,
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
max_tool_failures: int = 10,
max_tool_failure_rate: float = 0.5,
keep_recent_errors: int = 5,
):
"""
Initialize the batch runner.
@@ -342,6 +455,9 @@ class BatchRunner:
verbose (bool): Enable verbose logging
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 20)
max_tool_failures (int): Maximum number of tool failures before stopping (default: 10)
max_tool_failure_rate (float): Maximum tool failure rate (0.0-1.0) before stopping (default: 0.5)
keep_recent_errors (int): Number of recent errors to keep per tool (default: 5)
"""
self.dataset_file = Path(dataset_file)
self.batch_size = batch_size
@@ -355,6 +471,9 @@ class BatchRunner:
self.verbose = verbose
self.ephemeral_system_prompt = ephemeral_system_prompt
self.log_prefix_chars = log_prefix_chars
self.max_tool_failures = max_tool_failures
self.max_tool_failure_rate = max_tool_failure_rate
self.keep_recent_errors = keep_recent_errors
# Validate distribution
if not validate_distribution(distribution):
@@ -376,17 +495,21 @@ class BatchRunner:
# Create batches
self.batches = self._create_batches()
print(f"📊 Batch Runner Initialized")
print(f" Dataset: {self.dataset_file} ({len(self.dataset)} prompts)")
print(f" Batch size: {self.batch_size}")
print(f" Total batches: {len(self.batches)}")
print(f" Run name: {self.run_name}")
print(f" Distribution: {self.distribution}")
print(f" Output directory: {self.output_dir}")
print(f" Workers: {self.num_workers}")
safe_print("[bold cyan]📊 Batch Runner Initialized[/bold cyan]")
safe_print(f" Dataset: {self.dataset_file} ({len(self.dataset)} prompts)")
safe_print(f" Batch size: {self.batch_size}")
safe_print(f" Total batches: {len(self.batches)}")
safe_print(f" Run name: {self.run_name}")
safe_print(f" Distribution: {self.distribution}")
safe_print(f" Output directory: {self.output_dir}")
safe_print(f" Workers: {self.num_workers}")
safe_print(f" [yellow]Tool failure limits:[/yellow]")
safe_print(f" Max failures: {self.max_tool_failures}")
safe_print(f" Max failure rate: {self.max_tool_failure_rate:.1%}")
safe_print(f" Keep recent errors: {self.keep_recent_errors}")
if self.ephemeral_system_prompt:
prompt_preview = self.ephemeral_system_prompt[:60] + "..." if len(self.ephemeral_system_prompt) > 60 else self.ephemeral_system_prompt
print(f" 🔒 Ephemeral system prompt: '{prompt_preview}'")
safe_print(f" 🔒 Ephemeral system prompt: '{prompt_preview}'")
def _load_dataset(self) -> List[Dict[str, Any]]:
"""
@@ -464,13 +587,13 @@ class BatchRunner:
def _save_checkpoint(self, checkpoint_data: Dict[str, Any], lock: Optional[Lock] = None):
"""
Save checkpoint data.
Args:
checkpoint_data (Dict): Checkpoint data to save
lock (Lock): Optional lock for thread-safe access
"""
checkpoint_data["last_updated"] = datetime.now().isoformat()
if lock:
with lock:
with open(self.checkpoint_file, 'w', encoding='utf-8') as f:
@@ -478,6 +601,69 @@ class BatchRunner:
else:
with open(self.checkpoint_file, 'w', encoding='utf-8') as f:
json.dump(checkpoint_data, f, indent=2, ensure_ascii=False)
def _consolidate_data(self, num_batches: int, tool_stats: Dict[str, Dict[str, int]],
start_time: float, tool_errors_by_tool: Dict[str, List[Dict]],
exception_errors: List[Dict], early_exit: bool = False, exit_reason: str = None):
"""
Consolidate batch data into trajectories.jsonl and save statistics.
Args:
num_batches (int): Number of batches processed
tool_stats (Dict): Aggregated tool statistics
start_time (float): Start time of the run
tool_errors_by_tool (Dict): Tool errors grouped by tool name with k most recent
exception_errors (List): Top-level exceptions
early_exit (bool): Whether this is an early exit
exit_reason (str): Reason for early exit
"""
# Combine all batch files into a single trajectories.jsonl file
combined_file = self.output_dir / "trajectories.jsonl"
safe_print(f"\n[cyan]📦 Combining batch files into {combined_file.name}...[/cyan]")
entries_written = 0
with open(combined_file, 'w', encoding='utf-8') as outfile:
for batch_num in range(num_batches):
batch_file = self.output_dir / f"batch_{batch_num}.jsonl"
if batch_file.exists():
with open(batch_file, 'r', encoding='utf-8') as infile:
for line in infile:
outfile.write(line)
entries_written += 1
safe_print(f"[green]✅ Combined {num_batches} batch files into trajectories.jsonl ({entries_written} entries)[/green]")
# Calculate success rates for tool stats
for tool_name in tool_stats:
stats = tool_stats[tool_name]
total_calls = stats["success"] + stats["failure"]
if total_calls > 0:
stats["success_rate"] = round(stats["success"] / total_calls * 100, 2)
stats["failure_rate"] = round(stats["failure"] / total_calls * 100, 2)
else:
stats["success_rate"] = 0.0
stats["failure_rate"] = 0.0
# Save final statistics
final_stats = {
"run_name": self.run_name,
"distribution": self.distribution,
"total_prompts": len(self.dataset),
"total_batches": len(self.batches),
"batches_processed": num_batches,
"batch_size": self.batch_size,
"model": self.model,
"completed_at": datetime.now().isoformat(),
"duration_seconds": round(time.time() - start_time, 2),
"early_exit": early_exit,
"exit_reason": exit_reason,
"tool_errors": tool_errors_by_tool,
"exception_errors": exception_errors[:self.keep_recent_errors], # Keep k most recent
"tool_statistics": tool_stats
}
with open(self.stats_file, 'w', encoding='utf-8') as f:
json.dump(final_stats, f, indent=2, ensure_ascii=False)
def run(self, resume: bool = False):
@@ -519,9 +705,16 @@ class BatchRunner:
# Aggregate statistics across all batches
total_tool_stats = {}
tool_errors_by_tool = {} # {tool_name: [list of k most recent errors]}
all_exception_errors = []
all_completed_prompts = list(completed_prompts_set)
total_processed = len(completed_prompts_set)
total_tool_errors = 0
early_exit = False
exit_reason = None
start_time = time.time()
# Process batches in parallel
with Pool(processes=self.num_workers) as pool:
# Create tasks for each batch
@@ -535,84 +728,147 @@ class BatchRunner:
)
for batch_num, batch_data in enumerate(self.batches)
]
# Use map to process batches in parallel
results = pool.map(_process_batch_worker, tasks)
# Aggregate all batch statistics and update checkpoint
all_completed_prompts = list(completed_prompts_set)
for batch_result in results:
# Add newly completed prompts
all_completed_prompts.extend(batch_result.get("completed_prompts", []))
# Aggregate tool stats
for tool_name, stats in batch_result.get("tool_stats", {}).items():
if tool_name not in total_tool_stats:
total_tool_stats[tool_name] = {
"count": 0,
"success": 0,
"failure": 0
}
total_tool_stats[tool_name]["count"] += stats["count"]
total_tool_stats[tool_name]["success"] += stats["success"]
total_tool_stats[tool_name]["failure"] += stats["failure"]
# Process batches and check tool failure threshold after each batch
for batch_num, task in enumerate(tasks):
# Process single batch
result = pool.apply(_process_batch_worker, (task,))
# Update statistics
all_completed_prompts.extend(result.get("completed_prompts", []))
total_processed += result.get("processed", 0)
# Aggregate tool stats
for tool_name, stats in result.get("tool_stats", {}).items():
if tool_name not in total_tool_stats:
total_tool_stats[tool_name] = {
"count": 0,
"success": 0,
"failure": 0
}
total_tool_stats[tool_name]["count"] += stats["count"]
total_tool_stats[tool_name]["success"] += stats["success"]
total_tool_stats[tool_name]["failure"] += stats["failure"]
# Aggregate tool errors (keep k most recent per tool)
for tool_error in result.get("tool_errors", []):
tool_name = tool_error["tool_name"]
if tool_name not in tool_errors_by_tool:
tool_errors_by_tool[tool_name] = []
# Add error and keep only k most recent
tool_errors_by_tool[tool_name].append(tool_error)
if len(tool_errors_by_tool[tool_name]) > self.keep_recent_errors:
tool_errors_by_tool[tool_name] = tool_errors_by_tool[tool_name][-self.keep_recent_errors:]
total_tool_errors += 1
# Track exception errors
all_exception_errors.extend(result.get("exception_errors", []))
# Check tool failure thresholds
if total_processed > 0:
tool_failure_rate = total_tool_errors / total_processed
# Check absolute count threshold
if total_tool_errors >= self.max_tool_failures:
early_exit = True
exit_reason = f"Exceeded maximum tool failures ({total_tool_errors}/{self.max_tool_failures})"
safe_print(f"\n[bold red]🛑 STOPPING: {exit_reason}[/bold red]")
break
# Check rate threshold
if tool_failure_rate >= self.max_tool_failure_rate:
early_exit = True
exit_reason = f"Exceeded tool failure rate ({tool_failure_rate:.2%} >= {self.max_tool_failure_rate:.2%})"
safe_print(f"\n[bold red]🛑 STOPPING: {exit_reason}[/bold red]")
break
# Update checkpoint after each batch
checkpoint_data["completed_prompts"] = all_completed_prompts
self._save_checkpoint(checkpoint_data)
# Save final checkpoint
checkpoint_data["completed_prompts"] = all_completed_prompts
self._save_checkpoint(checkpoint_data)
# Calculate success rates
for tool_name in total_tool_stats:
stats = total_tool_stats[tool_name]
total_calls = stats["success"] + stats["failure"]
if total_calls > 0:
stats["success_rate"] = round(stats["success"] / total_calls * 100, 2)
stats["failure_rate"] = round(stats["failure"] / total_calls * 100, 2)
else:
stats["success_rate"] = 0.0
stats["failure_rate"] = 0.0
# Combine all batch files into a single trajectories.jsonl file
combined_file = self.output_dir / "trajectories.jsonl"
print(f"\n📦 Combining batch files into {combined_file.name}...")
with open(combined_file, 'w', encoding='utf-8') as outfile:
for batch_num in range(len(self.batches)):
batch_file = self.output_dir / f"batch_{batch_num}.jsonl"
if batch_file.exists():
with open(batch_file, 'r', encoding='utf-8') as infile:
for line in infile:
outfile.write(line)
print(f"✅ Combined {len(self.batches)} batch files into trajectories.jsonl")
# Save final statistics
final_stats = {
"run_name": self.run_name,
"distribution": self.distribution,
"total_prompts": len(self.dataset),
"total_batches": len(self.batches),
"batch_size": self.batch_size,
"model": self.model,
"completed_at": datetime.now().isoformat(),
"duration_seconds": round(time.time() - start_time, 2),
"tool_statistics": total_tool_stats
}
with open(self.stats_file, 'w', encoding='utf-8') as f:
json.dump(final_stats, f, indent=2, ensure_ascii=False)
# Consolidate data and save statistics
num_batches_processed = batch_num + 1 if early_exit else len(self.batches)
self._consolidate_data(
num_batches_processed,
total_tool_stats,
start_time,
tool_errors_by_tool,
all_exception_errors,
early_exit,
exit_reason
)
# Print summary
print("\n" + "=" * 70)
print("📊 BATCH PROCESSING COMPLETE")
print("=" * 70)
print(f"✅ Total prompts processed: {len(self.dataset)}")
print(f"✅ Total batches: {len(self.batches)}")
print(f"⏱️ Total duration: {round(time.time() - start_time, 2)}s")
print(f"\n📈 Tool Usage Statistics:")
print("-" * 70)
safe_print("\n" + "=" * 70)
if early_exit:
safe_print("[bold yellow]⚠️ BATCH PROCESSING STOPPED EARLY[/bold yellow]")
safe_print(f"[yellow]Reason: {exit_reason}[/yellow]")
else:
safe_print("[bold green]📊 BATCH PROCESSING COMPLETE[/bold green]")
safe_print("=" * 70)
safe_print(f"✅ Total prompts processed: {total_processed}")
safe_print(f"✅ Batches completed: {num_batches_processed}/{len(self.batches)}")
safe_print(f"⏱️ Total duration: {round(time.time() - start_time, 2)}s")
# Tool error summary
if tool_errors_by_tool:
total_errors = sum(len(errors) for errors in tool_errors_by_tool.values())
safe_print(f"\n[bold red]🚨 Tool Errors: {total_tool_errors} total ({len(tool_errors_by_tool)} tools)[/bold red]")
safe_print("[red]-[/red]" * 70)
# Sort tools by error count
sorted_tools = sorted(
tool_errors_by_tool.items(),
key=lambda x: len(x[1]),
reverse=True
)
for tool_name, errors in sorted_tools:
# Count unique error messages
unique_errors = {}
for error in errors:
error_msg = error["error_message"][:100] # Truncate for grouping
if error_msg not in unique_errors:
unique_errors[error_msg] = []
unique_errors[error_msg].append(error)
safe_print(f"\n [red]{tool_name}:[/red] {len(errors)} errors ({len(unique_errors)} unique)")
# Show up to 3 most recent unique error types
for idx, (error_msg, instances) in enumerate(list(unique_errors.items())[:3]):
error_preview = error_msg if len(error_msg) <= 100 else error_msg[:97] + "..."
safe_print(f" [{idx+1}] [dim]{error_preview}[/dim] (x{len(instances)})")
# Show one example with prompt index
example = instances[-1] # Most recent
safe_print(f" [dim]Prompt {example['prompt_index']}[/dim]")
if len(unique_errors) > 3:
safe_print(f" [dim]... and {len(unique_errors) - 3} more error types[/dim]")
tool_failure_rate = total_tool_errors / total_processed if total_processed > 0 else 0
safe_print(f"\n [red]Tool failure rate: {tool_failure_rate:.2%}[/red]")
# Exception errors
if all_exception_errors:
safe_print(f"\n[bold red]💥 Top-level Exceptions: {len(all_exception_errors)}[/bold red]")
safe_print("[red]-[/red]" * 70)
for error in all_exception_errors[:self.keep_recent_errors]:
error_preview = error["error"][:100]
if len(error["error"]) > 100:
error_preview += "..."
safe_print(f" Prompt {error['prompt_index']}: [dim]{error_preview}[/dim]")
safe_print(f"\n[cyan]📈 Tool Usage Statistics:[/cyan]")
safe_print("-" * 70)
if total_tool_stats:
# Sort by count descending
sorted_tools = sorted(
@@ -620,25 +876,30 @@ class BatchRunner:
key=lambda x: x[1]["count"],
reverse=True
)
print(f"{'Tool Name':<25} {'Count':<10} {'Success':<10} {'Failure':<10} {'Success Rate':<12}")
print("-" * 70)
safe_print(f"{'Tool Name':<25} {'Count':<10} {'Success':<10} {'Failure':<10} {'Success Rate':<12}")
safe_print("-" * 70)
for tool_name, stats in sorted_tools:
print(
safe_print(
f"{tool_name:<25} "
f"{stats['count']:<10} "
f"{stats['success']:<10} "
f"{stats['failure']:<10} "
f"{stats['success_rate']:.1f}%"
f"{stats.get('success_rate', 0):.1f}%"
)
else:
print("No tool calls were made during this run.")
print(f"\n💾 Results saved to: {self.output_dir}")
print(f" - Trajectories: trajectories.jsonl (combined)")
print(f" - Individual batches: batch_*.jsonl (for debugging)")
print(f" - Statistics: {self.stats_file.name}")
print(f" - Checkpoint: {self.checkpoint_file.name}")
safe_print("No tool calls were made during this run.")
safe_print(f"\n[cyan]💾 Results saved to:[/cyan] {self.output_dir}")
safe_print(f" - Trajectories: trajectories.jsonl (combined)")
safe_print(f" - Individual batches: batch_*.jsonl (for debugging)")
safe_print(f" - Statistics: {self.stats_file.name}")
safe_print(f" - Checkpoint: {self.checkpoint_file.name}")
if early_exit:
safe_print(f"\n[bold yellow] Run was stopped early due to tool failures.[/bold yellow]")
safe_print(f"[yellow] Check {self.stats_file.name} for detailed error information including tracebacks.[/yellow]")
safe_print(f"[yellow] You can resume this run later with --resume flag.[/yellow]")
def main(
@@ -656,6 +917,9 @@ def main(
list_distributions: bool = False,
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
max_tool_failures: int = 10,
max_tool_failure_rate: float = 0.5,
keep_recent_errors: int = 5,
):
"""
Run batch processing of agent prompts from a dataset.
@@ -675,7 +939,10 @@ def main(
list_distributions (bool): List available toolset distributions and exit
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 20)
max_tool_failures (int): Maximum number of tool failures before stopping (default: 10)
max_tool_failure_rate (float): Maximum tool failure rate (0.0-1.0) before stopping (default: 0.5)
keep_recent_errors (int): Number of recent errors to keep per tool for reporting (default: 5)
Examples:
# Basic usage
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run
@@ -689,7 +956,11 @@ def main(
# With ephemeral system prompt (not saved to dataset)
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run \\
--ephemeral_system_prompt="You are a helpful assistant focused on image generation."
# With custom tool failure thresholds
python batch_runner.py --dataset_file=data.jsonl --batch_size=10 --run_name=my_run \\
--max_tool_failures=20 --max_tool_failure_rate=0.3 --keep_recent_errors=10
# List available distributions
python batch_runner.py --list_distributions
"""
@@ -736,7 +1007,10 @@ def main(
num_workers=num_workers,
verbose=verbose,
ephemeral_system_prompt=ephemeral_system_prompt,
log_prefix_chars=log_prefix_chars
log_prefix_chars=log_prefix_chars,
max_tool_failures=max_tool_failures,
max_tool_failure_rate=max_tool_failure_rate,
keep_recent_errors=keep_recent_errors
)
runner.run(resume=resume)

View File

@@ -31,9 +31,7 @@ import asyncio
from typing import Dict, Any, List, Optional
from tools.web_tools import web_search_tool, web_extract_tool, web_crawl_tool, check_firecrawl_api_key
from tools.simple_terminal_tool import simple_terminal_tool, check_requirements as check_simple_terminal_requirements, SIMPLE_TERMINAL_TOOL_DESCRIPTION
# Keep old terminal tool for backwards compatibility if needed
# from tools.terminal_tool import terminal_tool, check_hecate_requirements, TERMINAL_TOOL_DESCRIPTION
from tools.terminal_tool import terminal_tool, check_hecate_requirements, TERMINAL_TOOL_DESCRIPTION
from tools.vision_tools import vision_analyze_tool, check_vision_requirements
from tools.mixture_of_agents_tool import mixture_of_agents_tool, check_moa_requirements
from tools.image_generation_tool import image_generate_tool, check_image_generation_requirements
@@ -113,7 +111,7 @@ def get_web_tool_definitions() -> List[Dict[str, Any]]:
def get_terminal_tool_definitions() -> List[Dict[str, Any]]:
"""
Get tool definitions for terminal tools in OpenAI's expected format.
Returns:
List[Dict]: List of terminal tool definitions compatible with OpenAI API
"""
@@ -122,7 +120,7 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]:
"type": "function",
"function": {
"name": "terminal",
"description": SIMPLE_TERMINAL_TOOL_DESCRIPTION,
"description": TERMINAL_TOOL_DESCRIPTION,
"parameters": {
"type": "object",
"properties": {
@@ -130,18 +128,28 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]:
"type": "string",
"description": "The command to execute on the VM"
},
"input_keys": {
"type": "string",
"description": "Keystrokes to send to the most recent interactive session (e.g., 'hello\\n' for typing hello + Enter). If no active session exists, this will be ignored."
},
"background": {
"type": "boolean",
"description": "Whether to run the command in the background (default: false)",
"default": False
},
"idle_threshold": {
"type": "number",
"description": "Seconds to wait for output before considering session idle (default: 5.0)",
"default": 5.0,
"minimum": 0.1
},
"timeout": {
"type": "integer",
"description": "Command timeout in seconds (optional)",
"minimum": 1
}
},
"required": ["command"]
"required": []
}
}
}
@@ -254,11 +262,11 @@ def get_all_tool_names() -> List[str]:
# Web tools
if check_firecrawl_api_key():
tool_names.extend(["web_search", "web_extract", "web_crawl"])
# Terminal tools
if check_simple_terminal_requirements():
# Terminal tools
if check_hecate_requirements():
tool_names.extend(["terminal"])
# Vision tools
if check_vision_requirements():
tool_names.extend(["vision_analyze"])
@@ -338,11 +346,11 @@ def get_tool_definitions(
if check_firecrawl_api_key():
for tool in get_web_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
if check_simple_terminal_requirements():
if check_hecate_requirements():
for tool in get_terminal_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
if check_vision_requirements():
for tool in get_vision_tool_definitions():
all_available_tools_map[tool["function"]["name"]] = tool
@@ -486,10 +494,12 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A
"""
if function_name == "terminal":
command = function_args.get("command")
input_keys = function_args.get("input_keys")
background = function_args.get("background", False)
idle_threshold = function_args.get("idle_threshold", 5.0)
timeout = function_args.get("timeout")
return simple_terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id)
return terminal_tool(command, input_keys, None, background, idle_threshold, timeout, task_id)
else:
return json.dumps({"error": f"Unknown terminal function: {function_name}"}, ensure_ascii=False)
@@ -671,10 +681,10 @@ def get_available_toolsets() -> Dict[str, Dict[str, Any]]:
"requirements": ["FIRECRAWL_API_KEY environment variable"]
},
"terminal_tools": {
"available": check_simple_terminal_requirements(),
"tools": ["simple_terminal_tool"],
"description": "Execute commands on secure Linux VMs without session persistence",
"requirements": ["MORPH_API_KEY environment variable"]
"available": check_hecate_requirements(),
"tools": ["terminal_tool"],
"description": "Execute commands with optional interactive session support on Linux VMs",
"requirements": ["MORPH_API_KEY environment variable", "hecate package"]
},
"vision_tools": {
"available": check_vision_requirements(),
@@ -701,13 +711,13 @@ def get_available_toolsets() -> Dict[str, Dict[str, Any]]:
def check_toolset_requirements() -> Dict[str, bool]:
"""
Check if all requirements for available toolsets are met.
Returns:
Dict: Status of each toolset's requirements
"""
return {
"web_tools": check_firecrawl_api_key(),
"terminal_tools": check_simple_terminal_requirements(),
"terminal_tools": check_hecate_requirements(),
"vision_tools": check_vision_requirements(),
"moa_tools": check_moa_requirements(),
"image_tools": check_image_generation_requirements()

View File

@@ -1,9 +1,6 @@
firecrawl-py
openai
fal-client
fire
git@github.com:NousResearch/hecate.git
tenacity
python-dotenv
fire
httpx
httpx

View File

@@ -388,7 +388,7 @@ class AIAgent:
while api_call_count < self.max_iterations:
api_call_count += 1
print(f"\n🔄 Making OpenAI-compatible API call #{api_call_count}...")
print(f"\n🔄 Making API call #{api_call_count}...")
# Log request details if verbose
if self.verbose_logging:
@@ -397,8 +397,8 @@ class AIAgent:
api_start_time = time.time()
retry_count = 0
max_retries = 6 # Increased to allow longer backoff periods
max_retries = 3
while retry_count <= max_retries:
try:
# Prepare messages for API call
@@ -407,30 +407,30 @@ class AIAgent:
if active_system_prompt:
# Insert system message at the beginning
api_messages = [{"role": "system", "content": active_system_prompt}] + api_messages
# Make API call with tools
response = self.client.chat.completions.create(
model=self.model,
messages=api_messages,
tools=self.tools if self.tools else None,
timeout=300.0 # 5 minute timeout for long-running agent tasks
timeout=60.0 # Add explicit timeout
)
api_duration = time.time() - api_start_time
print(f"⏱️ OpenAI-compatible API call completed in {api_duration:.2f}s")
print(f"⏱️ API call completed in {api_duration:.2f}s")
if self.verbose_logging:
logging.debug(f"API Response received - Usage: {response.usage if hasattr(response, 'usage') else 'N/A'}")
break # Success, exit retry loop
except Exception as api_error:
retry_count += 1
if retry_count > max_retries:
raise api_error
wait_time = min(2 ** retry_count, 60) # Exponential backoff: 2s, 4s, 8s, 16s, 32s, 60s, 60s
print(f"⚠️ OpenAI-compatible API call failed (attempt {retry_count}/{max_retries}): {str(api_error)[:100]}")
wait_time = min(2 ** retry_count, 10) # Exponential backoff, max 10s
print(f"⚠️ API call failed (attempt {retry_count}/{max_retries}): {str(api_error)[:100]}")
print(f"⏳ Retrying in {wait_time}s...")
logging.warning(f"API retry {retry_count}/{max_retries} after error: {api_error}")
time.sleep(wait_time)
@@ -522,11 +522,11 @@ class AIAgent:
"content": final_response
})
print(f"🎉 Conversation completed after {api_call_count} OpenAI-compatible API call(s)")
print(f"🎉 Conversation completed after {api_call_count} API call(s)")
break
except Exception as e:
error_msg = f"Error during OpenAI-compatible API call #{api_call_count}: {str(e)}"
error_msg = f"Error during API call #{api_call_count}: {str(e)}"
print(f"{error_msg}")
if self.verbose_logging:

20
safe_print.py Normal file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
"""Simple safe print that tries rich, falls back to regular print."""
try:
from rich import print as rich_print
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
def safe_print(*args, **kwargs):
"""Try rich.print, fall back to regular print if it fails."""
if RICH_AVAILABLE:
try:
rich_print(*args, **kwargs)
return
except Exception:
pass
# Fallback to regular print
print(*args, **kwargs)

View File

@@ -161,11 +161,11 @@ def _construct_aggregator_prompt(system_prompt: str, responses: List[str]) -> st
async def _run_reference_model_safe(
model: str,
user_prompt: str,
model: str,
user_prompt: str,
temperature: float = REFERENCE_TEMPERATURE,
max_tokens: int = 32000,
max_retries: int = 6
max_retries: int = 3
) -> tuple[str, str, bool]:
"""
Run a single reference model with retry logic and graceful failure handling.
@@ -212,8 +212,8 @@ async def _run_reference_model_safe(
print(f"⚠️ {model} unknown error (attempt {attempt + 1}): {error_str}")
if attempt < max_retries - 1:
# Exponential backoff for rate limiting: 2s, 4s, 8s, 16s, 32s, 60s
sleep_time = min(2 ** (attempt + 1), 60)
# Exponential backoff for rate limiting
sleep_time = 2 ** attempt
print(f" Retrying in {sleep_time}s...")
await asyncio.sleep(sleep_time)
else:

View File

@@ -1,395 +0,0 @@
#!/usr/bin/env python3
"""
Simple Terminal Tool Module
A simplified terminal tool that executes commands on MorphCloud VMs without tmux.
No session persistence, no interactive app support - just simple command execution.
Features:
- Direct SSH command execution
- Background task support
- VM lifecycle management with TTL
- Automatic cleanup after inactivity
Usage:
from simple_terminal_tool import simple_terminal_tool
# Execute a simple command
result = simple_terminal_tool("ls -la")
# Execute in background
result = simple_terminal_tool("python server.py", background=True)
"""
import json
import os
import time
import threading
import atexit
from typing import Optional, Dict, Any
# Tool description for LLM
SIMPLE_TERMINAL_TOOL_DESCRIPTION = """Execute commands on a secure Linux VM environment.
**Environment:**
- Minimal Debian-based OS with internet access
- Automatic VM lifecycle management (creates on-demand, reuses, cleans up)
- Filesystem is persisted between tool calls but environment variables, venvs, etc are reset.
**Command Execution:**
- Simple commands: Just provide the 'command' parameter
- Background processes: Set 'background': True for servers/long-running tasks
- Command timeout: Optional 'timeout' parameter in seconds
**Examples:**
- Run command: `{"command": "ls -la"}`
- Background task: `{"command": "source path/to/my/venv/bin/activate && python server.py", "background": True}`
- With timeout: `{"command": "long_task.sh", "timeout": 300}`
**Best Practices:**
- Run servers/long processes in background
- Monitor disk usage for large tasks
- Install whatever tools you need with sudo apt-get
- Do not be afraid to run pip with --break-system-packages
**Things to avoid**
- Do NOT use interactive tools such as tmux, vim, nano, python repl - you will get stuck. Even git sometimes becomes interactive if the output is large. If you're not sure pipe to cat.
"""
# Global state for VM lifecycle management
_active_instances: Dict[str, Any] = {}
_last_activity: Dict[str, float] = {}
_instance_lock = threading.Lock()
_cleanup_thread = None
_cleanup_running = False
def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300):
"""Clean up VMs that have been inactive for longer than vm_lifetime_seconds."""
global _active_instances, _last_activity
current_time = time.time()
tasks_to_cleanup = []
with _instance_lock:
for task_id, last_time in list(_last_activity.items()):
if current_time - last_time > vm_lifetime_seconds:
tasks_to_cleanup.append(task_id)
for task_id in tasks_to_cleanup:
try:
if task_id in _active_instances:
instance = _active_instances[task_id]
if hasattr(instance, 'terminate'):
instance.terminate()
elif hasattr(instance, 'stop'):
instance.stop()
elif hasattr(instance, 'delete'):
instance.delete()
del _active_instances[task_id]
print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}")
if task_id in _last_activity:
del _last_activity[task_id]
except Exception as e:
# 404 errors are benign - VM already cleaned up by TTL
error_str = str(e)
if "404" in error_str or "InstanceNotFoundError" in error_str or "not found" in error_str.lower():
print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)")
else:
print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}")
def _cleanup_thread_worker():
"""Background thread worker that periodically cleans up inactive VMs."""
global _cleanup_running
while _cleanup_running:
try:
vm_lifetime = int(os.getenv("HECATE_VM_LIFETIME_SECONDS", "300"))
_cleanup_inactive_vms(vm_lifetime)
except Exception as e:
print(f"[VM Cleanup] Error in cleanup thread: {e}")
for _ in range(60):
if not _cleanup_running:
break
time.sleep(1)
def _start_cleanup_thread():
"""Start the background cleanup thread if not already running."""
global _cleanup_thread, _cleanup_running
with _instance_lock:
if _cleanup_thread is None or not _cleanup_thread.is_alive():
_cleanup_running = True
_cleanup_thread = threading.Thread(target=_cleanup_thread_worker, daemon=True)
_cleanup_thread.start()
def _stop_cleanup_thread():
"""Stop the background cleanup thread."""
global _cleanup_running
_cleanup_running = False
if _cleanup_thread is not None:
_cleanup_thread.join(timeout=5)
def cleanup_vm(task_id: str):
"""Manually clean up a specific VM by task_id."""
global _active_instances, _last_activity
with _instance_lock:
try:
if task_id in _active_instances:
instance = _active_instances[task_id]
if hasattr(instance, 'terminate'):
instance.terminate()
elif hasattr(instance, 'stop'):
instance.stop()
elif hasattr(instance, 'delete'):
instance.delete()
del _active_instances[task_id]
print(f"[VM Cleanup] Manually terminated VM for task: {task_id}")
if task_id in _last_activity:
del _last_activity[task_id]
except Exception as e:
# 404 errors are benign - VM already cleaned up by TTL
error_str = str(e)
if "404" in error_str or "InstanceNotFoundError" in error_str or "not found" in error_str.lower():
print(f"[VM Cleanup] VM for task {task_id} already cleaned up (likely TTL expiration)")
else:
print(f"[VM Cleanup] Error manually cleaning up VM for task {task_id}: {e}")
atexit.register(_stop_cleanup_thread)
def _execute_ssh_command(instance, command: str, timeout: Optional[int] = None) -> Dict[str, Any]:
"""
Execute a command via SSH on the VM instance.
Args:
instance: MorphVM instance
command: Command to execute
timeout: Optional timeout in seconds
Returns:
dict with stdout, stderr, returncode
"""
ssh_context_manager = None
try:
# Use the instance's SSH context manager
ssh_context_manager = instance.ssh()
ssh_context = ssh_context_manager.__enter__()
# Execute the command
result = ssh_context.run(command, get_pty=False, timeout=timeout or 120)
# Close the SSH connection
if ssh_context_manager:
try:
ssh_context_manager.__exit__(None, None, None)
except:
pass
return {
"stdout": result.stdout or "",
"stderr": result.stderr or "",
"returncode": result.returncode
}
except Exception as e:
# Close connection on error
if ssh_context_manager:
try:
ssh_context_manager.__exit__(None, None, None)
except:
pass
# Check if it's a timeout
error_str = str(e).lower()
if "timeout" in error_str:
return {
"stdout": "",
"stderr": f"Command timed out after {timeout or 120} seconds",
"returncode": 124
}
return {
"stdout": "",
"stderr": f"SSH execution failed: {str(e)}",
"returncode": -1
}
def simple_terminal_tool(
command: str,
background: bool = False,
timeout: Optional[int] = None,
task_id: Optional[str] = None
) -> str:
"""
Execute a command on a MorphCloud VM without session persistence.
Args:
command: The command to execute
background: Whether to run in background (default: False)
timeout: Command timeout in seconds (default: 120)
task_id: Unique identifier for VM isolation (optional)
Returns:
str: JSON string with output, exit_code, and error fields
Examples:
# Execute a simple command
>>> result = simple_terminal_tool(command="ls -la /tmp")
# Run a background task
>>> result = simple_terminal_tool(command="python server.py", background=True)
# With custom timeout
>>> result = simple_terminal_tool(command="long_task.sh", timeout=300)
"""
global _active_instances, _last_activity
try:
# Import required modules
try:
from morphcloud.api import MorphCloudClient
except ImportError as import_error:
return json.dumps({
"output": "",
"exit_code": -1,
"error": f"Terminal tool disabled: {import_error}",
"status": "disabled"
}, ensure_ascii=False)
# Get configuration
vm_ttl_seconds = int(os.getenv("HECATE_VM_TTL_SECONDS", "1200"))
snapshot_id = os.getenv("HECATE_DEFAULT_SNAPSHOT_ID", "snapshot_defv9tjg")
# Check API key
morph_api_key = os.getenv("MORPH_API_KEY")
if not morph_api_key:
return json.dumps({
"output": "",
"exit_code": -1,
"error": "MORPH_API_KEY environment variable not set",
"status": "disabled"
}, ensure_ascii=False)
# Use task_id for VM isolation
effective_task_id = task_id or "default"
# Start cleanup thread
_start_cleanup_thread()
# Get or create VM instance
with _instance_lock:
if effective_task_id not in _active_instances:
morph_client = MorphCloudClient(api_key=morph_api_key)
_active_instances[effective_task_id] = morph_client.instances.start(
snapshot_id=snapshot_id,
ttl_seconds=vm_ttl_seconds,
ttl_action="stop"
)
# Update last activity time
_last_activity[effective_task_id] = time.time()
instance = _active_instances[effective_task_id]
# Wait for instance to be ready
instance.wait_until_ready()
# Prepare command for execution
if background:
# Run in background with nohup and redirect output
exec_command = f"nohup {command} > /tmp/bg_output.log 2>&1 &"
result = _execute_ssh_command(instance, exec_command, timeout=10)
# For background tasks, return immediately with info
if result["returncode"] == 0:
return json.dumps({
"output": "Background task started successfully",
"exit_code": 0,
"error": None
}, ensure_ascii=False)
else:
return json.dumps({
"output": result["stdout"],
"exit_code": result["returncode"],
"error": result["stderr"]
}, ensure_ascii=False)
else:
# Run foreground command
result = _execute_ssh_command(instance, command, timeout=timeout)
# Combine stdout and stderr for output
output = result["stdout"]
if result["stderr"] and result["returncode"] != 0:
output = f"{output}\n{result['stderr']}" if output else result["stderr"]
return json.dumps({
"output": output.strip(),
"exit_code": result["returncode"],
"error": result["stderr"] if result["returncode"] != 0 else None
}, ensure_ascii=False)
except Exception as e:
return json.dumps({
"output": "",
"exit_code": -1,
"error": f"Failed to execute command: {str(e)}",
"status": "error"
}, ensure_ascii=False)
def check_requirements() -> bool:
"""Check if all requirements for the simple terminal tool are met."""
required_vars = ["MORPH_API_KEY"]
missing_required = [var for var in required_vars if not os.getenv(var)]
if missing_required:
print(f"Missing required environment variables: {', '.join(missing_required)}")
return False
try:
from morphcloud.api import MorphCloudClient
return True
except Exception as e:
print(f"MorphCloud not available: {e}")
return False
if __name__ == "__main__":
"""Simple test when run directly."""
print("Simple Terminal Tool Module")
print("=" * 40)
if not check_requirements():
print("Requirements not met. Please check the messages above.")
exit(1)
print("All requirements met!")
print("\nAvailable Tool:")
print(" - simple_terminal_tool: Execute commands without session persistence")
print("\nUsage Examples:")
print(" # Execute a command")
print(" result = simple_terminal_tool(command='ls -la')")
print(" ")
print(" # Run a background task")
print(" result = simple_terminal_tool(command='python server.py', background=True)")
print("\nEnvironment Variables:")
print(f" MORPH_API_KEY: {'Set' if os.getenv('MORPH_API_KEY') else 'Not set'}")
print(f" HECATE_VM_TTL_SECONDS: {os.getenv('HECATE_VM_TTL_SECONDS', '1200')} (default: 1200 / 20 minutes)")
print(f" HECATE_VM_LIFETIME_SECONDS: {os.getenv('HECATE_VM_LIFETIME_SECONDS', '300')} (default: 300 / 5 minutes)")
print(f" HECATE_DEFAULT_SNAPSHOT_ID: {os.getenv('HECATE_DEFAULT_SNAPSHOT_ID', 'snapshot_defv9tjg')}")

View File

@@ -184,10 +184,10 @@ Your goal is to preserve ALL important information while reducing length. Never
Create a markdown summary that captures all key information in a well-organized, scannable format. Include important quotes and code snippets in their original formatting. Focus on actionable information, specific details, and unique insights."""
# Call the LLM asynchronously with retry logic for flaky API
max_retries = 6
max_retries = 3
retry_delay = 2 # Start with 2 seconds
last_error = None
for attempt in range(max_retries):
try:
response = await nous_client.chat.completions.create(
@@ -206,7 +206,7 @@ Create a markdown summary that captures all key information in a well-organized,
print(f"⚠️ LLM API call failed (attempt {attempt + 1}/{max_retries}): {str(api_error)[:100]}")
print(f" Retrying in {retry_delay}s...")
await asyncio.sleep(retry_delay)
retry_delay = min(retry_delay * 2, 60) # Exponential backoff: 2s, 4s, 8s, 16s, 32s, 60s
retry_delay *= 2 # Exponential backoff: 2s, 4s, 8s
else:
# All retries exhausted
raise last_error

View File

@@ -67,7 +67,7 @@ DISTRIBUTIONS = {
"description": "Web research with vision analysis and reasoning",
"toolsets": {
"web": 94, # 90% chance of web tools
"vision": 65, # 50% chance of vision tools
"vision": 50, # 50% chance of vision tools
"moa": 10, # 40% chance of reasoning tools
"terminal": 94, # 10% chance of terminal tools
"image_gen": 15 # 80% chance of image generation tools