Compare commits
2 Commits
asyncio
...
add-prokle
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e578f976af | ||
|
|
96bc31a8b1 |
@@ -689,16 +689,12 @@ class BatchRunner:
|
||||
print("\n" + "=" * 70)
|
||||
print("🚀 Starting Batch Processing")
|
||||
print("=" * 70)
|
||||
|
||||
# Load checkpoint
|
||||
checkpoint_data = self._load_checkpoint() if resume else {
|
||||
"run_name": self.run_name,
|
||||
"completed_prompts": [],
|
||||
"last_updated": None
|
||||
}
|
||||
|
||||
if resume and checkpoint_data.get("completed_prompts"):
|
||||
print(f"📂 Resuming from checkpoint ({len(checkpoint_data['completed_prompts'])} prompts already completed)")
|
||||
|
||||
# Always load checkpoint if it exists to skip completed indices
|
||||
checkpoint_data = self._load_checkpoint()
|
||||
|
||||
if checkpoint_data.get("completed_prompts"):
|
||||
print(f"📂 Found existing checkpoint - skipping {len(checkpoint_data['completed_prompts'])} already completed prompts")
|
||||
|
||||
completed_prompts_set = set(checkpoint_data.get("completed_prompts", []))
|
||||
|
||||
|
||||
13
gemini_thinking.sh
Normal file
13
gemini_thinking.sh
Normal file
@@ -0,0 +1,13 @@
|
||||
python batch_runner.py \
|
||||
--dataset_file="source-data/agent_tasks_eval_10.jsonl" \
|
||||
--batch_size=1 \
|
||||
--run_name="agenttasks_eval_gemini-3-3-10-thinking-2025-11-22" \
|
||||
--distribution="science" \
|
||||
--prokletor_client="HermesToolClient" \
|
||||
--model="gemini-3-pro-preview" \
|
||||
--base_url="https://generativelanguage.googleapis.com/v1beta/openai/" \
|
||||
--api_key="${GEMINI_API_KEY}" \
|
||||
--num_workers=10 \
|
||||
--max_turns=60 \
|
||||
--verbose \
|
||||
--ephemeral_system_prompt="You have access to a variety of tools to help you solve scientific, math, and technology problems presented to you. You can use them in sequence and build off of the results of prior tools you've used results. Always use the terminal or search tool if it can provide additional context, verify formulas, double check concepts and recent studies and understanding, doing all calculations, etc. You should only be confident in your own reasoning, knowledge, or calculations if you've exhaustively used all tools available to you to that can help you verify or validate your work. Always pip install any packages you need to use the python scripts you want to run. If you need to use a tool that isn't available, you can use the terminal tool to install or create it in many cases as well. Do not use the terminal tool to communicate with the user, as they cannot see your commands, only your final response after completing the task. The web search tool only gets you urls and brief descriptions you need to run web extract to actually visit those urls. If you need to check if you have a certain API key available please do so in a way that does not expose the key. For verbose tools like installs please use the quietest version. Also please make sure you include -y in your install commands or the terminal will get stuck at the y/n stage."
|
||||
17
run_agent.py
17
run_agent.py
@@ -738,7 +738,6 @@ class AIAgent:
|
||||
if self.save_trajectories:
|
||||
# Use the client wrapper's format method if available to get the exact Hermes format
|
||||
if hasattr(self, 'client') and hasattr(self.client, 'format'):
|
||||
raise ValueError("reached this point")
|
||||
formatted_messages = self.client.format(messages, self.tools, render_final=True)
|
||||
|
||||
# We need to adapt this formatted list to the trajectory format expected by _save_trajectory
|
||||
@@ -803,12 +802,16 @@ class AIAgent:
|
||||
# Fallback to original saving method
|
||||
self._save_trajectory(messages, user_message, completed)
|
||||
|
||||
# Clean up VM for this task after conversation completes
|
||||
try:
|
||||
await asyncio.to_thread(cleanup_vm, effective_task_id)
|
||||
except Exception as e:
|
||||
if self.verbose_logging:
|
||||
logging.warning(f"Failed to cleanup VM for task {effective_task_id}: {e}")
|
||||
# Clean up VM for this task after conversation completes (fire-and-forget)
|
||||
# Don't await this - let it run in the background so we don't block returning results
|
||||
async def cleanup_task():
|
||||
try:
|
||||
await asyncio.to_thread(cleanup_vm, effective_task_id)
|
||||
except Exception as e:
|
||||
if self.verbose_logging:
|
||||
logging.warning(f"Failed to cleanup VM for task {effective_task_id}: {e}")
|
||||
|
||||
asyncio.create_task(cleanup_task())
|
||||
|
||||
# Get profiling statistics for this conversation
|
||||
profiling_stats = get_profiler().get_statistics()
|
||||
|
||||
@@ -100,30 +100,17 @@ def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300):
|
||||
global _active_instances, _active_contexts, _last_activity
|
||||
|
||||
current_time = time.time()
|
||||
tasks_to_cleanup = []
|
||||
instances_to_cleanup = []
|
||||
|
||||
# Find and extract instances to cleanup while holding lock
|
||||
with _instance_lock:
|
||||
# Find all VMs that have been inactive for too long
|
||||
for task_id, last_time in list(_last_activity.items()):
|
||||
if current_time - last_time > vm_lifetime_seconds:
|
||||
tasks_to_cleanup.append(task_id)
|
||||
|
||||
# Clean up the inactive VMs
|
||||
for task_id in tasks_to_cleanup:
|
||||
try:
|
||||
if task_id in _active_instances:
|
||||
instance = _active_instances[task_id]
|
||||
# Terminate the VM instance
|
||||
if hasattr(instance, 'terminate'):
|
||||
instance.terminate()
|
||||
elif hasattr(instance, 'stop'):
|
||||
instance.stop()
|
||||
elif hasattr(instance, 'delete'):
|
||||
instance.delete()
|
||||
|
||||
# Remove from tracking dictionaries
|
||||
instances_to_cleanup.append((task_id, _active_instances[task_id]))
|
||||
# Remove from tracking dictionaries immediately
|
||||
del _active_instances[task_id]
|
||||
print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}")
|
||||
|
||||
if task_id in _active_contexts:
|
||||
del _active_contexts[task_id]
|
||||
@@ -131,8 +118,18 @@ def _cleanup_inactive_vms(vm_lifetime_seconds: int = 300):
|
||||
if task_id in _last_activity:
|
||||
del _last_activity[task_id]
|
||||
|
||||
except Exception as e:
|
||||
print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}")
|
||||
# Terminate outside the lock so we don't block other operations
|
||||
for task_id, instance in instances_to_cleanup:
|
||||
try:
|
||||
if hasattr(instance, 'terminate'):
|
||||
instance.terminate()
|
||||
elif hasattr(instance, 'stop'):
|
||||
instance.stop()
|
||||
elif hasattr(instance, 'delete'):
|
||||
instance.delete()
|
||||
print(f"[VM Cleanup] Terminated inactive VM for task: {task_id}")
|
||||
except Exception as e:
|
||||
print(f"[VM Cleanup] Error cleaning up VM for task {task_id}: {e}")
|
||||
|
||||
def _cleanup_thread_worker():
|
||||
"""
|
||||
@@ -185,28 +182,30 @@ def cleanup_vm(task_id: str):
|
||||
"""
|
||||
global _active_instances, _active_contexts, _last_activity
|
||||
|
||||
# Extract instance from dict while holding lock, but don't terminate yet
|
||||
instance_to_cleanup = None
|
||||
with _instance_lock:
|
||||
if task_id in _active_instances:
|
||||
instance_to_cleanup = _active_instances[task_id]
|
||||
# Remove from tracking dictionaries immediately
|
||||
del _active_instances[task_id]
|
||||
|
||||
if task_id in _active_contexts:
|
||||
del _active_contexts[task_id]
|
||||
|
||||
if task_id in _last_activity:
|
||||
del _last_activity[task_id]
|
||||
|
||||
# Terminate outside the lock so multiple cleanups can run concurrently
|
||||
if instance_to_cleanup:
|
||||
try:
|
||||
if task_id in _active_instances:
|
||||
instance = _active_instances[task_id]
|
||||
# Terminate the VM instance
|
||||
if hasattr(instance, 'terminate'):
|
||||
instance.terminate()
|
||||
elif hasattr(instance, 'stop'):
|
||||
instance.stop()
|
||||
elif hasattr(instance, 'delete'):
|
||||
instance.delete()
|
||||
|
||||
# Remove from tracking dictionaries
|
||||
del _active_instances[task_id]
|
||||
print(f"[VM Cleanup] Manually terminated VM for task: {task_id}")
|
||||
|
||||
if task_id in _active_contexts:
|
||||
del _active_contexts[task_id]
|
||||
|
||||
if task_id in _last_activity:
|
||||
del _last_activity[task_id]
|
||||
|
||||
if hasattr(instance_to_cleanup, 'terminate'):
|
||||
instance_to_cleanup.terminate()
|
||||
elif hasattr(instance_to_cleanup, 'stop'):
|
||||
instance_to_cleanup.stop()
|
||||
elif hasattr(instance_to_cleanup, 'delete'):
|
||||
instance_to_cleanup.delete()
|
||||
print(f"[VM Cleanup] Manually terminated VM for task: {task_id}")
|
||||
except Exception as e:
|
||||
print(f"[VM Cleanup] Error manually cleaning up VM for task {task_id}: {e}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user