Compare commits

..

1 Commits

Author SHA1 Message Date
alireza78a 9a7ed81b4b fix(cron): use atomic write in save_job_output to prevent data loss on crash
save_job_output() used bare open('w') which truncates the output file
immediately. A crash or OOM kill between truncation and the completed
write would silently wipe the job output.

Write now goes to a temp file first, then os.replace() swaps it
atomically — matching the existing save_jobs() pattern in the same file.
Preserves _secure_file() permissions and uses safe cleanup on error.

Cherry-picked from PR #874 by alireza78a, rebased onto current main
with conflict resolution and fixes:
- Kept _secure_dir/_secure_file security calls from PR #757
- Used except BaseException (not bare except) to match save_jobs pattern
- Wrapped os.unlink in try/except OSError to avoid masking errors

Co-authored-by: alireza78a <alireza78a@users.noreply.github.com>
2026-03-13 08:04:36 -07:00
2 changed files with 14 additions and 26 deletions
+14 -3
View File
@@ -431,8 +431,19 @@ def save_job_output(job_id: str, output: str):
timestamp = _hermes_now().strftime("%Y-%m-%d_%H-%M-%S")
output_file = job_output_dir / f"{timestamp}.md"
with open(output_file, 'w', encoding='utf-8') as f:
f.write(output)
_secure_file(output_file)
fd, tmp_path = tempfile.mkstemp(dir=str(job_output_dir), suffix='.tmp', prefix='.output_')
try:
with os.fdopen(fd, 'w', encoding='utf-8') as f:
f.write(output)
f.flush()
os.fsync(f.fileno())
os.replace(tmp_path, output_file)
_secure_file(output_file)
except BaseException:
try:
os.unlink(tmp_path)
except OSError:
pass
raise
return output_file
-23
View File
@@ -1446,11 +1446,6 @@ class GatewayRunner:
response = agent_result.get("final_response", "")
agent_messages = agent_result.get("messages", [])
# If the agent's session_id changed during compression, update
# session_entry so transcript writes below go to the right session.
if agent_result.get("session_id") and agent_result["session_id"] != session_entry.session_id:
session_entry.session_id = agent_result["session_id"]
# Prepend reasoning/thinking if display is enabled
if getattr(self, "_show_reasoning", False) and response:
last_reasoning = agent_result.get("last_reasoning")
@@ -3500,23 +3495,6 @@ class GatewayRunner:
unique_tags.insert(0, "[[audio_as_voice]]")
final_response = final_response + "\n" + "\n".join(unique_tags)
# Sync session_id: the agent may have created a new session during
# mid-run context compression (_compress_context splits sessions).
# If so, update the session store entry so the NEXT message loads
# the compressed transcript, not the stale pre-compression one.
agent = agent_holder[0]
if agent and session_key and hasattr(agent, 'session_id') and agent.session_id != session_id:
logger.info(
"Session split detected: %s%s (compression)",
session_id, agent.session_id,
)
entry = self.session_store._entries.get(session_key)
if entry:
entry.session_id = agent.session_id
self.session_store._save()
effective_session_id = getattr(agent, 'session_id', session_id) if agent else session_id
return {
"final_response": final_response,
"last_reasoning": result.get("last_reasoning"),
@@ -3525,7 +3503,6 @@ class GatewayRunner:
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
"last_prompt_tokens": _last_prompt_toks,
"session_id": effective_session_id,
}
# Start progress message sender if enabled