Compare commits

...

10 Commits

Author SHA1 Message Date
teknium1 67cf37fc26 fix: head+tail truncation for execute_code stdout (inspired by openclaw context-pruning)
Previously, _drain() only captured the first MAX_STDOUT_BYTES (50KB) of
stdout, silently dropping all tail output. Scripts that print() their
final results at the end would have those results lost.

Now uses a two-buffer approach: 40% head + 60% tail (rolling window).
This matches the pattern already used in terminal_tool.py (line 1042-1051)
but gives the tail more space since execute_code scripts typically
print() their final results at the end.

Inspired by openclaw's softTrim context-pruning (headChars/tailChars).
2026-03-09 02:15:48 -07:00
teknium1 a2d0d07109 Merge PR #754: fix: stabilize system prompt across gateway turns for cache hits
Prevents unnecessary Anthropic prompt cache misses by reusing stored
system prompts for continuing sessions and stabilizing Honcho context
per session instead of per turn.
2026-03-09 02:00:14 -07:00
teknium1 aedb773f0d fix: stabilize system prompt across gateway turns for cache hits
Two changes to prevent unnecessary Anthropic prompt cache misses in the
gateway, where a fresh AIAgent is created per user message:

1. Reuse stored system prompt for continuing sessions:
   When conversation_history is non-empty, load the system prompt from
   the session DB instead of rebuilding from disk. The model already has
   updated memory in its conversation history (it wrote it!), so
   re-reading memory from disk produces a different system prompt that
   breaks the cache prefix.

2. Stabilize Honcho context per session:
   - Only prefetch Honcho context on the first turn (empty history)
   - Bake Honcho context into the cached system prompt and store to DB
   - Remove the per-turn Honcho injection from the API call loop

   This ensures the system message is identical across all turns in a
   session. Previously, re-fetching Honcho could return different context
   on each turn, changing the system message and invalidating the cache.

Both changes preserve the existing behavior for compression (which
invalidates the prompt and rebuilds from scratch) and for the CLI
(where the same AIAgent persists and the cached prompt is already
stable across turns).

Tests: 2556 passed (6 new)
2026-03-09 01:50:58 -07:00
teknium1 aaf8f2d2d2 feat: expand secret redaction patterns
Added 14 new redaction patterns, all with distinctive prefixes
that have near-zero false positive risk:

Prefix patterns:
  - AWS Access Key ID (AKIA...)
  - Stripe keys (sk_live_, sk_test_, rk_live_)
  - SendGrid (SG....)
  - HuggingFace (hf_...)
  - Replicate (r8_...)
  - npm tokens (npm_...)
  - PyPI tokens (pypi-...)
  - DigitalOcean PATs (dop_v1_, doo_v1_)
  - AgentMail (am_...)

