Compare commits

..

2 Commits

Author SHA1 Message Date
teknium1 d9122ac936 feat: use Codex-style compaction prompt for context compression
Replace the generic summarization prompt ('Summarize these conversation
turns concisely') with a task-oriented handoff prompt inspired by
OpenAI's Codex CLI compaction flow (researched in #499).

The new prompt frames compression as a 'CONTEXT CHECKPOINT COMPACTION'
and instructs the summarization model to produce a structured handoff
summary that includes:
- Current progress and key decisions
- User preferences and constraints discovered
- Clear next steps remaining
- Critical data (file paths, URLs, error messages, code snippets)
- Tool calls made and their key results

This produces better summaries because the model understands the summary
will be used by another LLM to continue the work, rather than treating
it as a generic text compression task.

No behavioral change to the compression algorithm itself — same
positional protection, same role alternation, same [CONTEXT SUMMARY]:
prefix. Only the prompt sent to the summarization model changes.

Inspired by PR #776 by @kshitijk4poor.
2026-03-11 05:38:20 -07:00
aydnOktay 9149c34a26 refactor(slack): replace print statements with structured logging
Replaces all ad-hoc print() calls in the Slack gateway adapter with
proper logging.getLogger(__name__) calls, matching the pattern already
used by every other platform adapter (telegram, discord, whatsapp,
signal, homeassistant).

Changes:
- Add import logging + module-level logger
- Use logger.error for failures, logger.warning for non-critical
  fallbacks, logger.info for status, logger.debug for routine ops
- Add exc_info=True for full stack traces on all error/warning paths
- Use %s format strings (lazy evaluation) instead of f-strings
- Wrap disconnect() in try/except for safety
- Add structured context (file paths, channel IDs, URLs) to log messages
- Convert document handling prints added after the original PR

Cherry-picked from PR #778 by aydnOktay, rebased onto current main
with conflict resolution and extended to cover document/video methods
added since the PR was created.

Co-authored-by: aydnOktay <xaydinoktay@gmail.com>
2026-03-11 05:34:43 -07:00
7 changed files with 105 additions and 620 deletions
+18 -16
View File
@@ -103,22 +103,24 @@ class ContextCompressor:
parts.append(f"[{role.upper()}]: {content}")
content_to_summarize = "\n\n".join(parts)
prompt = f"""Summarize these conversation turns concisely. This summary will replace these turns in the conversation history.
Write from a neutral perspective describing:
1. What actions were taken (tool calls, searches, file operations)
2. Key information or results obtained
3. Important decisions or findings
4. Relevant data, file names, or outputs
Keep factual and informative. Target ~{self.summary_target_tokens} tokens.
---
TURNS TO SUMMARIZE:
{content_to_summarize}
---
Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix."""
prompt = (
"You are performing a CONTEXT CHECKPOINT COMPACTION. Create a handoff "
"summary for the AI assistant that will resume this conversation.\n\n"
"Include:\n"
"- Current progress and key decisions made\n"
"- Important context, constraints, or user preferences discovered\n"
"- What remains to be done (clear next steps)\n"
"- Any critical data: file paths, variable names, URLs, error messages, "
"or code snippets needed to continue\n"
"- Tool calls made and their key results\n\n"
"Be concise, structured, and focused on helping the assistant seamlessly "
"continue the work without re-doing what's already been done.\n\n"
f"Target roughly {self.summary_target_tokens} tokens.\n\n"
"---\n"
f"TURNS TO SUMMARIZE:\n{content_to_summarize}\n"
"---\n\n"
'Write only the summary, starting with "[CONTEXT SUMMARY]:" prefix.'
)
# 1. Try the auxiliary model (cheap/fast)
if self.client:
+1 -28
View File
@@ -1253,7 +1253,6 @@ class HermesCLI:
# Background task tracking: {task_id: threading.Thread}
self._background_tasks: Dict[str, threading.Thread] = {}
self._background_task_counter = 0
self._stream_buf = ""
def _invalidate(self, min_interval: float = 0.25) -> None:
"""Throttled UI repaint — prevents terminal blinking on slow/SSH connections."""
@@ -1496,7 +1495,6 @@ class HermesCLI:
platform="cli",
session_db=self._session_db,
clarify_callback=self._clarify_callback,
stream_delta_callback=self._stream_delta,
honcho_session_key=self.session_id,
fallback_model=self._fallback_model,
thinking_callback=self._on_thinking,
@@ -3341,28 +3339,6 @@ class HermesCLI:
"Use your best judgement to make the choice and proceed."
)
_stream_started = False
def _stream_delta(self, text: str):
"""Buffer streaming tokens; emit complete lines via _cprint."""
if not text:
return
if not self._stream_started:
text = text.lstrip("\n")
if not text:
return
self._stream_started = True
self._stream_buf += text
while "\n" in self._stream_buf:
line, self._stream_buf = self._stream_buf.split("\n", 1)
_cprint(line)
def _flush_stream(self):
"""Emit any remaining partial line from the stream buffer."""
if self._stream_buf:
_cprint(self._stream_buf)
self._stream_buf = ""
def _sudo_password_callback(self) -> str:
"""
Prompt for sudo password through the prompt_toolkit UI.
@@ -3491,8 +3467,6 @@ class HermesCLI:
# Add user message to history
self.conversation_history.append({"role": "user", "content": message})
self._stream_buf = ""
self._stream_started = False
_cprint(f"{_GOLD}{'' * 40}{_RST}")
print(flush=True)
@@ -3540,7 +3514,6 @@ class HermesCLI:
agent_thread.join(0.1)
agent_thread.join() # Ensure agent thread completes
self._flush_stream()
# Drain any remaining agent output still in the StdoutProxy
# buffer so tool/status lines render ABOVE our response box.
@@ -3569,7 +3542,7 @@ class HermesCLI:
if response and pending_message:
response = response + "\n\n---\n_[Interrupted - processing new message]_"
if response and not (self.agent and self.agent.stream_delta_callback):
if response:
# Use a Rich Panel for the response box — adapts to terminal
# width at render time instead of hard-coding border length.
try:
+3 -42
View File
@@ -413,36 +413,6 @@ class BasePlatformAdapter(ABC):
"""
return SendResult(success=False, error="Not supported")
@property
def supports_streaming(self) -> bool:
"""Whether this platform supports response streaming via message edits."""
return False
@property
def supports_draft_streaming(self) -> bool:
"""Whether this platform supports native draft streaming (Bot API 9.3+)."""
return False
async def send_draft(self, chat_id: str, draft_id: int, text: str, metadata: dict = None) -> bool:
"""Push a draft text update. Override in subclasses."""
return False
async def finalize_draft(self, chat_id: str, content: str, metadata: dict = None) -> "SendResult":
"""Finalize a draft stream with the completed message."""
return SendResult(success=False, error="Not supported")
async def delete_message(self, chat_id: str, message_id: str) -> SendResult:
"""Delete a previously sent message."""
return SendResult(success=False, error="Not supported")
async def send_raw(self, chat_id: str, content: str, metadata: dict = None) -> "SendResult":
"""Send without formatting (default: delegates to send)."""
return await self.send(chat_id=chat_id, content=content, metadata=metadata)
async def edit_message_raw(self, chat_id: str, message_id: str, content: str) -> "SendResult":
"""Edit without formatting (default: delegates to edit_message)."""
return await self.edit_message(chat_id=chat_id, message_id=message_id, content=content)
async def send_typing(self, chat_id: str, metadata=None) -> None:
"""
Send a typing indicator.
@@ -727,20 +697,11 @@ class BasePlatformAdapter(ABC):
try:
# Call the handler (this can take a while with tool calls)
handler_result = await self._message_handler(event)
# Normalise: handler may return str or dict(content, already_sent)
already_sent = False
if isinstance(handler_result, dict):
response = handler_result.get("content") or ""
already_sent = handler_result.get("already_sent", False)
else:
response = handler_result
response = await self._message_handler(event)
# Send response if any
if not response:
if not already_sent:
logger.warning("[%s] Handler returned empty/None response for %s", self.name, event.source.chat_id)
logger.warning("[%s] Handler returned empty/None response for %s", self.name, event.source.chat_id)
if response:
# Extract MEDIA:<path> tags (from TTS tool) before other processing
media_files, response = self.extract_media(response)
@@ -751,7 +712,7 @@ class BasePlatformAdapter(ABC):
logger.info("[%s] extract_images found %d image(s) in response (%d chars)", self.name, len(images), len(response))
# Send the text portion first (if any remains after extractions)
if text_content and not already_sent:
if text_content:
logger.info("[%s] Sending response (%d chars) to %s", self.name, len(text_content), event.source.chat_id)
result = await self.send(
chat_id=event.source.chat_id,
+80 -28
View File
@@ -9,6 +9,7 @@ Uses slack-bolt (Python) with Socket Mode for:
"""
import asyncio
import logging
import os
import re
from typing import Dict, List, Optional, Any
@@ -41,6 +42,9 @@ from gateway.platforms.base import (
)
logger = logging.getLogger(__name__)
def check_slack_requirements() -> bool:
"""Check if Slack dependencies are available."""
return SLACK_AVAILABLE
@@ -73,17 +77,19 @@ class SlackAdapter(BasePlatformAdapter):
async def connect(self) -> bool:
"""Connect to Slack via Socket Mode."""
if not SLACK_AVAILABLE:
print("[Slack] slack-bolt not installed. Run: pip install slack-bolt")
logger.error(
"[Slack] slack-bolt not installed. Run: pip install slack-bolt",
)
return False
bot_token = self.config.token
app_token = os.getenv("SLACK_APP_TOKEN")
if not bot_token:
print("[Slack] SLACK_BOT_TOKEN not set")
logger.error("[Slack] SLACK_BOT_TOKEN not set")
return False
if not app_token:
print("[Slack] SLACK_APP_TOKEN not set")
logger.error("[Slack] SLACK_APP_TOKEN not set")
return False
try:
@@ -117,19 +123,22 @@ class SlackAdapter(BasePlatformAdapter):
asyncio.create_task(self._handler.start_async())
self._running = True
print(f"[Slack] Connected as @{bot_name} (Socket Mode)")
logger.info("[Slack] Connected as @%s (Socket Mode)", bot_name)
return True
except Exception as e:
print(f"[Slack] Connection failed: {e}")
except Exception as e: # pragma: no cover - defensive logging
logger.error("[Slack] Connection failed: %s", e, exc_info=True)
return False
async def disconnect(self) -> None:
"""Disconnect from Slack."""
if self._handler:
await self._handler.close_async()
try:
await self._handler.close_async()
except Exception as e: # pragma: no cover - defensive logging
logger.warning("[Slack] Error while closing Socket Mode handler: %s", e, exc_info=True)
self._running = False
print("[Slack] Disconnected")
logger.info("[Slack] Disconnected")
async def send(
self,
@@ -162,8 +171,8 @@ class SlackAdapter(BasePlatformAdapter):
raw_response=result,
)
except Exception as e:
print(f"[Slack] Send error: {e}")
except Exception as e: # pragma: no cover - defensive logging
logger.error("[Slack] Send error: %s", e, exc_info=True)
return SendResult(success=False, error=str(e))
async def edit_message(
@@ -182,7 +191,14 @@ class SlackAdapter(BasePlatformAdapter):
text=content,
)
return SendResult(success=True, message_id=message_id)
except Exception as e:
except Exception as e: # pragma: no cover - defensive logging
logger.error(
"[Slack] Failed to edit message %s in channel %s: %s",
message_id,
chat_id,
e,
exc_info=True,
)
return SendResult(success=False, error=str(e))
async def send_typing(self, chat_id: str, metadata=None) -> None:
@@ -214,8 +230,14 @@ class SlackAdapter(BasePlatformAdapter):
)
return SendResult(success=True, raw_response=result)
except Exception as e:
print(f"[{self.name}] Failed to send local image: {e}")
except Exception as e: # pragma: no cover - defensive logging
logger.error(
"[%s] Failed to send local Slack image %s: %s",
self.name,
image_path,
e,
exc_info=True,
)
return await super().send_image_file(chat_id, image_path, caption, reply_to)
async def send_image(
@@ -247,7 +269,13 @@ class SlackAdapter(BasePlatformAdapter):
return SendResult(success=True, raw_response=result)
except Exception as e:
except Exception as e: # pragma: no cover - defensive logging
logger.warning(
"[Slack] Failed to upload image from URL %s, falling back to text: %s",
image_url,
e,
exc_info=True,
)
# Fall back to sending the URL as text
text = f"{caption}\n{image_url}" if caption else image_url
return await self.send(chat_id=chat_id, content=text, reply_to=reply_to)
@@ -273,7 +301,13 @@ class SlackAdapter(BasePlatformAdapter):
)
return SendResult(success=True, raw_response=result)
except Exception as e:
except Exception as e: # pragma: no cover - defensive logging
logger.error(
"[Slack] Failed to send audio file %s: %s",
audio_path,
e,
exc_info=True,
)
return SendResult(success=False, error=str(e))
async def send_video(
@@ -300,8 +334,14 @@ class SlackAdapter(BasePlatformAdapter):
)
return SendResult(success=True, raw_response=result)
except Exception as e:
print(f"[{self.name}] Failed to send video: {e}")
except Exception as e: # pragma: no cover - defensive logging
logger.error(
"[%s] Failed to send video %s: %s",
self.name,
video_path,
e,
exc_info=True,
)
return await super().send_video(chat_id, video_path, caption, reply_to)
async def send_document(
@@ -331,8 +371,14 @@ class SlackAdapter(BasePlatformAdapter):
)
return SendResult(success=True, raw_response=result)
except Exception as e:
print(f"[{self.name}] Failed to send document: {e}")
except Exception as e: # pragma: no cover - defensive logging
logger.error(
"[%s] Failed to send document %s: %s",
self.name,
file_path,
e,
exc_info=True,
)
return await super().send_document(chat_id, file_path, caption, file_name, reply_to)
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
@@ -348,7 +394,13 @@ class SlackAdapter(BasePlatformAdapter):
"name": channel.get("name", chat_id),
"type": "dm" if is_dm else "group",
}
except Exception:
except Exception as e: # pragma: no cover - defensive logging
logger.error(
"[Slack] Failed to fetch chat info for %s: %s",
chat_id,
e,
exc_info=True,
)
return {"name": chat_id, "type": "unknown"}
# ----- Internal handlers -----
@@ -403,8 +455,8 @@ class SlackAdapter(BasePlatformAdapter):
media_urls.append(cached)
media_types.append(mimetype)
msg_type = MessageType.PHOTO
except Exception as e:
print(f"[Slack] Failed to cache image: {e}", flush=True)
except Exception as e: # pragma: no cover - defensive logging
logger.warning("[Slack] Failed to cache image from %s: %s", url, e, exc_info=True)
elif mimetype.startswith("audio/") and url:
try:
ext = "." + mimetype.split("/")[-1].split(";")[0]
@@ -414,8 +466,8 @@ class SlackAdapter(BasePlatformAdapter):
media_urls.append(cached)
media_types.append(mimetype)
msg_type = MessageType.VOICE
except Exception as e:
print(f"[Slack] Failed to cache audio: {e}", flush=True)
except Exception as e: # pragma: no cover - defensive logging
logger.warning("[Slack] Failed to cache audio from %s: %s", url, e, exc_info=True)
elif url:
# Try to handle as a document attachment
try:
@@ -437,7 +489,7 @@ class SlackAdapter(BasePlatformAdapter):
file_size = f.get("size", 0)
MAX_DOC_BYTES = 20 * 1024 * 1024
if not file_size or file_size > MAX_DOC_BYTES:
print(f"[Slack] Document too large or unknown size: {file_size}", flush=True)
logger.warning("[Slack] Document too large or unknown size: %s", file_size)
continue
# Download and cache
@@ -449,7 +501,7 @@ class SlackAdapter(BasePlatformAdapter):
media_urls.append(cached_path)
media_types.append(doc_mime)
msg_type = MessageType.DOCUMENT
print(f"[Slack] Cached user document: {cached_path}", flush=True)
logger.debug("[Slack] Cached user document: %s", cached_path)
# Inject text content for .txt/.md files (capped at 100 KB)
MAX_TEXT_INJECT_BYTES = 100 * 1024
@@ -466,8 +518,8 @@ class SlackAdapter(BasePlatformAdapter):
except UnicodeDecodeError:
pass # Binary content, skip injection
except Exception as e:
print(f"[Slack] Failed to cache document: {e}", flush=True)
except Exception as e: # pragma: no cover - defensive logging
logger.warning("[Slack] Failed to cache document from %s: %s", url, e, exc_info=True)
# Build source
source = self.build_source(
-93
View File
@@ -299,99 +299,6 @@ class TelegramAdapter(BasePlatformAdapter):
)
return SendResult(success=False, error=str(e))
async def send_raw(
self, chat_id: str, content: str, metadata: dict = None,
) -> SendResult:
"""Send a plain-text message without MarkdownV2 formatting."""
if not self._bot:
return SendResult(success=False, error="Not connected")
try:
thread_id = metadata.get("thread_id") if metadata else None
msg = await self._bot.send_message(
chat_id=int(chat_id), text=content, parse_mode=None,
message_thread_id=int(thread_id) if thread_id else None,
)
return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e:
return SendResult(success=False, error=str(e))
async def edit_message_raw(
self, chat_id: str, message_id: str, content: str,
) -> SendResult:
"""Edit a message with plain text (no MarkdownV2 formatting)."""
if not self._bot:
return SendResult(success=False, error="Not connected")
try:
await self._bot.edit_message_text(
chat_id=int(chat_id), message_id=int(message_id),
text=content, parse_mode=None,
)
return SendResult(success=True, message_id=message_id)
except Exception as e:
return SendResult(success=False, error=str(e))
@property
def supports_streaming(self) -> bool:
return True
@property
def supports_draft_streaming(self) -> bool:
"""Whether this adapter supports Telegram Bot API sendMessageDraft (9.3+)."""
return True
async def send_draft(
self, chat_id: str, draft_id: int, text: str, metadata: dict = None,
) -> bool:
"""Push a draft update via sendMessageDraft (Bot API 9.3+)."""
if not self._bot:
return False
try:
thread_id = metadata.get("thread_id") if metadata else None
return await self._bot.send_message_draft(
chat_id=int(chat_id), draft_id=draft_id, text=text,
parse_mode=None,
message_thread_id=int(thread_id) if thread_id else None,
)
except Exception as e:
logger.warning("[%s] send_message_draft failed: %s", self.name, e)
return False
async def finalize_draft(
self, chat_id: str, content: str, metadata: dict = None,
) -> SendResult:
"""Finalize a draft stream by sending the completed message with formatting."""
if not self._bot:
return SendResult(success=False, error="Not connected")
try:
thread_id = metadata.get("thread_id") if metadata else None
formatted = self.format_message(content)
try:
msg = await self._bot.send_message(
chat_id=int(chat_id), text=formatted,
parse_mode=ParseMode.MARKDOWN_V2,
message_thread_id=int(thread_id) if thread_id else None,
)
except Exception:
msg = await self._bot.send_message(
chat_id=int(chat_id), text=content, parse_mode=None,
message_thread_id=int(thread_id) if thread_id else None,
)
return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e:
return SendResult(success=False, error=str(e))
async def delete_message(self, chat_id: str, message_id: str) -> SendResult:
"""Delete a Telegram message."""
if not self._bot:
return SendResult(success=False, error="Not connected")
try:
await self._bot.delete_message(
chat_id=int(chat_id), message_id=int(message_id),
)
return SendResult(success=True, message_id=message_id)
except Exception as e:
return SendResult(success=False, error=str(e))
async def send_voice(
self,
chat_id: str,
+3 -156
View File
@@ -175,7 +175,6 @@ class AIAgent:
thinking_callback: callable = None,
clarify_callback: callable = None,
step_callback: callable = None,
stream_delta_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
@@ -263,7 +262,6 @@ class AIAgent:
self.thinking_callback = thinking_callback
self.clarify_callback = clarify_callback
self.step_callback = step_callback
self.stream_delta_callback = stream_delta_callback
self._last_reported_tool = None # Track for "new tool" mode
# Interrupt mechanism for breaking out of tool loops
@@ -2062,147 +2060,6 @@ class AIAgent:
return terminal_response
raise RuntimeError("Responses create(stream=True) fallback did not emit a terminal response.")
def _interruptible_streaming_api_call(self, api_kwargs: dict, on_first_delta=None):
"""Streaming variant of _interruptible_api_call for chat_completions.
Fires self.stream_delta_callback(text) as content tokens arrive and
accumulates the full response into a SimpleNamespace matching the shape
downstream code expects. Falls back to the non-streaming path when the
provider rejects the stream request.
"""
from types import SimpleNamespace
result = {"response": None, "error": None}
first_delta_fired = [False]
def _stream():
try:
stream_kwargs = {**api_kwargs, "stream": True,
"stream_options": {"include_usage": True}}
stream_resp = self.client.chat.completions.create(**stream_kwargs)
content_parts = []
tool_calls_acc = {}
finish_reason = "stop"
usage = None
reasoning_content = None
model = None
has_tool_calls = False
try:
for chunk in stream_resp:
if not chunk.choices:
if hasattr(chunk, "usage") and chunk.usage:
usage = chunk.usage
continue
choice = chunk.choices[0]
if choice.finish_reason:
finish_reason = choice.finish_reason
if model is None and hasattr(chunk, "model"):
model = chunk.model
delta = choice.delta
if delta is None:
continue
if delta.content:
content_parts.append(delta.content)
if not first_delta_fired[0]:
first_delta_fired[0] = True
if on_first_delta:
on_first_delta()
if self.stream_delta_callback and not has_tool_calls:
try:
self.stream_delta_callback(delta.content)
except Exception:
pass
if delta.tool_calls:
has_tool_calls = True
for tc_delta in delta.tool_calls:
idx = tc_delta.index
if idx not in tool_calls_acc:
tool_calls_acc[idx] = {
"id": tc_delta.id or "",
"type": tc_delta.type or "function",
"function": {
"name": getattr(tc_delta.function, "name", None) or "",
"arguments": getattr(tc_delta.function, "arguments", None) or "",
},
}
else:
entry = tool_calls_acc[idx]
if tc_delta.id:
entry["id"] = tc_delta.id
fn = tc_delta.function
if fn:
if fn.name:
entry["function"]["name"] = fn.name
if fn.arguments:
entry["function"]["arguments"] += fn.arguments
rc = getattr(delta, "reasoning_content", None) or getattr(delta, "reasoning", None)
if rc:
reasoning_content = (reasoning_content or "") + rc
finally:
close_fn = getattr(stream_resp, "close", None)
if callable(close_fn):
try:
close_fn()
except Exception:
pass
tool_calls_list = None
if tool_calls_acc:
tool_calls_list = [
SimpleNamespace(
id=tc["id"], call_id=tc["id"], type=tc["type"],
function=SimpleNamespace(name=tc["function"]["name"],
arguments=tc["function"]["arguments"]),
)
for idx, tc in sorted(tool_calls_acc.items())
]
message = SimpleNamespace(
content="".join(content_parts) or None,
tool_calls=tool_calls_list,
reasoning=reasoning_content,
reasoning_content=reasoning_content,
reasoning_details=None,
)
result["response"] = SimpleNamespace(
choices=[SimpleNamespace(message=message, finish_reason=finish_reason)],
usage=usage,
model=model,
)
except Exception as e:
result["error"] = e
t = threading.Thread(target=_stream, daemon=True)
t.start()
while t.is_alive():
t.join(timeout=0.3)
if self._interrupt_requested:
try:
self.client.close()
except Exception:
pass
try:
self.client = OpenAI(**self._client_kwargs)
except Exception:
pass
raise InterruptedError("Agent interrupted during streaming API call")
if result["error"] is not None:
err = result["error"]
err_str = str(err).lower()
if any(kw in err_str for kw in ("stream", "not support", "unsupported")):
logger.debug("Streaming failed (%s), falling back to non-streaming.", err)
return self._interruptible_api_call(api_kwargs)
raise err
return result["response"]
def _try_refresh_codex_client_credentials(self, *, force: bool = True) -> bool:
if self.api_mode != "codex_responses" or self.provider != "openai-codex":
return False
@@ -3617,17 +3474,7 @@ class AIAgent:
if os.getenv("HERMES_DUMP_REQUESTS", "").strip().lower() in {"1", "true", "yes", "on"}:
self._dump_api_request_debug(api_kwargs, reason="preflight")
if self.stream_delta_callback and self.api_mode != "codex_responses":
def _stop_spinner():
nonlocal thinking_spinner
if thinking_spinner:
thinking_spinner.stop("")
thinking_spinner = None
response = self._interruptible_streaming_api_call(
api_kwargs, on_first_delta=_stop_spinner)
else:
response = self._interruptible_api_call(api_kwargs)
response = self._interruptible_api_call(api_kwargs)
api_duration = time.time() - api_start_time
@@ -4383,8 +4230,8 @@ class AIAgent:
turn_content = assistant_message.content or ""
if turn_content and self._has_content_after_think_block(turn_content):
self._last_content_with_tools = turn_content
# Show intermediate commentary — skip when streaming (already in buffer)
if self.quiet_mode and not self.stream_delta_callback:
# Show intermediate commentary so the user can follow along
if self.quiet_mode:
clean = self._strip_think_blocks(turn_content).strip()
if clean:
print(f" ┊ 💬 {clean}")
-257
View File
@@ -1,257 +0,0 @@
"""Tests for streaming token output — accumulator shape, callback order, fallback."""
import queue
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock, patch, call
import pytest
from run_agent import AIAgent
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_tool_defs(*names):
return [
{"type": "function", "function": {"name": n, "description": f"{n}", "parameters": {"type": "object", "properties": {}}}}
for n in names
]
@pytest.fixture()
def agent():
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
cb = MagicMock()
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
stream_delta_callback=cb,
)
a.client = MagicMock()
a._stream_cb = cb
return a
# ---------------------------------------------------------------------------
# Helpers — fake streaming chunks
# ---------------------------------------------------------------------------
def _chunk(content=None, tool_call_delta=None, finish_reason=None, usage=None, model=None):
delta = SimpleNamespace(content=content, tool_calls=tool_call_delta,
reasoning_content=None, reasoning=None)
choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
c = SimpleNamespace(choices=[choice])
if usage is not None:
c.usage = SimpleNamespace(**usage)
if model:
c.model = model
return c
def _usage_chunk(**kw):
c = SimpleNamespace(choices=[], usage=SimpleNamespace(**kw))
return c
def _tc_delta(index, id=None, name=None, arguments=None, type=None):
fn = SimpleNamespace(name=name, arguments=arguments)
return SimpleNamespace(index=index, id=id, type=type, function=fn)
# ---------------------------------------------------------------------------
# Tests: accumulator shape
# ---------------------------------------------------------------------------
class TestStreamingAccumulator:
def test_text_only_response(self, agent):
"""Streaming text-only response produces correct synthetic shape."""
chunks = [
_chunk(content="Hello", model="test/m"),
_chunk(content=" world"),
_chunk(finish_reason="stop"),
_usage_chunk(prompt_tokens=10, completion_tokens=5, total_tokens=15),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"model": "test"})
assert resp.choices[0].message.content == "Hello world"
assert resp.choices[0].message.tool_calls is None
assert resp.choices[0].finish_reason == "stop"
assert resp.usage.prompt_tokens == 10
assert resp.model == "test/m"
def test_tool_call_response(self, agent):
"""Streaming tool-call response accumulates function name + arguments."""
chunks = [
_chunk(tool_call_delta=[_tc_delta(0, id="call_1", name="web_search", arguments='{"q', type="function")]),
_chunk(tool_call_delta=[_tc_delta(0, arguments='uery": "hi"}')]),
_chunk(finish_reason="tool_calls"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"model": "test"})
tc = resp.choices[0].message.tool_calls
assert tc is not None
assert len(tc) == 1
assert tc[0].id == "call_1"
assert tc[0].function.name == "web_search"
assert tc[0].function.arguments == '{"query": "hi"}'
assert resp.choices[0].finish_reason == "tool_calls"
def test_mixed_content_and_tool_calls(self, agent):
"""Content + tool calls in same stream are both accumulated."""
chunks = [
_chunk(content="Let me check."),
_chunk(tool_call_delta=[_tc_delta(0, id="c1", name="web_search", arguments="{}", type="function")]),
_chunk(finish_reason="tool_calls"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"model": "test"})
assert resp.choices[0].message.content == "Let me check."
assert len(resp.choices[0].message.tool_calls) == 1
class TestStreamingCallbacks:
def test_deltas_fire_in_order(self, agent):
"""stream_delta_callback receives content deltas in order."""
received = []
agent.stream_delta_callback = lambda t: received.append(t)
chunks = [_chunk(content="a"), _chunk(content="b"), _chunk(content="c"), _chunk(finish_reason="stop")]
agent.client.chat.completions.create.return_value = iter(chunks)
agent._interruptible_streaming_api_call({"model": "test"})
assert received == ["a", "b", "c"]
def test_on_first_delta_fires_once(self, agent):
first = MagicMock()
chunks = [_chunk(content="x"), _chunk(content="y"), _chunk(finish_reason="stop")]
agent.client.chat.completions.create.return_value = iter(chunks)
agent._interruptible_streaming_api_call({"model": "test"}, on_first_delta=first)
first.assert_called_once()
def test_tool_only_does_not_fire_callback(self, agent):
"""Tool-call-only stream does not invoke stream_delta_callback."""
received = []
agent.stream_delta_callback = lambda t: received.append(t)
chunks = [
_chunk(tool_call_delta=[_tc_delta(0, id="c1", name="t", arguments="{}", type="function")]),
_chunk(finish_reason="tool_calls"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
agent._interruptible_streaming_api_call({"model": "test"})
assert received == []
class TestStreamingFallback:
def test_stream_error_falls_back(self, agent):
"""When streaming fails with 'not support', falls back to non-streaming."""
agent.client.chat.completions.create.side_effect = [
Exception("streaming not supported by this provider"),
SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(content="ok", tool_calls=None, reasoning=None, reasoning_content=None, reasoning_details=None),
finish_reason="stop",
)],
usage=None,
model="test/m",
),
]
resp = agent._interruptible_streaming_api_call({"model": "test"})
assert resp.choices[0].message.content == "ok"
assert agent.client.chat.completions.create.call_count == 2
def test_non_stream_error_raises(self, agent):
"""Non-stream-related errors propagate normally."""
agent.client.chat.completions.create.side_effect = ValueError("bad request")
with pytest.raises(ValueError, match="bad request"):
agent._interruptible_streaming_api_call({"model": "test"})
# ---------------------------------------------------------------------------
# Tests: base.py already_sent contract
# ---------------------------------------------------------------------------
class TestAlreadySentContract:
def _make_adapter(self, send_side_effect=None):
from gateway.platforms.base import BasePlatformAdapter, SendResult
from gateway.config import Platform, PlatformConfig
class FakeAdapter(BasePlatformAdapter):
async def connect(self): return True
async def disconnect(self): pass
async def get_chat_info(self, chat_id): return {"name": "test"}
async def send(self, chat_id, content, reply_to=None, metadata=None):
if send_side_effect is not None:
send_side_effect(content)
return SendResult(success=True, message_id="1")
cfg = PlatformConfig(enabled=True)
adapter = FakeAdapter(cfg, Platform.TELEGRAM)
adapter._running = True
return adapter
@pytest.mark.asyncio
async def test_already_sent_skips_send(self):
"""Handler returning already_sent=True prevents base from calling send()."""
from gateway.platforms.base import MessageEvent
from gateway.config import Platform
from gateway.session import SessionSource
sent = []
adapter = self._make_adapter(send_side_effect=lambda c: sent.append(c))
async def handler(event):
return {"content": "hello", "already_sent": True}
adapter.set_message_handler(handler)
event = MessageEvent(
text="hi",
source=SessionSource(platform=Platform.TELEGRAM, chat_id="1", user_id="u1"),
)
await adapter._process_message_background(event, "s1")
assert sent == [], "send() should not be called when already_sent=True"
@pytest.mark.asyncio
async def test_string_response_sends_normally(self):
"""Handler returning a plain string triggers send() as before."""
from gateway.platforms.base import MessageEvent
from gateway.config import Platform
from gateway.session import SessionSource
sent = []
adapter = self._make_adapter(send_side_effect=lambda c: sent.append(c))
async def handler(event):
return "hello"
adapter.set_message_handler(handler)
event = MessageEvent(
text="hi",
source=SessionSource(platform=Platform.TELEGRAM, chat_id="1", user_id="u1"),
)
await adapter._process_message_background(event, "s1")
assert "hello" in sent