Compare commits

9 Commits

Author SHA1 Message Date
emozilla
ab6abc2c13 fix: use per-thread persistent event loops in worker threads
Replace asyncio.run() with thread-local persistent event loops for
worker threads (e.g., delegate_task's ThreadPoolExecutor). asyncio.run()
creates and closes a fresh loop on every call, leaving cached
httpx/AsyncOpenAI clients bound to a dead loop — causing 'Event loop is
closed' errors during GC when parallel subagents clean up connections.

The fix mirrors the main thread's _get_tool_loop() pattern but uses
threading.local() so each worker thread gets its own long-lived loop,
avoiding both cross-thread contention and the create-destroy lifecycle.

Added 4 regression tests covering worker loop persistence, reuse,
per-thread isolation, and separation from the main thread's loop.
2026-03-20 15:41:06 -04:00
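
A minimal sketch of the per-thread persistent loop pattern described in this commit, assuming nothing beyond the standard library. The names `get_thread_loop`, `run_sync`, and `call_twice` are hypothetical and are not the project's helpers.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical module-level storage: each thread sees its own `loop` attribute.
_thread_local = threading.local()


def get_thread_loop() -> asyncio.AbstractEventLoop:
    """Return the calling thread's long-lived loop, creating it on first use."""
    loop = getattr(_thread_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        _thread_local.loop = loop
    return loop


def run_sync(coro):
    """Run a coroutine to completion on the calling thread's persistent loop."""
    return get_thread_loop().run_until_complete(coro)


async def current_loop() -> asyncio.AbstractEventLoop:
    """Report which loop is executing this coroutine."""
    return asyncio.get_running_loop()


def call_twice():
    """Two calls on one thread should see the same loop, and it should stay open."""
    first = run_sync(current_loop())
    second = run_sync(current_loop())
    return first is second, first.is_closed()
```

Submitting `call_twice` to a `ThreadPoolExecutor` worker exercises the case the commit targets: the loop outlives each call, so anything bound to it is never left pointing at a closed loop.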
Teknium
aafe86d81a fix: prevent 'event loop already running' when async tools run in parallel (#2207)
When the model returns multiple tool calls, run_agent.py executes them
concurrently in a ThreadPoolExecutor. Each thread called _run_async()
which used a shared persistent event loop (_get_tool_loop()). If two
async tools (like web_extract) ran in parallel, the second thread would
hit 'This event loop is already running' on the shared loop.

Fix: detect worker threads (not main thread) and use asyncio.run() with
a per-thread fresh loop instead of the shared persistent one. The shared
loop is still used for the main thread (CLI sequential path) to keep
cached async clients (httpx/AsyncOpenAI) alive.

Co-authored-by: Test <test@test.com>
2026-03-20 11:39:13 -07:00
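
The failure this commit describes can be reproduced with the standard library alone. The sketch below is an illustration under stated assumptions, not the project's code: `reproduce`, `slow`, and `quick` are hypothetical names, and a `threading.Event` is used only to make the timing deterministic.

```python
import asyncio
import threading


def reproduce() -> dict:
    """Drive one loop from two threads and record what happens."""
    loop = asyncio.new_event_loop()
    started = threading.Event()
    results = {}

    async def slow():
        started.set()             # signal that the loop is now running
        await asyncio.sleep(0.2)  # keep the loop busy for a moment
        return "done"

    async def quick():
        return "quick"

    def first_caller():
        results["first"] = loop.run_until_complete(slow())

    worker = threading.Thread(target=first_caller)
    worker.start()
    started.wait(timeout=5)

    coro = quick()
    try:
        # The loop is already running in the worker thread, so this is rejected.
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        results["second_error"] = str(exc)
        coro.close()  # avoid a "coroutine was never awaited" warning

    worker.join()
    loop.close()
    return results
```

The second caller is rejected before its coroutine is scheduled, which is why a shared loop cannot serve concurrent worker threads.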
Teknium
1aa7027be1 Merge pull request #2192 from NousResearch/hermes/hermes-3d7c23c9
fix(acp): preserve leading whitespace in streaming chunks
2026-03-20 09:52:32 -07:00
Teknium
f961937097 Merge pull request #2181 from NousResearch/hermes/hermes-4a7e401e
fix: missing platforms in delivery maps + WhatsApp image/bridge improvements
2026-03-20 09:45:50 -07:00
Teknium
7a427d7b03 fix: persistent event loop in _run_async prevents 'Event loop is closed' (#2190)
Cherry-picked from PR #2146 by @crazywriter1. Fixes #2104.

asyncio.run() creates and closes a fresh event loop each call. Cached
httpx/AsyncOpenAI clients bound to the dead loop crash on GC with
'Event loop is closed'. This hit vision_analyze on first use in CLI.

Two-layer fix:
- model_tools._run_async(): replace asyncio.run() with persistent
  loop via _get_tool_loop() + run_until_complete()
- auxiliary_client._get_cached_client(): track which loop created
  each async client, discard stale entries if loop is closed

6 regression tests covering loop lifecycle, reuse, and full vision
dispatch chain.

Co-authored-by: Test <test@test.com>
2026-03-20 09:44:50 -07:00
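
The second layer of this fix (discarding cached async clients whose loop has been closed) can be sketched roughly as follows. `FakeAsyncClient`, `_cache`, and `get_client` are hypothetical stand-ins; the real cache key and client construction live in `auxiliary_client._get_cached_client()`.

```python
import asyncio
from typing import Dict, Optional, Tuple


class FakeAsyncClient:
    """Hypothetical stand-in for an httpx/AsyncOpenAI-style client."""


# Cache entry: (client, loop the client was created on).
_cache: Dict[str, Tuple[FakeAsyncClient, Optional[asyncio.AbstractEventLoop]]] = {}


def get_client(key: str, loop: Optional[asyncio.AbstractEventLoop]) -> FakeAsyncClient:
    """Return a cached client unless the loop it was bound to has been closed."""
    entry = _cache.get(key)
    if entry is not None:
        client, bound_loop = entry
        if bound_loop is None or not bound_loop.is_closed():
            return client
        # Stale entry: the owning loop is gone, so drop it and rebuild.
        del _cache[key]
    client = FakeAsyncClient()
    _cache[key] = (client, loop)
    return client
```

While the loop is open the same client comes back; once the loop is closed, the next lookup builds a fresh client instead of handing out one bound to a dead loop.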
Teknium
66a1942524 feat: add /queue command to queue prompts without interrupting (#2191)
Adds /queue <prompt> (alias /q) that queues a message for the next
turn while the agent is busy, without interrupting the current run.

- CLI: /queue <prompt> puts it in _pending_input for the next turn
- Gateway: /queue <prompt> creates a pending MessageEvent on the
  adapter, picked up after the current agent run finishes
- Enter still interrupts as usual (no behavior change)
- /queue with no prompt shows usage
- /queue when agent is idle tells user to just type normally

Co-authored-by: Test <test@test.com>
2026-03-20 09:44:27 -07:00
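
The CLI behaviour listed above can be summarised in a small, self-contained sketch. `handle_queue_command` is a hypothetical function that returns the message text instead of printing it, and a plain `queue.Queue` stands in for `_pending_input`.

```python
import queue


def handle_queue_command(cmd: str, agent_running: bool, pending: "queue.Queue[str]") -> str:
    """Queue the text after "/queue" (or "/q") for the next turn."""
    if not agent_running:
        return "/queue only works while the agent is busy. Just type your message normally."
    # Split off the command word; whatever follows is the prompt.
    parts = cmd.split(None, 1)
    payload = parts[1].strip() if len(parts) > 1 else ""
    if not payload:
        return "Usage: /queue <prompt>"
    pending.put(payload)
    return "Queued for the next turn."
```

Nothing is interrupted here: the prompt only lands in the queue, and the running turn is left alone.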
Dilee
1173adbe86 fix(acp): preserve leading whitespace in streaming chunks
2026-03-20 09:38:13 -07:00
Test
a5beb6d8f0 fix(whatsapp): image downloading, bridge reuse, LID allowlist, Baileys 7.x compat
Salvaged from PR #2162 by @Zindar. Reply prefix changes excluded (already
on main via #1756 configurable prefix).

Bridge improvements (bridge.js):
- Download incoming images to ~/.hermes/image_cache/ via downloadMediaMessage
  so the agent can actually see user-sent photos
- Add getMessage callback required for Baileys 7.x E2EE session
  re-establishment (without it, some messages arrive as null)
- Build LID→phone reverse map for allowlist resolution (WhatsApp LID format)
- Add placeholder body for media without caption: [image received]
- Bind express to 127.0.0.1 instead of 0.0.0.0 for security
- Use 127.0.0.1 consistently throughout (more reliable than localhost)

Adapter improvements (whatsapp.py):
- Detect and reuse already-running bridge (only if status=connected)
- Handle local file paths from bridge-cached images in _build_message_event
- Don't kill external bridges on disconnect
- Use 127.0.0.1 throughout for consistency with bridge binding

Fix vs original PR: bridge reuse now checks status=connected, not just
HTTP 200. A disconnected bridge gets restarted instead of reused.

Co-authored-by: Zindar <zindar@users.noreply.github.com>
2026-03-20 09:37:48 -07:00
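
The bridge-reuse rule from this commit (reuse only when the health endpoint reports `status=connected`, not merely HTTP 200) reduces to a small predicate. `should_reuse_bridge` is a hypothetical helper; the adapter performs this check inline after an `aiohttp` request.

```python
from typing import Any, Mapping, Optional


def should_reuse_bridge(http_status: Optional[int], health: Optional[Mapping[str, Any]]) -> bool:
    """Reuse an already-running bridge only if it reports a connected session."""
    if http_status != 200 or health is None:
        return False  # bridge unreachable or unhealthy: start a new one
    return health.get("status", "unknown") == "connected"
```

A bridge that answers 200 but reports any other status is treated as unusable, which matches the "restart instead of reuse" behaviour described above.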
Test
8f6ecd5c64 fix: add missing platforms to cron/send_message delivery maps and tool schema
Matrix, Mattermost, Home Assistant, and DingTalk were missing from the
platform_map in both cron/scheduler.py and tools/send_message_tool.py,
causing delivery to those platforms to silently fail.

Also updates the cronjob tool schema description to list all available
delivery targets so the model knows its options.
2026-03-20 08:52:21 -07:00
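
One way to catch this class of omission early is a check that every platform has a map entry. This is a sketch under assumptions, not something the commit adds: the `Platform` enum below is a hypothetical subset, and `missing_platforms` is an illustrative helper.

```python
from enum import Enum
from typing import Dict, Set


class Platform(Enum):
    """Hypothetical subset of the gateway's platform enum."""
    SIGNAL = "signal"
    MATRIX = "matrix"
    MATTERMOST = "mattermost"
    HOMEASSISTANT = "homeassistant"
    DINGTALK = "dingtalk"


def missing_platforms(platform_map: Dict[str, Platform]) -> Set[Platform]:
    """Return enum members that no key in the delivery map resolves to."""
    return set(Platform) - set(platform_map.values())
```

Asserting that `missing_platforms(platform_map)` is empty in a unit test would turn a silent delivery failure into a failing test.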
14 changed files with 534 additions and 68 deletions

@@ -1191,8 +1191,18 @@ def _get_cached_client(
cache_key = (provider, async_mode, base_url or "", api_key or "")
with _client_cache_lock:
if cache_key in _client_cache:
cached_client, cached_default = _client_cache[cache_key]
return cached_client, model or cached_default
cached_client, cached_default, cached_loop = _client_cache[cache_key]
if async_mode:
# Async clients are bound to the event loop that created them.
# A cached async client whose loop has been closed will raise
# "Event loop is closed" when httpx tries to clean up its
# transport. Discard the stale client and create a fresh one.
if cached_loop is not None and cached_loop.is_closed():
del _client_cache[cache_key]
else:
return cached_client, model or cached_default
else:
return cached_client, model or cached_default
# Build outside the lock
client, default_model = resolve_provider_client(
provider,
@@ -1202,11 +1212,20 @@ def _get_cached_client(
explicit_api_key=api_key,
)
if client is not None:
# For async clients, remember which loop they were created on so we
# can detect stale entries later.
bound_loop = None
if async_mode:
try:
import asyncio as _aio
bound_loop = _aio.get_event_loop()
except RuntimeError:
pass
with _client_cache_lock:
if cache_key not in _client_cache:
_client_cache[cache_key] = (client, default_model)
_client_cache[cache_key] = (client, default_model, bound_loop)
else:
client, default_model = _client_cache[cache_key]
client, default_model, _ = _client_cache[cache_key]
return client, model or default_model

@@ -356,7 +356,7 @@ class CopilotACPClient:
text_parts=text_parts,
reasoning_parts=reasoning_parts,
)
return "".join(text_parts).strip(), "".join(reasoning_parts).strip()
return "".join(text_parts), "".join(reasoning_parts)
finally:
self.close()
@@ -380,7 +380,7 @@ class CopilotACPClient:
content = update.get("content") or {}
chunk_text = ""
if isinstance(content, dict):
chunk_text = str(content.get("text") or "").strip()
chunk_text = str(content.get("text") or "")
if kind == "agent_message_chunk" and chunk_text and text_parts is not None:
text_parts.append(chunk_text)
elif kind == "agent_thought_chunk" and chunk_text and reasoning_parts is not None:

42
cli.py

@@ -760,7 +760,7 @@ def _prune_stale_worktrees(repo_root: str, max_age_hours: int = 24) -> None:
# - Dim: #B8860B (muted text)
# ANSI building blocks for conversation display
_GOLD = "\033[1;38;2;255;215;0m" # True-color #FFD700 bold — matches Rich Panel gold
_GOLD = "\033[1;33m" # Bold yellow — closest universal match to the gold theme
_BOLD = "\033[1m"
_DIM = "\033[2m"
_RST = "\033[0m"
@@ -1504,7 +1504,7 @@ class HermesCLI:
_cprint(f"{_DIM}{'' * (w - 2)}{_RST}")
self._reasoning_box_opened = False
def _stream_delta(self, text) -> None:
def _stream_delta(self, text: str) -> None:
"""Line-buffered streaming callback for real-time token rendering.
Receives text deltas from the agent as tokens arrive. Buffers
@@ -1514,15 +1514,7 @@ class HermesCLI:
Reasoning/thinking blocks (<REASONING_SCRATCHPAD>, <think>, etc.)
are suppressed during streaming since they'd display raw XML tags.
The agent strips them from the final response anyway.
A ``None`` value signals an intermediate turn boundary (tools are
about to execute). Flushes any open boxes and resets state so
tool feed lines render cleanly between turns.
"""
if text is None:
self._flush_stream()
self._reset_stream_state()
return
if not text:
return
@@ -3686,6 +3678,18 @@ class HermesCLI:
self._handle_stop_command()
elif canonical == "background":
self._handle_background_command(cmd_original)
elif canonical == "queue":
if not self._agent_running:
_cprint(" /queue only works while Hermes is busy. Just type your message normally.")
else:
# Extract prompt after "/queue " or "/q "
parts = cmd_original.split(None, 1)
payload = parts[1].strip() if len(parts) > 1 else ""
if not payload:
_cprint(" Usage: /queue <prompt>")
else:
self._pending_input.put(payload)
_cprint(f" Queued for the next turn: {payload[:80]}{'...' if len(payload) > 80 else ''}")
elif canonical == "skin":
self._handle_skin_command(cmd_original)
elif canonical == "voice":
@@ -4561,27 +4565,15 @@ class HermesCLI:
# ====================================================================
def _on_tool_progress(self, function_name: str, preview: str, function_args: dict):
"""Called when a tool starts executing.
Updates the TUI spinner widget so the user can see what the agent
is doing during tool execution (fills the gap between thinking
spinner and next response). Also plays audio cue in voice mode.
"""
if not function_name.startswith("_"):
from agent.display import get_tool_emoji
emoji = get_tool_emoji(function_name)
label = preview or function_name
if len(label) > 50:
label = label[:47] + "..."
self._spinner_text = f"{emoji} {label}"
self._invalidate()
"""Called when a tool starts executing. Plays audio cue in voice mode."""
if not self._voice_mode:
return
# Skip internal/thinking tools
if function_name.startswith("_"):
return
try:
from tools.voice_mode import play_beep
# Short, subtle tick sound (higher pitch, very brief)
threading.Thread(
target=play_beep,
kwargs={"frequency": 1200, "duration": 0.06, "count": 1},

@@ -137,6 +137,9 @@ def _deliver_result(job: dict, content: str) -> None:
"whatsapp": Platform.WHATSAPP,
"signal": Platform.SIGNAL,
"matrix": Platform.MATRIX,
"mattermost": Platform.MATTERMOST,
"homeassistant": Platform.HOMEASSISTANT,
"dingtalk": Platform.DINGTALK,
"email": Platform.EMAIL,
"sms": Platform.SMS,
}

@@ -182,9 +182,31 @@ class WhatsAppAdapter(BasePlatformAdapter):
# Ensure session directory exists
self._session_path.mkdir(parents=True, exist_ok=True)
# Check if bridge is already running and connected
import aiohttp
import asyncio
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"http://127.0.0.1:{self._bridge_port}/health",
timeout=aiohttp.ClientTimeout(total=2)
) as resp:
if resp.status == 200:
data = await resp.json()
bridge_status = data.get("status", "unknown")
if bridge_status == "connected":
print(f"[{self.name}] Using existing bridge (status: {bridge_status})")
self._running = True
self._bridge_process = None # Not managed by us
asyncio.create_task(self._poll_messages())
return True
else:
print(f"[{self.name}] Bridge found but not connected (status: {bridge_status}), restarting")
except Exception:
pass # Bridge not running, start a new one
# Kill any orphaned bridge from a previous gateway run
_kill_port_process(self._bridge_port)
import asyncio
await asyncio.sleep(1)
# Start the bridge process in its own process group.
@@ -232,7 +254,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"http://localhost:{self._bridge_port}/health",
f"http://127.0.0.1:{self._bridge_port}/health",
timeout=aiohttp.ClientTimeout(total=2)
) as resp:
if resp.status == 200:
@@ -264,7 +286,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"http://localhost:{self._bridge_port}/health",
f"http://127.0.0.1:{self._bridge_port}/health",
timeout=aiohttp.ClientTimeout(total=2)
) as resp:
if resp.status == 200:
@@ -326,9 +348,9 @@ class WhatsAppAdapter(BasePlatformAdapter):
self._bridge_process.kill()
except Exception as e:
print(f"[{self.name}] Error stopping bridge: {e}")
# Also kill any orphaned bridge processes on our port
_kill_port_process(self._bridge_port)
else:
# Bridge was not started by us, don't kill it
print(f"[{self.name}] Disconnecting (external bridge left running)")
self._running = False
self._bridge_process = None
@@ -358,7 +380,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
payload["replyTo"] = reply_to
async with session.post(
f"http://localhost:{self._bridge_port}/send",
f"http://127.0.0.1:{self._bridge_port}/send",
json=payload,
timeout=aiohttp.ClientTimeout(total=30)
) as resp:
@@ -394,7 +416,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.post(
f"http://localhost:{self._bridge_port}/edit",
f"http://127.0.0.1:{self._bridge_port}/edit",
json={
"chatId": chat_id,
"messageId": message_id,
@@ -439,7 +461,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
async with aiohttp.ClientSession() as session:
async with session.post(
f"http://localhost:{self._bridge_port}/send-media",
f"http://127.0.0.1:{self._bridge_port}/send-media",
json=payload,
timeout=aiohttp.ClientTimeout(total=120),
) as resp:
@@ -515,7 +537,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
async with aiohttp.ClientSession() as session:
await session.post(
f"http://localhost:{self._bridge_port}/typing",
f"http://127.0.0.1:{self._bridge_port}/typing",
json={"chatId": chat_id},
timeout=aiohttp.ClientTimeout(total=5)
)
@@ -532,7 +554,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
async with aiohttp.ClientSession() as session:
async with session.get(
f"http://localhost:{self._bridge_port}/chat/{chat_id}",
f"http://127.0.0.1:{self._bridge_port}/chat/{chat_id}",
timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status == 200:
@@ -559,7 +581,7 @@ class WhatsAppAdapter(BasePlatformAdapter):
try:
async with aiohttp.ClientSession() as session:
async with session.get(
f"http://localhost:{self._bridge_port}/messages",
f"http://127.0.0.1:{self._bridge_port}/messages",
timeout=aiohttp.ClientTimeout(total=30)
) as resp:
if resp.status == 200:
@@ -621,6 +643,11 @@ class WhatsAppAdapter(BasePlatformAdapter):
print(f"[{self.name}] Failed to cache image: {e}", flush=True)
cached_urls.append(url)
media_types.append("image/jpeg")
elif msg_type == MessageType.PHOTO and os.path.isabs(url):
# Local file path — bridge already downloaded the image
cached_urls.append(url)
media_types.append("image/jpeg")
print(f"[{self.name}] Using bridge-cached image: {url}", flush=True)
elif msg_type == MessageType.VOICE and url.startswith(("http://", "https://")):
try:
cached_path = await cache_audio_from_url(url, ext=".ogg")

@@ -1369,6 +1369,23 @@ class GatewayRunner:
del self._running_agents[_quick_key]
return await self._handle_reset_command(event)
# /queue <prompt> — queue without interrupting
if event.get_command() in ("queue", "q"):
queued_text = event.get_command_args().strip()
if not queued_text:
return "Usage: /queue <prompt>"
adapter = self.adapters.get(source.platform)
if adapter:
from gateway.platforms.base import MessageEvent as _ME, MessageType as _MT
queued_event = _ME(
text=queued_text,
message_type=_MT.TEXT,
source=event.source,
message_id=event.message_id,
)
adapter._pending_messages[_quick_key] = queued_event
return "Queued for the next turn."
if event.message_type == MessageType.PHOTO:
logger.debug("PRIORITY photo follow-up for session %s — queueing without interrupt", _quick_key[:20])
adapter = self.adapters.get(source.platform)

@@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
# ANSI building blocks for conversation display
# =========================================================================
_GOLD = "\033[1;38;2;255;215;0m" # True-color #FFD700 bold
_GOLD = "\033[1;33m"
_BOLD = "\033[1m"
_DIM = "\033[2m"
_RST = "\033[0m"

@@ -67,6 +67,8 @@ COMMAND_REGISTRY: list[CommandDef] = [
gateway_only=True),
CommandDef("background", "Run a prompt in the background", "Session",
aliases=("bg",), args_hint="<prompt>"),
CommandDef("queue", "Queue a prompt for the next turn (doesn't interrupt)", "Session",
aliases=("q",), args_hint="<prompt>"),
CommandDef("status", "Show session info", "Session",
gateway_only=True),
CommandDef("sethome", "Set this chat as the home channel", "Session",

@@ -24,6 +24,7 @@ import json
import asyncio
import os
import logging
import threading
from typing import Dict, Any, List, Optional, Tuple
from tools.registry import registry
@@ -36,6 +37,48 @@ logger = logging.getLogger(__name__)
# Async Bridging (single source of truth -- used by registry.dispatch too)
# =============================================================================
_tool_loop = None # persistent loop for the main (CLI) thread
_tool_loop_lock = threading.Lock()
_worker_thread_local = threading.local() # per-worker-thread persistent loops
def _get_tool_loop():
"""Return a long-lived event loop for running async tool handlers.
Using a persistent loop (instead of asyncio.run() which creates and
*closes* a fresh loop every time) prevents "Event loop is closed"
errors that occur when cached httpx/AsyncOpenAI clients attempt to
close their transport on a dead loop during garbage collection.
"""
global _tool_loop
with _tool_loop_lock:
if _tool_loop is None or _tool_loop.is_closed():
_tool_loop = asyncio.new_event_loop()
return _tool_loop
def _get_worker_loop():
"""Return a persistent event loop for the current worker thread.
Each worker thread (e.g., delegate_task's ThreadPoolExecutor threads)
gets its own long-lived loop stored in thread-local storage. This
prevents the "Event loop is closed" errors that occurred when
asyncio.run() was used per-call: asyncio.run() creates a loop, runs
the coroutine, then *closes* the loop — but cached httpx/AsyncOpenAI
clients remain bound to that now-dead loop and raise RuntimeError
during garbage collection or subsequent use.
By keeping the loop alive for the thread's lifetime, cached clients
stay valid and their cleanup runs on a live loop.
"""
loop = getattr(_worker_thread_local, 'loop', None)
if loop is None or loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
_worker_thread_local.loop = loop
return loop
def _run_async(coro):
"""Run an async coroutine from a sync context.
@@ -44,6 +87,15 @@ def _run_async(coro):
disposable thread so asyncio.run() can create its own loop without
conflicting.
For the common CLI path (no running loop), we use a persistent event
loop so that cached async clients (httpx / AsyncOpenAI) remain bound
to a live loop and don't trigger "Event loop is closed" on GC.
When called from a worker thread (parallel tool execution), we use a
per-thread persistent loop to avoid both contention with the main
thread's shared loop AND the "Event loop is closed" errors caused by
asyncio.run()'s create-and-destroy lifecycle.
This is the single source of truth for sync->async bridging in tool
handlers. The RL paths (agent_loop.py, tool_context.py) also provide
outer thread-pool wrapping as defense-in-depth, but each handler is
@@ -55,11 +107,23 @@ def _run_async(coro):
loop = None
if loop and loop.is_running():
# Inside an async context (gateway, RL env) — run in a fresh thread.
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
future = pool.submit(asyncio.run, coro)
return future.result(timeout=300)
return asyncio.run(coro)
# If we're on a worker thread (e.g., parallel tool execution in
# delegate_task), use a per-thread persistent loop. This avoids
# contention with the main thread's shared loop while keeping cached
# httpx/AsyncOpenAI clients bound to a live loop for the thread's
# lifetime — preventing "Event loop is closed" on GC cleanup.
if threading.current_thread() is not threading.main_thread():
worker_loop = _get_worker_loop()
return worker_loop.run_until_complete(coro)
tool_loop = _get_tool_loop()
return tool_loop.run_until_complete(coro)
# =============================================================================

@@ -4838,7 +4838,7 @@ class AIAgent:
spinner.stop(cute_msg)
elif self.quiet_mode:
self._vprint(f" {cute_msg}")
elif self.quiet_mode:
elif self.quiet_mode and not self._has_stream_consumers():
face = random.choice(KawaiiSpinner.KAWAII_WAITING)
emoji = _get_tool_emoji(function_name)
preview = _build_tool_preview(function_name, function_args) or function_name
@@ -6568,19 +6568,7 @@ class AIAgent:
self._vprint(f" ┊ 💬 {clean}")
messages.append(assistant_msg)
# Close any open streaming display (response box, reasoning
# box) before tool execution begins. Intermediate turns may
# have streamed early content that opened the response box;
# flushing here prevents it from wrapping tool feed lines.
# Only signal the display callback — TTS (_stream_callback)
# should NOT receive None (it uses None as end-of-stream).
if self.stream_delta_callback:
try:
self.stream_delta_callback(None)
except Exception:
pass
_msg_count_before_tools = len(messages)
self._execute_tool_calls(assistant_message, messages, effective_task_id, api_call_count)

@@ -18,12 +18,13 @@
* node bridge.js --port 3000 --session ~/.hermes/whatsapp/session
*/
import { makeWASocket, useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion } from '@whiskeysockets/baileys';
import { makeWASocket, useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion, downloadMediaMessage } from '@whiskeysockets/baileys';
import express from 'express';
import { Boom } from '@hapi/boom';
import pino from 'pino';
import path from 'path';
import { mkdirSync, readFileSync, existsSync } from 'fs';
import { mkdirSync, readFileSync, writeFileSync, existsSync, readdirSync } from 'fs';
import { randomBytes } from 'crypto';
import qrcode from 'qrcode-terminal';
// Parse CLI args
@@ -41,6 +42,7 @@ const WHATSAPP_DEBUG =
const PORT = parseInt(getArg('port', '3000'), 10);
const SESSION_DIR = getArg('session', path.join(process.env.HOME || '~', '.hermes', 'whatsapp', 'session'));
const IMAGE_CACHE_DIR = path.join(process.env.HOME || '~', '.hermes', 'image_cache');
const PAIR_ONLY = args.includes('--pair-only');
const WHATSAPP_MODE = getArg('mode', process.env.WHATSAPP_MODE || 'self-chat'); // "bot" or "self-chat"
const ALLOWED_USERS = (process.env.WHATSAPP_ALLOWED_USERS || '').split(',').map(s => s.trim()).filter(Boolean);
@@ -55,6 +57,22 @@ function formatOutgoingMessage(message) {
mkdirSync(SESSION_DIR, { recursive: true });
// Build LID → phone reverse map from session files (lid-mapping-{phone}.json)
function buildLidMap() {
const map = {};
try {
for (const f of readdirSync(SESSION_DIR)) {
const m = f.match(/^lid-mapping-(\d+)\.json$/);
if (!m) continue;
const phone = m[1];
const lid = JSON.parse(readFileSync(path.join(SESSION_DIR, f), 'utf8'));
if (lid) map[String(lid)] = phone;
}
} catch {}
return map;
}
let lidToPhone = buildLidMap();
const logger = pino({ level: 'warn' });
// Message queue for polling
@@ -80,9 +98,16 @@ async function startSocket() {
browser: ['Hermes Agent', 'Chrome', '120.0'],
syncFullHistory: false,
markOnlineOnConnect: false,
// Required for Baileys 7.x: without this, incoming messages that need
// E2EE session re-establishment are silently dropped (msg.message === null)
getMessage: async (key) => {
// We don't maintain a message store, so return a placeholder.
// This is enough for Baileys to complete the retry handshake.
return { conversation: '' };
},
});
sock.ev.on('creds.update', saveCreds);
sock.ev.on('creds.update', () => { saveCreds(); lidToPhone = buildLidMap(); });
sock.ev.on('connection.update', (update) => {
const { connection, lastDisconnect, qr } = update;
@@ -120,7 +145,7 @@ async function startSocket() {
}
});
sock.ev.on('messages.upsert', ({ messages, type }) => {
sock.ev.on('messages.upsert', async ({ messages, type }) => {
// In self-chat mode, your own messages commonly arrive as 'append' rather
// than 'notify'. Accept both and filter agent echo-backs below.
if (type !== 'notify' && type !== 'append') return;
@@ -163,9 +188,10 @@ async function startSocket() {
if (!isSelfChat) continue;
}
// Check allowlist for messages from others
if (!msg.key.fromMe && ALLOWED_USERS.length > 0 && !ALLOWED_USERS.includes(senderNumber)) {
continue;
// Check allowlist for messages from others (resolve LID → phone if needed)
if (!msg.key.fromMe && ALLOWED_USERS.length > 0) {
const resolvedNumber = lidToPhone[senderNumber] || senderNumber;
if (!ALLOWED_USERS.includes(resolvedNumber)) continue;
}
// Extract message body
@@ -182,6 +208,18 @@ async function startSocket() {
body = msg.message.imageMessage.caption || '';
hasMedia = true;
mediaType = 'image';
try {
const buf = await downloadMediaMessage(msg, 'buffer', {}, { logger, reuploadRequest: sock.updateMediaMessage });
const mime = msg.message.imageMessage.mimetype || 'image/jpeg';
const extMap = { 'image/jpeg': '.jpg', 'image/png': '.png', 'image/webp': '.webp', 'image/gif': '.gif' };
const ext = extMap[mime] || '.jpg';
mkdirSync(IMAGE_CACHE_DIR, { recursive: true });
const filePath = path.join(IMAGE_CACHE_DIR, `img_${randomBytes(6).toString('hex')}${ext}`);
writeFileSync(filePath, buf);
mediaUrls.push(filePath);
} catch (err) {
console.error('[bridge] Failed to download image:', err.message);
}
} else if (msg.message.videoMessage) {
body = msg.message.videoMessage.caption || '';
hasMedia = true;
@@ -195,6 +233,11 @@ async function startSocket() {
mediaType = 'document';
}
// For media without caption, use a placeholder so the API message is never empty
if (hasMedia && !body) {
body = `[${mediaType} received]`;
}
// Ignore Hermes' own reply messages in self-chat mode to avoid loops.
if (msg.key.fromMe && ((REPLY_PREFIX && body.startsWith(REPLY_PREFIX)) || recentlySentIds.has(msg.key.id))) {
if (WHATSAPP_DEBUG) {
@@ -433,7 +476,7 @@ if (PAIR_ONLY) {
console.log();
startSocket();
} else {
app.listen(PORT, () => {
app.listen(PORT, '127.0.0.1', () => {
console.log(`🌉 WhatsApp bridge listening on port ${PORT} (mode: ${WHATSAPP_MODE})`);
console.log(`📁 Session stored in: ${SESSION_DIR}`);
if (ALLOWED_USERS.length > 0) {

@@ -0,0 +1,307 @@
"""Regression tests for the _run_async() event-loop lifecycle.
These tests verify the fix for GitHub issue #2104:
"Event loop is closed" after vision_analyze used as first call in session.
Root cause: asyncio.run() creates and *closes* a fresh event loop on every
call. Cached httpx/AsyncOpenAI clients that were bound to the now-dead loop
would crash with RuntimeError("Event loop is closed") when garbage-collected.
The fix replaces asyncio.run() with a persistent event loop in _run_async().
"""
import asyncio
import json
import threading
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _get_current_loop():
"""Return the running event loop from inside a coroutine."""
return asyncio.get_event_loop()
async def _create_and_return_transport():
"""Simulate an async client creating a transport on the current loop.
Returns a simple asyncio.Future bound to the running loop so we can
later check whether the loop is still alive.
"""
loop = asyncio.get_event_loop()
fut = loop.create_future()
fut.set_result("ok")
return loop, fut
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestRunAsyncLoopLifecycle:
"""Verify _run_async() keeps the event loop alive after returning."""
def test_loop_not_closed_after_run_async(self):
"""The loop used by _run_async must still be open after the call."""
from model_tools import _run_async
loop = _run_async(_get_current_loop())
assert not loop.is_closed(), (
"_run_async() closed the event loop — cached async clients will "
"crash with 'Event loop is closed' on GC (issue #2104)"
)
def test_same_loop_reused_across_calls(self):
"""Consecutive _run_async calls should reuse the same loop."""
from model_tools import _run_async
loop1 = _run_async(_get_current_loop())
loop2 = _run_async(_get_current_loop())
assert loop1 is loop2, (
"_run_async() created a new loop on the second call — cached "
"async clients from the first call would be orphaned"
)
def test_cached_transport_survives_between_calls(self):
"""A transport/future created in call 1 must be valid in call 2."""
from model_tools import _run_async
loop, fut = _run_async(_create_and_return_transport())
assert not loop.is_closed()
assert fut.result() == "ok"
loop2 = _run_async(_get_current_loop())
assert loop2 is loop, "Loop changed between calls"
assert not loop.is_closed(), "Loop closed before second call"
class TestRunAsyncWorkerThread:
"""Verify worker threads get persistent per-thread loops (delegate_task fix)."""
def test_worker_thread_loop_not_closed(self):
"""A worker thread's loop must stay open after _run_async returns,
so cached httpx/AsyncOpenAI clients don't crash on GC."""
from concurrent.futures import ThreadPoolExecutor
from model_tools import _run_async
def _run_on_worker():
loop = _run_async(_get_current_loop())
still_open = not loop.is_closed()
return loop, still_open
with ThreadPoolExecutor(max_workers=1) as pool:
loop, still_open = pool.submit(_run_on_worker).result()
assert still_open, (
"Worker thread's event loop was closed after _run_async — "
"cached async clients will crash with 'Event loop is closed'"
)
def test_worker_thread_reuses_loop_across_calls(self):
"""Multiple _run_async calls on the same worker thread should
reuse the same persistent loop (not create-and-destroy each time)."""
from concurrent.futures import ThreadPoolExecutor
from model_tools import _run_async
def _run_twice_on_worker():
loop1 = _run_async(_get_current_loop())
loop2 = _run_async(_get_current_loop())
return loop1, loop2
with ThreadPoolExecutor(max_workers=1) as pool:
loop1, loop2 = pool.submit(_run_twice_on_worker).result()
assert loop1 is loop2, (
"Worker thread created different loops for consecutive calls — "
"cached clients from the first call would be orphaned"
)
assert not loop1.is_closed()

    def test_parallel_workers_get_separate_loops(self):
        """Different worker threads must get their own loops to avoid
        contention (the original reason for the worker-thread branch)."""
        import time
        from concurrent.futures import ThreadPoolExecutor, as_completed
        from model_tools import _run_async

        barrier = threading.Barrier(3, timeout=5)

        def _get_loop_id():
            # Use a barrier to force all 3 threads to be alive simultaneously,
            # ensuring the ThreadPoolExecutor actually uses 3 distinct threads.
            loop = _run_async(_get_current_loop())
            barrier.wait()
            return id(loop), not loop.is_closed(), threading.current_thread().ident

        with ThreadPoolExecutor(max_workers=3) as pool:
            futures = [pool.submit(_get_loop_id) for _ in range(3)]
            results = [f.result() for f in as_completed(futures)]

        loop_ids = {r[0] for r in results}
        thread_ids = {r[2] for r in results}
        all_open = all(r[1] for r in results)

        assert all_open, "At least one worker thread's loop was closed"
        # The barrier guarantees 3 distinct threads were used
        assert len(thread_ids) == 3, f"Expected 3 threads, got {len(thread_ids)}"
        # Each thread should have its own loop
        assert len(loop_ids) == 3, (
            f"Expected 3 distinct loops for 3 parallel workers, "
            f"got {len(loop_ids)} — workers may be contending on a shared loop"
        )

    def test_worker_loop_separate_from_main_loop(self):
        """Worker thread loops must be different from the main thread's
        persistent loop to avoid cross-thread contention."""
        from concurrent.futures import ThreadPoolExecutor
        from model_tools import _run_async, _get_tool_loop

        main_loop = _get_tool_loop()

        def _get_worker_loop_id():
            loop = _run_async(_get_current_loop())
            return id(loop)

        with ThreadPoolExecutor(max_workers=1) as pool:
            worker_loop_id = pool.submit(_get_worker_loop_id).result()

        assert worker_loop_id != id(main_loop), (
            "Worker thread used the main thread's loop — this would cause "
            "cross-thread contention on the event loop"
        )


class TestRunAsyncWithRunningLoop:
    """When a loop is already running, _run_async falls back to a thread."""

    @pytest.mark.asyncio
    async def test_run_async_from_async_context(self):
        """_run_async should still work when called from inside an
        already-running event loop (gateway / Atropos path)."""
        from model_tools import _run_async

        async def _simple():
            return 42

        result = await asyncio.get_event_loop().run_in_executor(
            None, _run_async, _simple()
        )
        assert result == 42


# ---------------------------------------------------------------------------
# Integration: full vision_analyze dispatch chain
# ---------------------------------------------------------------------------


def _mock_vision_response():
    """Build a fake LLM response matching async_call_llm's return shape."""
    message = SimpleNamespace(content="A cat sitting on a chair.")
    choice = SimpleNamespace(index=0, message=message, finish_reason="stop")
    return SimpleNamespace(choices=[choice], model="test/vision", usage=None)


class TestVisionDispatchLoopSafety:
    """Simulate the full registry.dispatch('vision_analyze') chain and
    verify the event loop stays alive afterwards — the exact scenario
    from issue #2104."""

    def test_vision_dispatch_keeps_loop_alive(self, tmp_path):
        """After dispatching vision_analyze via the registry, the event
        loop must remain open so cached async clients don't crash on GC."""
        from model_tools import _run_async, _get_tool_loop
        from tools.registry import registry

        fake_response = _mock_vision_response()

        with (
            patch(
                "tools.vision_tools.async_call_llm",
                new_callable=AsyncMock,
                return_value=fake_response,
            ),
            patch(
                "tools.vision_tools._download_image",
                new_callable=AsyncMock,
                side_effect=lambda url, dest, **kw: _write_fake_image(dest),
            ),
            patch(
                "tools.vision_tools._validate_image_url",
                return_value=True,
            ),
            patch(
                "tools.vision_tools._image_to_base64_data_url",
                return_value="data:image/jpeg;base64,abc",
            ),
        ):
            result_json = registry.dispatch(
                "vision_analyze",
                {"image_url": "https://example.com/cat.png", "question": "What is this?"},
            )

        result = json.loads(result_json)
        assert result.get("success") is True, f"dispatch failed: {result}"
        assert "cat" in result.get("analysis", "").lower()

        loop = _get_tool_loop()
        assert not loop.is_closed(), (
            "Event loop closed after vision_analyze dispatch — cached async "
            "clients will crash with 'Event loop is closed' (issue #2104)"
        )

    def test_two_consecutive_vision_dispatches(self, tmp_path):
        """Two back-to-back vision_analyze dispatches must both succeed
        and share the same loop (simulates 'first call fails, second
        works' from the issue report)."""
        from model_tools import _get_tool_loop
        from tools.registry import registry

        fake_response = _mock_vision_response()

        with (
            patch(
                "tools.vision_tools.async_call_llm",
                new_callable=AsyncMock,
                return_value=fake_response,
            ),
            patch(
                "tools.vision_tools._download_image",
                new_callable=AsyncMock,
                side_effect=lambda url, dest, **kw: _write_fake_image(dest),
            ),
            patch(
                "tools.vision_tools._validate_image_url",
                return_value=True,
            ),
            patch(
                "tools.vision_tools._image_to_base64_data_url",
                return_value="data:image/jpeg;base64,abc",
            ),
        ):
            args = {"image_url": "https://example.com/cat.png", "question": "Describe"}
            r1 = json.loads(registry.dispatch("vision_analyze", args))
            loop_after_first = _get_tool_loop()
            r2 = json.loads(registry.dispatch("vision_analyze", args))
            loop_after_second = _get_tool_loop()

        assert r1.get("success") is True
        assert r2.get("success") is True
        assert loop_after_first is loop_after_second, "Loop changed between dispatches"
        assert not loop_after_second.is_closed()


def _write_fake_image(dest):
    """Write minimal bytes so vision_analyze_tool thinks download succeeded."""
    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_bytes(b"\xff\xd8\xff" + b"\x00" * 16)
    return dest
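
The worker-thread tests above rely on each thread keeping one long-lived event loop instead of creating and closing a loop per call. A minimal sketch of that pattern follows. The names `_worker_local`, `get_worker_loop`, and `run_async_on_worker` are hypothetical and chosen for illustration; the actual `model_tools._run_async` implementation is not shown in this diff and may differ.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical names for illustration only; not taken from model_tools.
_worker_local = threading.local()


def get_worker_loop():
    """Return the calling thread's persistent loop, creating it on first use."""
    loop = getattr(_worker_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        _worker_local.loop = loop
    return loop


def run_async_on_worker(coro):
    """Run a coroutine to completion on the calling thread's persistent loop."""
    return get_worker_loop().run_until_complete(coro)


async def current_loop():
    # Report which loop is actually executing this coroutine.
    return asyncio.get_running_loop()


def run_twice():
    # Two consecutive calls on the same worker thread.
    return run_async_on_worker(current_loop()), run_async_on_worker(current_loop())


with ThreadPoolExecutor(max_workers=1) as pool:
    first, second = pool.submit(run_twice).result()

same_loop = first is second          # the loop is reused across calls
still_open = not first.is_closed()   # and it is not closed afterwards
```

Because the loop lives in `threading.local()` storage, two threads never share a loop, while clients cached during one call stay bound to a loop that is still open on the next call from the same thread.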


@@ -370,7 +370,7 @@ Important safety rule: cron-run sessions should not recursively schedule more cr
 },
 "deliver": {
 "type": "string",
-"description": "Delivery target: origin, local, telegram, discord, signal, sms, or platform:chat_id"
+"description": "Delivery target: origin, local, telegram, discord, slack, whatsapp, signal, matrix, mattermost, homeassistant, dingtalk, email, sms, or platform:chat_id"
 },
 "model": {
 "type": "string",


@@ -124,6 +124,10 @@ def _handle_send(args):
 "slack": Platform.SLACK,
 "whatsapp": Platform.WHATSAPP,
 "signal": Platform.SIGNAL,
+"matrix": Platform.MATRIX,
+"mattermost": Platform.MATTERMOST,
+"homeassistant": Platform.HOMEASSISTANT,
+"dingtalk": Platform.DINGTALK,
 "email": Platform.EMAIL,
 "sms": Platform.SMS,
 }
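
For context, here is a rough sketch of how a `deliver` value such as `platform:chat_id` could be resolved against a name-to-platform map like the one in this hunk. Everything in it is an assumption made for illustration: the `Platform` enum is a small stand-in with made-up member values, and `parse_deliver_target` is a hypothetical helper. The repository's actual parsing in `_handle_send` is not visible in this diff.

```python
from enum import Enum
from typing import Optional, Tuple


class Platform(Enum):
    # Stand-in enum for illustration; member values are assumptions.
    SIGNAL = "signal"
    MATRIX = "matrix"
    EMAIL = "email"
    SMS = "sms"


# Name-to-platform map in the same shape as the dict in the hunk above.
PLATFORM_MAP = {
    "signal": Platform.SIGNAL,
    "matrix": Platform.MATRIX,
    "email": Platform.EMAIL,
    "sms": Platform.SMS,
}


def parse_deliver_target(target: str) -> Tuple[Platform, Optional[str]]:
    """Split 'platform' or 'platform:chat_id' and look the platform up."""
    name, sep, chat_id = target.partition(":")
    platform = PLATFORM_MAP.get(name.strip().lower())
    if platform is None:
        # A platform missing from the map is rejected here, which is why
        # every supported platform needs an entry.
        raise ValueError(f"Unknown delivery platform: {name!r}")
    return platform, (chat_id if sep and chat_id else None)
```

Under this sketch, a platform that the schema description advertises but the map omits would be rejected at lookup time, which is the kind of mismatch the added entries avoid.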