Structural patterns:
  - Private key blocks (-----BEGIN...PRIVATE KEY-----)
  - Database connection string passwords (postgres://user:PASS@host)
2026-03-09 01:28:27 -07:00
teknium1 12f4800631 docs: add security.redact_secrets as commented config section
Moved redact_secrets out of DEFAULT_CONFIG (it's on by default when
unset) and into the commented sections at the bottom of config.yaml,
alongside fallback_model. Users can see the option and uncomment to
disable.
2026-03-09 01:12:49 -07:00
teknium1 57b48a81ca feat: add config toggle to disable secret redaction
New config option:

  security:
    redact_secrets: false  # default: true

When set to false, API keys, tokens, and passwords are shown in
full in read_file, search_files, and terminal output. Useful for
debugging auth issues where you need to verify the actual key value.

Bridged to both CLI and gateway via HERMES_REDACT_SECRETS env var.
The check is in redact_sensitive_text() itself, so all call sites
(terminal, file tools, log formatter) respect it.
2026-03-09 01:04:33 -07:00
teknium1 7af33accf1 fix: apply secret redaction to file tool outputs
Terminal output was already redacted via redact_sensitive_text() but
read_file and search_files returned raw content. Now both tools
redact secrets before returning results to the LLM.

Based on PR #372 by @teyrebaz33 (closes #363) — applied manually
due to branch conflicts with the current codebase.
2026-03-09 00:49:46 -07:00
teknium1 3214c05e82 Merge PR #369: fix(gateway): add missing UTF-8 encoding to file I/O
Authored by @ch3ronsa. Fixes UnicodeEncodeError/UnicodeDecodeError on
Windows with non-UTF-8 system locales (e.g. Turkish cp1254).

Adds encoding='utf-8' to 10 open() calls across gateway/session.py,
gateway/channel_directory.py, and gateway/mirror.py.
2026-03-09 00:36:38 -07:00
teknium1 4608a7fe4e fix: make skills manifest writes atomic
Uses temp file + fsync + os.replace() to avoid corruption if the
process crashes mid-write. Cleans up temp file on failure, logs
errors at debug level.

Based on PR #335 by @aydnOktay — adapted for the current v2
manifest format (name:hash).
2026-03-08 23:53:57 -07:00
Vicaversa f90a627f9a fix(gateway): add missing UTF-8 encoding to file I/O preventing crashes on Windows
On Windows, Python's open() defaults to the system locale encoding
(e.g. cp1254 for Turkish, cp1252 for Western European) instead of
UTF-8. The gateway already uses ensure_ascii=False in json.dumps()
to preserve Unicode characters in chat messages, but the
corresponding open() calls lack encoding="utf-8". This mismatch
causes UnicodeEncodeError / UnicodeDecodeError when users send
non-ASCII messages (Turkish, Japanese, Arabic, emoji, etc.) through
Telegram, Discord, WhatsApp, or Slack on Windows.

The project already fixed this for .env files in hermes_cli/config.py
(line 624) but the gateway module was missed.

Files fixed:
- gateway/session.py: session index + JSONL transcript read/write (5 calls)
- gateway/channel_directory.py: channel directory read/write (3 calls)
- gateway/mirror.py: session index read + transcript append (2 calls)
2026-03-04 11:32:57 +03:00
14 changed files with 408 additions and 36 deletions
+35 -1
View File
@@ -8,6 +8,7 @@ the first 6 and last 4 characters for debuggability.
"""
import logging
import os
import re
from typing import Optional
@@ -15,7 +16,7 @@ logger = logging.getLogger(__name__)
# Known API key prefixes -- match the prefix + contiguous token chars
_PREFIX_PATTERNS = [
r"sk-[A-Za-z0-9_-]{10,}", # OpenAI / OpenRouter
r"sk-[A-Za-z0-9_-]{10,}", # OpenAI / OpenRouter / Anthropic (sk-ant-*)
r"ghp_[A-Za-z0-9]{10,}", # GitHub PAT (classic)
r"github_pat_[A-Za-z0-9_]{10,}", # GitHub PAT (fine-grained)
r"xox[baprs]-[A-Za-z0-9-]{10,}", # Slack tokens
@@ -25,6 +26,18 @@ _PREFIX_PATTERNS = [
r"fc-[A-Za-z0-9]{10,}", # Firecrawl
r"bb_live_[A-Za-z0-9_-]{10,}", # BrowserBase
r"gAAAA[A-Za-z0-9_=-]{20,}", # Codex encrypted tokens
r"AKIA[A-Z0-9]{16}", # AWS Access Key ID
r"sk_live_[A-Za-z0-9]{10,}", # Stripe secret key (live)
r"sk_test_[A-Za-z0-9]{10,}", # Stripe secret key (test)
r"rk_live_[A-Za-z0-9]{10,}", # Stripe restricted key
r"SG\.[A-Za-z0-9_-]{10,}", # SendGrid API key
r"hf_[A-Za-z0-9]{10,}", # HuggingFace token
r"r8_[A-Za-z0-9]{10,}", # Replicate API token
r"npm_[A-Za-z0-9]{10,}", # npm access token
r"pypi-[A-Za-z0-9_-]{10,}", # PyPI API token
r"dop_v1_[A-Za-z0-9]{10,}", # DigitalOcean PAT
r"doo_v1_[A-Za-z0-9]{10,}", # DigitalOcean OAuth
r"am_[A-Za-z0-9_-]{10,}", # AgentMail API key
]
# ENV assignment patterns: KEY=value where KEY contains a secret-like name
@@ -52,6 +65,18 @@ _TELEGRAM_RE = re.compile(
r"(bot)?(\d{8,}):([-A-Za-z0-9_]{30,})",
)
# Private key blocks: -----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----
_PRIVATE_KEY_RE = re.compile(
r"-----BEGIN[A-Z ]*PRIVATE KEY-----[\s\S]*?-----END[A-Z ]*PRIVATE KEY-----"
)
# Database connection strings: protocol://user:PASSWORD@host
# Catches postgres, mysql, mongodb, redis, amqp URLs and redacts the password
_DB_CONNSTR_RE = re.compile(
r"((?:postgres(?:ql)?|mysql|mongodb(?:\+srv)?|redis|amqp)://[^:]+:)([^@]+)(@)",
re.IGNORECASE,
)
# E.164 phone numbers: +<country><number>, 7-15 digits
# Negative lookahead prevents matching hex strings or identifiers
_SIGNAL_PHONE_RE = re.compile(r"(\+[1-9]\d{6,14})(?![A-Za-z0-9])")
@@ -73,9 +98,12 @@ def redact_sensitive_text(text: str) -> str:
"""Apply all redaction patterns to a block of text.
Safe to call on any string -- non-matching text passes through unchanged.
Disabled when security.redact_secrets is false in config.yaml.
"""
if not text:
return text
if os.getenv("HERMES_REDACT_SECRETS", "").lower() in ("0", "false", "no", "off"):
return text
# Known prefixes (sk-, ghp_, etc.)
text = _PREFIX_RE.sub(lambda m: _mask_token(m.group(1)), text)
@@ -105,6 +133,12 @@ def redact_sensitive_text(text: str) -> str:
return f"{prefix}{digits}:***"
text = _TELEGRAM_RE.sub(_redact_telegram, text)
# Private key blocks
text = _PRIVATE_KEY_RE.sub("[REDACTED PRIVATE KEY]", text)
# Database connection string passwords
text = _DB_CONNSTR_RE.sub(lambda m: f"{m.group(1)}***{m.group(3)}", text)
# E.164 phone numbers (Signal, WhatsApp)
def _redact_phone(m):
phone = m.group(1)
+7
View File
@@ -364,6 +364,13 @@ def load_cli_config() -> Dict[str, Any]:
if model:
os.environ[model_env] = model
# Security settings
security_config = defaults.get("security", {})
if isinstance(security_config, dict):
redact = security_config.get("redact_secrets")
if redact is not None:
os.environ["HERMES_REDACT_SECRETS"] = str(redact).lower()
return defaults
# Load configuration at module startup
+3 -3
View File
@@ -52,7 +52,7 @@ def build_channel_directory(adapters: Dict[Any, Any]) -> Dict[str, Any]:
try:
DIRECTORY_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(DIRECTORY_PATH, "w") as f:
with open(DIRECTORY_PATH, "w", encoding="utf-8") as f:
json.dump(directory, f, indent=2, ensure_ascii=False)
except Exception as e:
logger.warning("Channel directory: failed to write: %s", e)
@@ -115,7 +115,7 @@ def _build_from_sessions(platform_name: str) -> List[Dict[str, str]]:
entries = []
try:
with open(sessions_path) as f:
with open(sessions_path, encoding="utf-8") as f:
data = json.load(f)
seen_ids = set()
@@ -147,7 +147,7 @@ def load_directory() -> Dict[str, Any]:
if not DIRECTORY_PATH.exists():
return {"updated_at": None, "platforms": {}}
try:
with open(DIRECTORY_PATH) as f:
with open(DIRECTORY_PATH, encoding="utf-8") as f:
return json.load(f)
except Exception:
return {"updated_at": None, "platforms": {}}
+2 -2
View File
@@ -73,7 +73,7 @@ def _find_session_id(platform: str, chat_id: str) -> Optional[str]:
return None
try:
with open(_SESSIONS_INDEX) as f:
with open(_SESSIONS_INDEX, encoding="utf-8") as f:
data = json.load(f)
except Exception:
return None
@@ -103,7 +103,7 @@ def _append_to_jsonl(session_id: str, message: dict) -> None:
"""Append a message to the JSONL transcript file."""
transcript_path = _SESSIONS_DIR / f"{session_id}.jsonl"
try:
with open(transcript_path, "a") as f:
with open(transcript_path, "a", encoding="utf-8") as f:
f.write(json.dumps(message, ensure_ascii=False) + "\n")
except Exception as e:
logger.debug("Mirror JSONL write failed: %s", e)
+6
View File
@@ -118,6 +118,12 @@ if _config_path.exists():
_tz_cfg = _cfg.get("timezone", "")
if _tz_cfg and isinstance(_tz_cfg, str) and "HERMES_TIMEZONE" not in os.environ:
os.environ["HERMES_TIMEZONE"] = _tz_cfg.strip()
# Security settings
_security_cfg = _cfg.get("security", {})
if isinstance(_security_cfg, dict):
_redact = _security_cfg.get("redact_secrets")
if _redact is not None:
os.environ["HERMES_REDACT_SECRETS"] = str(_redact).lower()
except Exception:
pass # Non-fatal; gateway can still run with .env values
+5 -5
View File
@@ -342,7 +342,7 @@ class SessionStore:
if sessions_file.exists():
try:
with open(sessions_file, "r") as f:
with open(sessions_file, "r", encoding="utf-8") as f:
data = json.load(f)
for key, entry_data in data.items():
self._entries[key] = SessionEntry.from_dict(entry_data)
@@ -357,7 +357,7 @@ class SessionStore:
sessions_file = self.sessions_dir / "sessions.json"
data = {key: entry.to_dict() for key, entry in self._entries.items()}
with open(sessions_file, "w") as f:
with open(sessions_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
def _generate_session_key(self, source: SessionSource) -> str:
@@ -681,7 +681,7 @@ class SessionStore:
# Also write legacy JSONL (keeps existing tooling working during transition)
transcript_path = self.get_transcript_path(session_id)
with open(transcript_path, "a") as f:
with open(transcript_path, "a", encoding="utf-8") as f:
f.write(json.dumps(message, ensure_ascii=False) + "\n")
def rewrite_transcript(self, session_id: str, messages: List[Dict[str, Any]]) -> None:
@@ -708,7 +708,7 @@ class SessionStore:
# JSONL: overwrite the file
transcript_path = self.get_transcript_path(session_id)
with open(transcript_path, "w") as f:
with open(transcript_path, "w", encoding="utf-8") as f:
for msg in messages:
f.write(json.dumps(msg, ensure_ascii=False) + "\n")
@@ -730,7 +730,7 @@ class SessionStore:
return []
messages = []
with open(transcript_path, "r") as f:
with open(transcript_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
+21 -5
View File
@@ -759,8 +759,16 @@ def load_config() -> Dict[str, Any]:
return config
_FALLBACK_MODEL_COMMENT = """
# Fallback model — automatic provider failover when primary is unavailable.
_COMMENTED_SECTIONS = """
# ── Security ──────────────────────────────────────────────────────────
# API keys, tokens, and passwords are redacted from tool output by default.
# Set to false to see full values (useful for debugging auth issues).
#
# security:
# redact_secrets: false
# ── Fallback Model ────────────────────────────────────────────────────
# Automatic provider failover when primary is unavailable.
# Uncomment and configure to enable. Triggers on rate limits (429),
# overload (529), service errors (503), or connection failures.
#
@@ -788,10 +796,18 @@ def save_config(config: Dict[str, Any]):
with open(config_path, 'w') as f:
yaml.dump(config, f, default_flow_style=False, sort_keys=False)
# Append commented-out fallback_model docs if user hasn't configured it
fb = config.get("fallback_model")
# Append commented-out sections for features that are off by default
# or only relevant when explicitly configured. Skip sections the
# user has already uncommented and configured.
sections = []
sec = config.get("security", {})
if not sec or sec.get("redact_secrets") is None:
sections.append("security")
fb = config.get("fallback_model", {})
if not fb or not (fb.get("provider") and fb.get("model")):
f.write(_FALLBACK_MODEL_COMMENT)
sections.append("fallback")
if sections:
f.write(_COMMENTED_SECTIONS)
def load_env() -> Dict[str, str]:
+45 -10
View File
@@ -3092,9 +3092,14 @@ class AIAgent:
)
self._iters_since_skill = 0
# Honcho prefetch: retrieve user context for system prompt injection
# Honcho prefetch: retrieve user context for system prompt injection.
# Only on the FIRST turn of a session (empty history). On subsequent
# turns the model already has all prior context in its conversation
# history, and the Honcho context is baked into the stored system
# prompt — re-fetching it would change the system message and break
# Anthropic prompt caching.
self._honcho_context = ""
if self._honcho and self._honcho_session_key:
if self._honcho and self._honcho_session_key and not conversation_history:
try:
self._honcho_context = self._honcho_prefetch(user_message)
except Exception as e:
@@ -3112,14 +3117,42 @@ class AIAgent:
# Built once on first call, reused for all subsequent calls.
# Only rebuilt after context compression events (which invalidate
# the cache and reload memory from disk).
#
# For continuing sessions (gateway creates a fresh AIAgent per
# message), we load the stored system prompt from the session DB
# instead of rebuilding. Rebuilding would pick up memory changes
# from disk that the model already knows about (it wrote them!),
# producing a different system prompt and breaking the Anthropic
# prefix cache.
if self._cached_system_prompt is None:
self._cached_system_prompt = self._build_system_prompt(system_message)
# Store the system prompt snapshot in SQLite
if self._session_db:
stored_prompt = None
if conversation_history and self._session_db:
try:
self._session_db.update_system_prompt(self.session_id, self._cached_system_prompt)
except Exception as e:
logger.debug("Session DB update_system_prompt failed: %s", e)
session_row = self._session_db.get_session(self.session_id)
if session_row:
stored_prompt = session_row.get("system_prompt") or None
except Exception:
pass # Fall through to build fresh
if stored_prompt:
# Continuing session — reuse the exact system prompt from
# the previous turn so the Anthropic cache prefix matches.
self._cached_system_prompt = stored_prompt
else:
# First turn of a new session — build from scratch.
self._cached_system_prompt = self._build_system_prompt(system_message)
# Bake Honcho context into the prompt so it's stable for
# the entire session (not re-fetched per turn).
if self._honcho_context:
self._cached_system_prompt = (
self._cached_system_prompt + "\n\n" + self._honcho_context
).strip()
# Store the system prompt snapshot in SQLite
if self._session_db:
try:
self._session_db.update_system_prompt(self.session_id, self._cached_system_prompt)
except Exception as e:
logger.debug("Session DB update_system_prompt failed: %s", e)
active_system_prompt = self._cached_system_prompt
@@ -3244,11 +3277,13 @@ class AIAgent:
# Build the final system message: cached prompt + ephemeral system prompt.
# The ephemeral part is appended here (not baked into the cached prompt)
# so it stays out of the session DB and logs.
# Note: Honcho context is baked into _cached_system_prompt on the first
# turn and stored in the session DB, so it does NOT need to be injected
# here. This keeps the system message identical across all turns in a
# session, maximizing Anthropic prompt cache hits.
effective_system = active_system_prompt or ""
if self.ephemeral_system_prompt:
effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip()
if self._honcho_context:
effective_system = (effective_system + "\n\n" + self._honcho_context).strip()
if effective_system:
api_messages = [{"role": "system", "content": effective_system}] + api_messages
+133
View File
@@ -1040,3 +1040,136 @@ class TestMaxTokensParam:
agent.base_url = "https://openrouter.ai/api/v1/api.openai.com"
result = agent._max_tokens_param(4096)
assert result == {"max_tokens": 4096}
# ---------------------------------------------------------------------------
# System prompt stability for prompt caching
# ---------------------------------------------------------------------------
class TestSystemPromptStability:
"""Verify that the system prompt stays stable across turns for cache hits."""
def test_stored_prompt_reused_for_continuing_session(self, agent):
"""When conversation_history is non-empty and session DB has a stored
prompt, it should be reused instead of rebuilding from disk."""
stored = "You are helpful. [stored from turn 1]"
mock_db = MagicMock()
mock_db.get_session.return_value = {"system_prompt": stored}
agent._session_db = mock_db
# Simulate a continuing session with history
history = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
]
# First call — _cached_system_prompt is None, history is non-empty
agent._cached_system_prompt = None
# Patch run_conversation internals to just test the system prompt logic.
# We'll call the prompt caching block directly by simulating what
# run_conversation does.
conversation_history = history
# The block under test (from run_conversation):
if agent._cached_system_prompt is None:
stored_prompt = None
if conversation_history and agent._session_db:
try:
session_row = agent._session_db.get_session(agent.session_id)
if session_row:
stored_prompt = session_row.get("system_prompt") or None
except Exception:
pass
if stored_prompt:
agent._cached_system_prompt = stored_prompt
assert agent._cached_system_prompt == stored
mock_db.get_session.assert_called_once_with(agent.session_id)
def test_fresh_build_when_no_history(self, agent):
"""On the first turn (no history), system prompt should be built fresh."""
mock_db = MagicMock()
agent._session_db = mock_db
agent._cached_system_prompt = None
conversation_history = []
# The block under test:
if agent._cached_system_prompt is None:
stored_prompt = None
if conversation_history and agent._session_db:
session_row = agent._session_db.get_session(agent.session_id)
if session_row:
stored_prompt = session_row.get("system_prompt") or None
if stored_prompt:
agent._cached_system_prompt = stored_prompt
else:
agent._cached_system_prompt = agent._build_system_prompt()
# Should have built fresh, not queried the DB
mock_db.get_session.assert_not_called()
assert agent._cached_system_prompt is not None
assert "Hermes Agent" in agent._cached_system_prompt
def test_fresh_build_when_db_has_no_prompt(self, agent):
"""If the session DB has no stored prompt, build fresh even with history."""
mock_db = MagicMock()
mock_db.get_session.return_value = {"system_prompt": ""}
agent._session_db = mock_db
agent._cached_system_prompt = None
conversation_history = [{"role": "user", "content": "hi"}]
if agent._cached_system_prompt is None:
stored_prompt = None
if conversation_history and agent._session_db:
try:
session_row = agent._session_db.get_session(agent.session_id)
if session_row:
stored_prompt = session_row.get("system_prompt") or None
except Exception:
pass
if stored_prompt:
agent._cached_system_prompt = stored_prompt
else:
agent._cached_system_prompt = agent._build_system_prompt()
# Empty string is falsy, so should fall through to fresh build
assert "Hermes Agent" in agent._cached_system_prompt
def test_honcho_context_baked_into_prompt_on_first_turn(self, agent):
"""Honcho context should be baked into _cached_system_prompt on
the first turn, not injected separately per API call."""
agent._honcho_context = "User prefers Python over JavaScript."
agent._cached_system_prompt = None
# Simulate first turn: build fresh and bake in Honcho
agent._cached_system_prompt = agent._build_system_prompt()
if agent._honcho_context:
agent._cached_system_prompt = (
agent._cached_system_prompt + "\n\n" + agent._honcho_context
).strip()
assert "User prefers Python over JavaScript" in agent._cached_system_prompt
def test_honcho_prefetch_skipped_on_continuing_session(self):
"""Honcho prefetch should not be called when conversation_history
is non-empty (continuing session)."""
conversation_history = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
]
# The guard: `not conversation_history` is False when history exists
should_prefetch = not conversation_history
assert should_prefetch is False
def test_honcho_prefetch_runs_on_first_turn(self):
"""Honcho prefetch should run when conversation_history is empty."""
conversation_history = []
should_prefetch = not conversation_history
assert should_prefetch is True
+51
View File
@@ -393,5 +393,56 @@ class TestStubSchemaDrift(unittest.TestCase):
self.assertIn("mode", src)
class TestHeadTailTruncation(unittest.TestCase):
"""Tests for head+tail truncation of large stdout in execute_code."""
def _run(self, code):
with patch("model_tools.handle_function_call", side_effect=_mock_handle_function_call):
result = execute_code(
code=code,
task_id="test-task",
enabled_tools=list(SANDBOX_ALLOWED_TOOLS),
)
return json.loads(result)
def test_short_output_not_truncated(self):
"""Output under MAX_STDOUT_BYTES should not be truncated."""
result = self._run('print("small output")')
self.assertEqual(result["status"], "success")
self.assertIn("small output", result["output"])
self.assertNotIn("TRUNCATED", result["output"])
def test_large_output_preserves_head_and_tail(self):
"""Output exceeding MAX_STDOUT_BYTES keeps both head and tail."""
code = '''
# Print HEAD marker, then filler, then TAIL marker
print("HEAD_MARKER_START")
for i in range(15000):
print(f"filler_line_{i:06d}_padding_to_fill_buffer")
print("TAIL_MARKER_END")
'''
result = self._run(code)
self.assertEqual(result["status"], "success")
output = result["output"]
# Head should be preserved
self.assertIn("HEAD_MARKER_START", output)
# Tail should be preserved (this is the key improvement)
self.assertIn("TAIL_MARKER_END", output)
# Truncation notice should be present
self.assertIn("TRUNCATED", output)
def test_truncation_notice_format(self):
"""Truncation notice includes character counts."""
code = '''
for i in range(15000):
print(f"padding_line_{i:06d}_xxxxxxxxxxxxxxxxxxxxxxxxxx")
'''
result = self._run(code)
output = result["output"]
if "TRUNCATED" in output:
self.assertIn("chars omitted", output)
self.assertIn("total", output)
if __name__ == "__main__":
unittest.main()
+2
View File
@@ -38,6 +38,7 @@ class TestReadFileHandler:
def test_returns_file_content(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.content = "line1\nline2"
result_obj.to_dict.return_value = {"content": "line1\nline2", "total_lines": 2}
mock_ops.read_file.return_value = result_obj
mock_get.return_value = mock_ops
@@ -52,6 +53,7 @@ class TestReadFileHandler:
def test_custom_offset_and_limit(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.content = "line10"
result_obj.to_dict.return_value = {"content": "line10", "total_lines": 50}
mock_ops.read_file.return_value = result_obj
mock_get.return_value = mock_ops
+62 -7
View File
@@ -457,11 +457,17 @@ def execute_code(
# --- Poll loop: watch for exit, timeout, and interrupt ---
deadline = time.monotonic() + timeout
stdout_chunks: list = []
stderr_chunks: list = []
# Background readers to avoid pipe buffer deadlocks
# Background readers to avoid pipe buffer deadlocks.
# For stdout we use a head+tail strategy: keep the first HEAD_BYTES
# and a rolling window of the last TAIL_BYTES so the final print()
# output is never lost. Stderr keeps head-only (errors appear early).
_STDOUT_HEAD_BYTES = int(MAX_STDOUT_BYTES * 0.4) # 40% head
_STDOUT_TAIL_BYTES = MAX_STDOUT_BYTES - _STDOUT_HEAD_BYTES # 60% tail
def _drain(pipe, chunks, max_bytes):
"""Simple head-only drain (used for stderr)."""
total = 0
try:
while True:
@@ -475,8 +481,48 @@ def execute_code(
except (ValueError, OSError):
pass
stdout_total_bytes = [0] # mutable ref for total bytes seen
def _drain_head_tail(pipe, head_chunks, tail_chunks, head_bytes, tail_bytes, total_ref):
"""Drain stdout keeping both head and tail data."""
head_collected = 0
from collections import deque
tail_buf = deque()
tail_collected = 0
try:
while True:
data = pipe.read(4096)
if not data:
break
total_ref[0] += len(data)
# Fill head buffer first
if head_collected < head_bytes:
keep = min(len(data), head_bytes - head_collected)
head_chunks.append(data[:keep])
head_collected += keep
data = data[keep:] # remaining goes to tail
if not data:
continue
# Everything past head goes into rolling tail buffer
tail_buf.append(data)
tail_collected += len(data)
# Evict old tail data to stay within tail_bytes budget
while tail_collected > tail_bytes and tail_buf:
oldest = tail_buf.popleft()
tail_collected -= len(oldest)
except (ValueError, OSError):
pass
# Transfer final tail to output list
tail_chunks.extend(tail_buf)
stdout_head_chunks: list = []
stdout_tail_chunks: list = []
stdout_reader = threading.Thread(
target=_drain, args=(proc.stdout, stdout_chunks, MAX_STDOUT_BYTES), daemon=True
target=_drain_head_tail,
args=(proc.stdout, stdout_head_chunks, stdout_tail_chunks,
_STDOUT_HEAD_BYTES, _STDOUT_TAIL_BYTES, stdout_total_bytes),
daemon=True
)
stderr_reader = threading.Thread(
target=_drain, args=(proc.stderr, stderr_chunks, MAX_STDERR_BYTES), daemon=True
@@ -500,12 +546,21 @@ def execute_code(
stdout_reader.join(timeout=3)
stderr_reader.join(timeout=3)
stdout_text = b"".join(stdout_chunks).decode("utf-8", errors="replace")
stdout_head = b"".join(stdout_head_chunks).decode("utf-8", errors="replace")
stdout_tail = b"".join(stdout_tail_chunks).decode("utf-8", errors="replace")
stderr_text = b"".join(stderr_chunks).decode("utf-8", errors="replace")
# Truncation notice
if len(stdout_text) >= MAX_STDOUT_BYTES:
stdout_text = stdout_text[:MAX_STDOUT_BYTES] + "\n[output truncated at 50KB]"
# Assemble stdout with head+tail truncation
total_stdout = stdout_total_bytes[0]
if total_stdout > MAX_STDOUT_BYTES and stdout_tail:
omitted = total_stdout - len(stdout_head) - len(stdout_tail)
truncated_notice = (
f"\n\n... [OUTPUT TRUNCATED - {omitted:,} chars omitted "
f"out of {total_stdout:,} total] ...\n\n"
)
stdout_text = stdout_head + truncated_notice + stdout_tail
else:
stdout_text = stdout_head + stdout_tail
exit_code = proc.returncode if proc.returncode is not None else -1
duration = round(time.monotonic() - exec_start, 2)
+7
View File
@@ -7,6 +7,7 @@ import os
import threading
from typing import Optional
from tools.file_operations import ShellFileOperations
from agent.redact import redact_sensitive_text
logger = logging.getLogger(__name__)
@@ -128,6 +129,8 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
try:
file_ops = _get_file_ops(task_id)
result = file_ops.read_file(path, offset, limit)
if result.content:
result.content = redact_sensitive_text(result.content)
return json.dumps(result.to_dict(), ensure_ascii=False)
except Exception as e:
return json.dumps({"error": str(e)}, ensure_ascii=False)
@@ -186,6 +189,10 @@ def search_tool(pattern: str, target: str = "content", path: str = ".",
pattern=pattern, path=path, target=target, file_glob=file_glob,
limit=limit, offset=offset, output_mode=output_mode, context=context
)
if hasattr(result, 'matches'):
for m in result.matches:
if hasattr(m, 'content') and m.content:
m.content = redact_sensitive_text(m.content)
result_dict = result.to_dict()
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when results were truncated — explicit next offset is clearer
+29 -3
View File
@@ -69,10 +69,36 @@ def _read_manifest() -> Dict[str, str]:
def _write_manifest(entries: Dict[str, str]):
"""Write the manifest file in v2 format (name:hash)."""
"""Write the manifest file atomically in v2 format (name:hash).
Uses a temp file + os.replace() to avoid corruption if the process
crashes or is interrupted mid-write.
"""
import tempfile
MANIFEST_FILE.parent.mkdir(parents=True, exist_ok=True)
lines = [f"{name}:{hash_val}" for name, hash_val in sorted(entries.items())]
MANIFEST_FILE.write_text("\n".join(lines) + "\n", encoding="utf-8")
data = "\n".join(f"{name}:{hash_val}" for name, hash_val in sorted(entries.items())) + "\n"
try:
fd, tmp_path = tempfile.mkstemp(
dir=str(MANIFEST_FILE.parent),
prefix=".bundled_manifest_",
suffix=".tmp",
)
try:
with os.fdopen(fd, "w", encoding="utf-8") as f:
f.write(data)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, MANIFEST_FILE)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
except Exception as e:
logger.debug("Failed to write skills manifest %s: %s", MANIFEST_FILE, e, exc_info=True)
def _discover_bundled_skills(bundled_dir: Path) -> List[Tuple[str, Path]]: