"""Async history bridge for non-blocking transcript persistence.""" from __future__ import annotations import asyncio import time from dataclasses import dataclass from typing import Optional from loguru import logger from runtime.ports import ConversationHistoryStore @dataclass class _HistoryTranscriptJob: call_id: str turn_index: int speaker: str content: str start_ms: int end_ms: int duration_ms: int class SessionHistoryBridge: """Session-scoped buffered history writer with background retries.""" _STOP_SENTINEL = object() def __init__( self, *, history_writer: ConversationHistoryStore | None, enabled: bool, queue_max_size: int, retry_max_attempts: int, retry_backoff_sec: float, finalize_drain_timeout_sec: float, ): self._history_writer = history_writer self._enabled = bool(enabled and history_writer is not None) self._queue_max_size = max(1, int(queue_max_size)) self._retry_max_attempts = max(0, int(retry_max_attempts)) self._retry_backoff_sec = max(0.0, float(retry_backoff_sec)) self._finalize_drain_timeout_sec = max(0.0, float(finalize_drain_timeout_sec)) self._call_id: Optional[str] = None self._turn_index: int = 0 self._started_mono: Optional[float] = None self._finalized: bool = False self._worker_task: Optional[asyncio.Task] = None self._finalize_lock = asyncio.Lock() self._queue: asyncio.Queue[_HistoryTranscriptJob | object] = asyncio.Queue(maxsize=self._queue_max_size) @property def enabled(self) -> bool: return self._enabled @property def call_id(self) -> Optional[str]: return self._call_id async def start_call( self, *, user_id: int, assistant_id: Optional[str], source: str, ) -> Optional[str]: """Create remote call record and start background worker.""" if not self._enabled or self._call_id: return self._call_id call_id = await self._history_writer.create_call_record( user_id=user_id, assistant_id=assistant_id, source=source, ) if not call_id: return None self._call_id = str(call_id) self._turn_index = 0 self._finalized = False self._started_mono = time.monotonic() self._ensure_worker() return self._call_id def elapsed_ms(self) -> int: if self._started_mono is None: return 0 return max(0, int((time.monotonic() - self._started_mono) * 1000)) def enqueue_turn(self, *, role: str, text: str) -> bool: """Queue one transcript write without blocking the caller.""" if not self._enabled or not self._call_id or self._finalized: return False content = str(text or "").strip() if not content: return False speaker = "human" if str(role or "").strip().lower() == "user" else "ai" end_ms = self.elapsed_ms() estimated_duration_ms = max(300, min(12000, len(content) * 80)) start_ms = max(0, end_ms - estimated_duration_ms) job = _HistoryTranscriptJob( call_id=self._call_id, turn_index=self._turn_index, speaker=speaker, content=content, start_ms=start_ms, end_ms=end_ms, duration_ms=max(1, end_ms - start_ms), ) self._turn_index += 1 self._ensure_worker() try: self._queue.put_nowait(job) return True except asyncio.QueueFull: logger.warning( "History queue full; dropping transcript call_id={} turn={}", self._call_id, job.turn_index, ) return False async def finalize(self, *, status: str) -> bool: """Finalize history record once; waits briefly for queue drain.""" if not self._enabled or not self._call_id: return False async with self._finalize_lock: if self._finalized: return True await self._drain_queue() ok = await self._history_writer.finalize_call_record( call_id=self._call_id, status=status, duration_seconds=self.duration_seconds(), ) if ok: self._finalized = True await self._stop_worker() return ok async def shutdown(self) -> None: """Stop worker task and release queue resources.""" await self._stop_worker() def duration_seconds(self) -> int: if self._started_mono is None: return 0 return max(0, int(time.monotonic() - self._started_mono)) def _ensure_worker(self) -> None: if self._worker_task and not self._worker_task.done(): return self._worker_task = asyncio.create_task(self._worker_loop()) async def _drain_queue(self) -> None: if self._finalize_drain_timeout_sec <= 0: return try: await asyncio.wait_for(self._queue.join(), timeout=self._finalize_drain_timeout_sec) except asyncio.TimeoutError: logger.warning("History queue drain timed out after {}s", self._finalize_drain_timeout_sec) async def _stop_worker(self) -> None: task = self._worker_task if not task: return if task.done(): self._worker_task = None return sent = False try: self._queue.put_nowait(self._STOP_SENTINEL) sent = True except asyncio.QueueFull: pass if not sent: try: await asyncio.wait_for(self._queue.put(self._STOP_SENTINEL), timeout=0.5) except asyncio.TimeoutError: task.cancel() try: await asyncio.wait_for(task, timeout=1.5) except asyncio.TimeoutError: task.cancel() try: await task except Exception: pass except asyncio.CancelledError: pass finally: self._worker_task = None async def _worker_loop(self) -> None: while True: item = await self._queue.get() try: if item is self._STOP_SENTINEL: return assert isinstance(item, _HistoryTranscriptJob) await self._write_with_retry(item) except Exception as exc: logger.warning("History worker write failed unexpectedly: {}", exc) finally: self._queue.task_done() async def _write_with_retry(self, job: _HistoryTranscriptJob) -> bool: for attempt in range(self._retry_max_attempts + 1): ok = await self._history_writer.add_transcript( call_id=job.call_id, turn_index=job.turn_index, speaker=job.speaker, content=job.content, start_ms=job.start_ms, end_ms=job.end_ms, duration_ms=job.duration_ms, ) if ok: return True if attempt >= self._retry_max_attempts: logger.warning( "History write dropped after retries call_id={} turn={}", job.call_id, job.turn_index, ) return False if self._retry_backoff_sec > 0: await asyncio.sleep(self._retry_backoff_sec * (2**attempt)) return False