|
|
|
|
@@ -36,7 +36,7 @@ _NEW_SEGMENT = object()
|
|
|
|
|
@dataclass
|
|
|
|
|
class StreamConsumerConfig:
|
|
|
|
|
"""Runtime config for a single stream consumer instance."""
|
|
|
|
|
edit_interval: float = 0.3
|
|
|
|
|
edit_interval: float = 1.0
|
|
|
|
|
buffer_threshold: int = 40
|
|
|
|
|
cursor: str = " ▉"
|
|
|
|
|
|
|
|
|
|
@@ -56,6 +56,10 @@ class GatewayStreamConsumer:
|
|
|
|
|
await task # wait for final edit
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# After this many consecutive flood-control failures, permanently disable
|
|
|
|
|
# progressive edits for the remainder of the stream.
|
|
|
|
|
_MAX_FLOOD_STRIKES = 3
|
|
|
|
|
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
adapter: Any,
|
|
|
|
|
@@ -76,6 +80,8 @@ class GatewayStreamConsumer:
|
|
|
|
|
self._last_sent_text = "" # Track last-sent text to skip redundant edits
|
|
|
|
|
self._fallback_final_send = False
|
|
|
|
|
self._fallback_prefix = ""
|
|
|
|
|
self._flood_strikes = 0 # Consecutive flood-control edit failures
|
|
|
|
|
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def already_sent(self) -> bool:
|
|
|
|
|
@@ -129,7 +135,7 @@ class GatewayStreamConsumer:
|
|
|
|
|
should_edit = (
|
|
|
|
|
got_done
|
|
|
|
|
or got_segment_break
|
|
|
|
|
or (elapsed >= self.cfg.edit_interval
|
|
|
|
|
or (elapsed >= self._current_edit_interval
|
|
|
|
|
and self._accumulated)
|
|
|
|
|
or len(self._accumulated) >= self.cfg.buffer_threshold
|
|
|
|
|
)
|
|
|
|
|
@@ -173,12 +179,13 @@ class GatewayStreamConsumer:
|
|
|
|
|
if split_at < _safe_limit // 2:
|
|
|
|
|
split_at = _safe_limit
|
|
|
|
|
chunk = self._accumulated[:split_at]
|
|
|
|
|
await self._send_or_edit(chunk)
|
|
|
|
|
if self._fallback_final_send:
|
|
|
|
|
# Edit failed while attempting to split an oversized
|
|
|
|
|
# message. Keep the full accumulated text intact so
|
|
|
|
|
# the fallback final-send path can deliver the
|
|
|
|
|
# remaining continuation without dropping content.
|
|
|
|
|
ok = await self._send_or_edit(chunk)
|
|
|
|
|
if self._fallback_final_send or not ok:
|
|
|
|
|
# Edit failed (or backed off due to flood control)
|
|
|
|
|
# while attempting to split an oversized message.
|
|
|
|
|
# Keep the full accumulated text intact so the
|
|
|
|
|
# fallback final-send path can deliver the remaining
|
|
|
|
|
# continuation without dropping content.
|
|
|
|
|
break
|
|
|
|
|
self._accumulated = self._accumulated[split_at:].lstrip("\n")
|
|
|
|
|
self._message_id = None
|
|
|
|
|
@@ -322,7 +329,10 @@ class GatewayStreamConsumer:
|
|
|
|
|
return chunks
|
|
|
|
|
|
|
|
|
|
async def _send_fallback_final(self, text: str) -> None:
|
|
|
|
|
"""Send the final continuation after streaming edits stop working."""
|
|
|
|
|
"""Send the final continuation after streaming edits stop working.
|
|
|
|
|
|
|
|
|
|
Retries each chunk once on flood-control failures with a short delay.
|
|
|
|
|
"""
|
|
|
|
|
final_text = self._clean_for_display(text)
|
|
|
|
|
continuation = self._continuation_text(final_text)
|
|
|
|
|
self._fallback_final_send = False
|
|
|
|
|
@@ -339,12 +349,25 @@ class GatewayStreamConsumer:
|
|
|
|
|
last_successful_chunk = ""
|
|
|
|
|
sent_any_chunk = False
|
|
|
|
|
for chunk in chunks:
|
|
|
|
|
result = await self.adapter.send(
|
|
|
|
|
chat_id=self.chat_id,
|
|
|
|
|
content=chunk,
|
|
|
|
|
metadata=self.metadata,
|
|
|
|
|
)
|
|
|
|
|
if not result.success:
|
|
|
|
|
# Try sending with one retry on flood-control errors.
|
|
|
|
|
result = None
|
|
|
|
|
for attempt in range(2):
|
|
|
|
|
result = await self.adapter.send(
|
|
|
|
|
chat_id=self.chat_id,
|
|
|
|
|
content=chunk,
|
|
|
|
|
metadata=self.metadata,
|
|
|
|
|
)
|
|
|
|
|
if result.success:
|
|
|
|
|
break
|
|
|
|
|
if attempt == 0 and self._is_flood_error(result):
|
|
|
|
|
logger.debug(
|
|
|
|
|
"Flood control on fallback send, retrying in 3s"
|
|
|
|
|
)
|
|
|
|
|
await asyncio.sleep(3.0)
|
|
|
|
|
else:
|
|
|
|
|
break # non-flood error or second attempt failed
|
|
|
|
|
|
|
|
|
|
if not result or not result.success:
|
|
|
|
|
if sent_any_chunk:
|
|
|
|
|
# Some continuation text already reached the user. Suppress
|
|
|
|
|
# the base gateway final-send path so we don't resend the
|
|
|
|
|
@@ -370,20 +393,52 @@ class GatewayStreamConsumer:
|
|
|
|
|
self._last_sent_text = chunks[-1]
|
|
|
|
|
self._fallback_prefix = ""
|
|
|
|
|
|
|
|
|
|
async def _send_or_edit(self, text: str) -> None:
|
|
|
|
|
"""Send or edit the streaming message."""
|
|
|
|
|
def _is_flood_error(self, result) -> bool:
|
|
|
|
|
"""Check if a SendResult failure is due to flood control / rate limiting."""
|
|
|
|
|
err = getattr(result, "error", "") or ""
|
|
|
|
|
err_lower = err.lower()
|
|
|
|
|
return "flood" in err_lower or "retry after" in err_lower or "rate" in err_lower
|
|
|
|
|
|
|
|
|
|
async def _try_strip_cursor(self) -> None:
|
|
|
|
|
"""Best-effort edit to remove the cursor from the last visible message.
|
|
|
|
|
|
|
|
|
|
Called when entering fallback mode so the user doesn't see a stuck
|
|
|
|
|
cursor (▉) in the partial message.
|
|
|
|
|
"""
|
|
|
|
|
if not self._message_id or self._message_id == "__no_edit__":
|
|
|
|
|
return
|
|
|
|
|
prefix = self._visible_prefix()
|
|
|
|
|
if not prefix or not prefix.strip():
|
|
|
|
|
return
|
|
|
|
|
try:
|
|
|
|
|
await self.adapter.edit_message(
|
|
|
|
|
chat_id=self.chat_id,
|
|
|
|
|
message_id=self._message_id,
|
|
|
|
|
content=prefix,
|
|
|
|
|
)
|
|
|
|
|
self._last_sent_text = prefix
|
|
|
|
|
except Exception:
|
|
|
|
|
pass # best-effort — don't let this block the fallback path
|
|
|
|
|
|
|
|
|
|
async def _send_or_edit(self, text: str) -> bool:
|
|
|
|
|
"""Send or edit the streaming message.
|
|
|
|
|
|
|
|
|
|
Returns True if the text was successfully delivered (sent or edited),
|
|
|
|
|
False otherwise. Callers like the overflow split loop use this to
|
|
|
|
|
decide whether to advance past the delivered chunk.
|
|
|
|
|
"""
|
|
|
|
|
# Strip MEDIA: directives so they don't appear as visible text.
|
|
|
|
|
# Media files are delivered as native attachments after the stream
|
|
|
|
|
# finishes (via _deliver_media_from_response in gateway/run.py).
|
|
|
|
|
text = self._clean_for_display(text)
|
|
|
|
|
if not text.strip():
|
|
|
|
|
return
|
|
|
|
|
return True # nothing to send is "success"
|
|
|
|
|
try:
|
|
|
|
|
if self._message_id is not None:
|
|
|
|
|
if self._edit_supported:
|
|
|
|
|
# Skip if text is identical to what we last sent
|
|
|
|
|
if text == self._last_sent_text:
|
|
|
|
|
return
|
|
|
|
|
return True
|
|
|
|
|
# Edit existing message
|
|
|
|
|
result = await self.adapter.edit_message(
|
|
|
|
|
chat_id=self.chat_id,
|
|
|
|
|
@@ -393,19 +448,52 @@ class GatewayStreamConsumer:
|
|
|
|
|
if result.success:
|
|
|
|
|
self._already_sent = True
|
|
|
|
|
self._last_sent_text = text
|
|
|
|
|
# Successful edit — reset flood strike counter
|
|
|
|
|
self._flood_strikes = 0
|
|
|
|
|
return True
|
|
|
|
|
else:
|
|
|
|
|
# If an edit fails mid-stream (especially Telegram flood control),
|
|
|
|
|
# stop progressive edits and send only the missing tail once the
|
|
|
|
|
# Edit failed. If this looks like flood control / rate
|
|
|
|
|
# limiting, use adaptive backoff: double the edit interval
|
|
|
|
|
# and retry on the next cycle. Only permanently disable
|
|
|
|
|
# edits after _MAX_FLOOD_STRIKES consecutive failures.
|
|
|
|
|
if self._is_flood_error(result):
|
|
|
|
|
self._flood_strikes += 1
|
|
|
|
|
self._current_edit_interval = min(
|
|
|
|
|
self._current_edit_interval * 2, 10.0,
|
|
|
|
|
)
|
|
|
|
|
logger.debug(
|
|
|
|
|
"Flood control on edit (strike %d/%d), "
|
|
|
|
|
"backoff interval → %.1fs",
|
|
|
|
|
self._flood_strikes,
|
|
|
|
|
self._MAX_FLOOD_STRIKES,
|
|
|
|
|
self._current_edit_interval,
|
|
|
|
|
)
|
|
|
|
|
if self._flood_strikes < self._MAX_FLOOD_STRIKES:
|
|
|
|
|
# Don't disable edits yet — just slow down.
|
|
|
|
|
# Update _last_edit_time so the next edit
|
|
|
|
|
# respects the new interval.
|
|
|
|
|
self._last_edit_time = time.monotonic()
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Non-flood error OR flood strikes exhausted: enter
|
|
|
|
|
# fallback mode — send only the missing tail once the
|
|
|
|
|
# final response is available.
|
|
|
|
|
logger.debug("Edit failed, disabling streaming for this adapter")
|
|
|
|
|
logger.debug(
|
|
|
|
|
"Edit failed (strikes=%d), entering fallback mode",
|
|
|
|
|
self._flood_strikes,
|
|
|
|
|
)
|
|
|
|
|
self._fallback_prefix = self._visible_prefix()
|
|
|
|
|
self._fallback_final_send = True
|
|
|
|
|
self._edit_supported = False
|
|
|
|
|
self._already_sent = True
|
|
|
|
|
# Best-effort: strip the cursor from the last visible
|
|
|
|
|
# message so the user doesn't see a stuck ▉.
|
|
|
|
|
await self._try_strip_cursor()
|
|
|
|
|
return False
|
|
|
|
|
else:
|
|
|
|
|
# Editing not supported — skip intermediate updates.
|
|
|
|
|
# The final response will be sent by the fallback path.
|
|
|
|
|
pass
|
|
|
|
|
return False
|
|
|
|
|
else:
|
|
|
|
|
# First message — send new
|
|
|
|
|
result = await self.adapter.send(
|
|
|
|
|
@@ -417,6 +505,7 @@ class GatewayStreamConsumer:
|
|
|
|
|
self._message_id = result.message_id
|
|
|
|
|
self._already_sent = True
|
|
|
|
|
self._last_sent_text = text
|
|
|
|
|
return True
|
|
|
|
|
elif result.success:
|
|
|
|
|
# Platform accepted the message but returned no message_id
|
|
|
|
|
# (e.g. Signal). Can't edit without an ID — switch to
|
|
|
|
|
@@ -428,8 +517,11 @@ class GatewayStreamConsumer:
|
|
|
|
|
self._fallback_final_send = True
|
|
|
|
|
# Sentinel prevents re-entering this branch on every delta
|
|
|
|
|
self._message_id = "__no_edit__"
|
|
|
|
|
return True # platform accepted, just can't edit
|
|
|
|
|
else:
|
|
|
|
|
# Initial send failed — disable streaming for this session
|
|
|
|
|
self._edit_supported = False
|
|
|
|
|
return False
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error("Stream send/edit error: %s", e)
|
|
|
|
|
return False
|
|
|
|
|
|