diff --git a/engine/app/backend_client.py b/engine/app/backend_client.py index 1e716dc..4d8aa8c 100644 --- a/engine/app/backend_client.py +++ b/engine/app/backend_client.py @@ -1,4 +1,4 @@ -"""Backend API client for assistant config.""" +"""Backend API client for assistant config and history persistence.""" from __future__ import annotations @@ -42,3 +42,107 @@ async def fetch_assistant_config(assistant_id: str) -> Optional[Dict[str, Any]]: logger.warning(f"Failed to fetch assistant config ({assistant_id}): {exc}") return None + +def _backend_base_url() -> Optional[str]: + if not settings.backend_url: + return None + return settings.backend_url.rstrip("/") + + +def _timeout() -> aiohttp.ClientTimeout: + return aiohttp.ClientTimeout(total=settings.backend_timeout_sec) + + +async def create_history_call_record( + *, + user_id: int, + assistant_id: Optional[str], + source: str = "debug", +) -> Optional[str]: + """Create a call record via backend history API and return call_id.""" + base_url = _backend_base_url() + if not base_url: + return None + + url = f"{base_url}/api/history" + payload: Dict[str, Any] = { + "user_id": user_id, + "assistant_id": assistant_id, + "source": source, + "status": "connected", + } + + try: + async with aiohttp.ClientSession(timeout=_timeout()) as session: + async with session.post(url, json=payload) as resp: + resp.raise_for_status() + data = await resp.json() + call_id = str((data or {}).get("id") or "") + return call_id or None + except Exception as exc: + logger.warning(f"Failed to create history call record: {exc}") + return None + + +async def add_history_transcript( + *, + call_id: str, + turn_index: int, + speaker: str, + content: str, + start_ms: int, + end_ms: int, + confidence: Optional[float] = None, + duration_ms: Optional[int] = None, +) -> bool: + """Append a transcript segment to backend history.""" + base_url = _backend_base_url() + if not base_url or not call_id: + return False + + url = f"{base_url}/api/history/{call_id}/transcripts" + payload: Dict[str, Any] = { + "turn_index": turn_index, + "speaker": speaker, + "content": content, + "confidence": confidence, + "start_ms": start_ms, + "end_ms": end_ms, + "duration_ms": duration_ms, + } + + try: + async with aiohttp.ClientSession(timeout=_timeout()) as session: + async with session.post(url, json=payload) as resp: + resp.raise_for_status() + return True + except Exception as exc: + logger.warning(f"Failed to append history transcript (call_id={call_id}, turn={turn_index}): {exc}") + return False + + +async def finalize_history_call_record( + *, + call_id: str, + status: str, + duration_seconds: int, +) -> bool: + """Finalize a call record with status and duration.""" + base_url = _backend_base_url() + if not base_url or not call_id: + return False + + url = f"{base_url}/api/history/{call_id}" + payload: Dict[str, Any] = { + "status": status, + "duration_seconds": duration_seconds, + } + + try: + async with aiohttp.ClientSession(timeout=_timeout()) as session: + async with session.put(url, json=payload) as resp: + resp.raise_for_status() + return True + except Exception as exc: + logger.warning(f"Failed to finalize history call record ({call_id}): {exc}") + return False diff --git a/engine/app/config.py b/engine/app/config.py index 0c10fc1..95b92e1 100644 --- a/engine/app/config.py +++ b/engine/app/config.py @@ -91,6 +91,11 @@ class Settings(BaseSettings): ws_api_key: Optional[str] = Field(default=None, description="Optional API key required for WS hello auth") ws_require_auth: bool = Field(default=False, description="Require auth in hello message even when ws_api_key is not set") + # Backend bridge configuration (for call/transcript persistence) + backend_url: Optional[str] = Field(default=None, description="Backend API base URL (e.g. http://localhost:8787)") + backend_timeout_sec: int = Field(default=10, description="Backend API request timeout in seconds") + history_default_user_id: int = Field(default=1, description="Fallback user_id for history records") + @property def chunk_size_bytes(self) -> int: """Calculate chunk size in bytes based on sample rate and duration.""" diff --git a/engine/core/session.py b/engine/core/session.py index 7ff368e..460cd6d 100644 --- a/engine/core/session.py +++ b/engine/core/session.py @@ -2,12 +2,19 @@ import uuid import json +import time from enum import Enum from typing import Optional, Dict, Any from loguru import logger +from app.backend_client import ( + create_history_call_record, + add_history_transcript, + finalize_history_call_record, +) from core.transports import BaseTransport from core.duplex_pipeline import DuplexPipeline +from core.conversation import ConversationTurn from app.config import settings from models.ws_v1 import ( parse_client_message, @@ -67,6 +74,12 @@ class Session: # Track IDs self.current_track_id: Optional[str] = str(uuid.uuid4()) + self._history_call_id: Optional[str] = None + self._history_turn_index: int = 0 + self._history_call_started_mono: Optional[float] = None + self._history_finalized: bool = False + + self.pipeline.conversation.on_turn_complete(self._on_turn_complete) logger.info(f"Session {self.id} created (duplex={self.use_duplex})") @@ -206,8 +219,13 @@ class Session: await self._send_error("client", "Duplicate session.start", "protocol.order") return + metadata = message.metadata or {} + + # Create history call record early so later turn callbacks can append transcripts. + await self._start_history_bridge(metadata) + # Apply runtime service/prompt overrides from backend if provided - self.pipeline.apply_runtime_overrides(message.metadata) + self.pipeline.apply_runtime_overrides(metadata) # Start duplex pipeline if not self._pipeline_started: @@ -241,6 +259,7 @@ class Session: reason=stop_reason, ) ) + await self._finalize_history(status="connected") await self.transport.close() async def _send_error(self, sender: str, error_message: str, code: str) -> None: @@ -270,5 +289,83 @@ class Session: async def cleanup(self) -> None: """Cleanup session resources.""" logger.info(f"Session {self.id} cleaning up") + await self._finalize_history(status="connected") await self.pipeline.cleanup() await self.transport.close() + + async def _start_history_bridge(self, metadata: Dict[str, Any]) -> None: + """Initialize backend history call record for this session.""" + if self._history_call_id: + return + + history_meta: Dict[str, Any] = {} + if isinstance(metadata.get("history"), dict): + history_meta = metadata["history"] + + raw_user_id = history_meta.get("userId", metadata.get("userId", settings.history_default_user_id)) + try: + user_id = int(raw_user_id) + except (TypeError, ValueError): + user_id = settings.history_default_user_id + + assistant_id = history_meta.get("assistantId", metadata.get("assistantId")) + source = str(history_meta.get("source", metadata.get("source", "debug"))) + + call_id = await create_history_call_record( + user_id=user_id, + assistant_id=str(assistant_id) if assistant_id else None, + source=source, + ) + if not call_id: + return + + self._history_call_id = call_id + self._history_call_started_mono = time.monotonic() + self._history_turn_index = 0 + self._history_finalized = False + logger.info(f"Session {self.id} history bridge enabled (call_id={call_id}, source={source})") + + async def _on_turn_complete(self, turn: ConversationTurn) -> None: + """Persist completed turns to backend call transcripts.""" + if not self._history_call_id: + return + if not turn.text or not turn.text.strip(): + return + + role = (turn.role or "").lower() + speaker = "human" if role == "user" else "ai" + + end_ms = 0 + if self._history_call_started_mono is not None: + end_ms = max(0, int((time.monotonic() - self._history_call_started_mono) * 1000)) + estimated_duration_ms = max(300, min(12000, len(turn.text.strip()) * 80)) + start_ms = max(0, end_ms - estimated_duration_ms) + + turn_index = self._history_turn_index + await add_history_transcript( + call_id=self._history_call_id, + turn_index=turn_index, + speaker=speaker, + content=turn.text.strip(), + start_ms=start_ms, + end_ms=end_ms, + duration_ms=max(1, end_ms - start_ms), + ) + self._history_turn_index += 1 + + async def _finalize_history(self, status: str) -> None: + """Finalize history call record once.""" + if not self._history_call_id or self._history_finalized: + return + + duration_seconds = 0 + if self._history_call_started_mono is not None: + duration_seconds = max(0, int(time.monotonic() - self._history_call_started_mono)) + + ok = await finalize_history_call_record( + call_id=self._history_call_id, + status=status, + duration_seconds=duration_seconds, + ) + if ok: + self._history_finalized = True diff --git a/web/pages/Assistants.tsx b/web/pages/Assistants.tsx index f273c1b..f15703b 100644 --- a/web/pages/Assistants.tsx +++ b/web/pages/Assistants.tsx @@ -1420,6 +1420,11 @@ export const DebugDrawer: React.FC<{ systemPrompt: assistant.prompt || '', greeting: assistant.opener || '', services, + history: { + assistantId: assistant.id, + userId: 1, + source: 'debug', + }, }, };