"""Backend adapter implementations for engine integration ports."""

from __future__ import annotations

from typing import Any, Dict, List, Optional
from urllib.parse import quote

import aiohttp
from loguru import logger

from app.config import settings

# Values of ``backend_mode`` that force the no-op adapter regardless of URL.
_DISABLED_MODES = frozenset(
    {"disabled", "off", "none", "null", "engine_only", "engine-only"}
)


def _quote_path_segment(value: object) -> str:
    """Percent-encode *value* so it occupies exactly one URL path segment.

    Identifiers are interpolated into request paths. Without escaping, an
    identifier containing ``/``, ``?`` or ``#`` would change which backend
    route is requested. ``safe=""`` makes ``quote`` escape ``/`` as well.

    ``str()`` mirrors what f-string interpolation did before, so non-string
    identifiers keep working, and ``errors="replace"`` guarantees this helper
    never raises on un-encodable text.
    """
    return quote(str(value), safe="", errors="replace")


class NullBackendAdapter:
    """No-op adapter for engine-only runtime without backend dependencies.

    Every method ignores its arguments and returns the "nothing available"
    value for its return type (``None``, ``False`` or an empty list).
    """

    async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
        """Return ``None``; no backend is consulted."""
        _ = assistant_id
        return None

    async def create_call_record(
        self,
        *,
        user_id: int,
        assistant_id: Optional[str],
        source: str = "debug",
    ) -> Optional[str]:
        """Return ``None``; no call record is created."""
        _ = (user_id, assistant_id, source)
        return None

    async def add_transcript(
        self,
        *,
        call_id: str,
        turn_index: int,
        speaker: str,
        content: str,
        start_ms: int,
        end_ms: int,
        confidence: Optional[float] = None,
        duration_ms: Optional[int] = None,
    ) -> bool:
        """Return ``False``; the transcript segment is discarded."""
        _ = (call_id, turn_index, speaker, content, start_ms, end_ms, confidence, duration_ms)
        return False

    async def finalize_call_record(
        self,
        *,
        call_id: str,
        status: str,
        duration_seconds: int,
    ) -> bool:
        """Return ``False``; nothing is finalized."""
        _ = (call_id, status, duration_seconds)
        return False

    async def search_knowledge_context(
        self,
        *,
        kb_id: str,
        query: str,
        n_results: int = 5,
    ) -> List[Dict[str, Any]]:
        """Return an empty result list; no search is performed."""
        _ = (kb_id, query, n_results)
        return []

    async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
        """Return ``None``; no backend is consulted."""
        _ = tool_id
        return None


class HistoryDisabledBackendAdapter:
    """Adapter wrapper that disables history writes while keeping reads available.

    ``fetch_assistant_config``, ``search_knowledge_context`` and
    ``fetch_tool_resource`` are forwarded to the wrapped adapter. The history
    methods (``create_call_record``, ``add_transcript``,
    ``finalize_call_record``) never touch the delegate and report
    ``None`` / ``False``.
    """

    def __init__(self, delegate: HttpBackendAdapter | NullBackendAdapter):
        """Wrap *delegate*, which serves all read operations."""
        self._delegate = delegate

    async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
        """Forward to the delegate's ``fetch_assistant_config``."""
        return await self._delegate.fetch_assistant_config(assistant_id)

    async def create_call_record(
        self,
        *,
        user_id: int,
        assistant_id: Optional[str],
        source: str = "debug",
    ) -> Optional[str]:
        """Return ``None`` without creating a call record."""
        _ = (user_id, assistant_id, source)
        return None

    async def add_transcript(
        self,
        *,
        call_id: str,
        turn_index: int,
        speaker: str,
        content: str,
        start_ms: int,
        end_ms: int,
        confidence: Optional[float] = None,
        duration_ms: Optional[int] = None,
    ) -> bool:
        """Return ``False`` without storing the transcript segment."""
        _ = (call_id, turn_index, speaker, content, start_ms, end_ms, confidence, duration_ms)
        return False

    async def finalize_call_record(
        self,
        *,
        call_id: str,
        status: str,
        duration_seconds: int,
    ) -> bool:
        """Return ``False`` without finalizing anything."""
        _ = (call_id, status, duration_seconds)
        return False

    async def search_knowledge_context(
        self,
        *,
        kb_id: str,
        query: str,
        n_results: int = 5,
    ) -> List[Dict[str, Any]]:
        """Forward to the delegate's ``search_knowledge_context``."""
        return await self._delegate.search_knowledge_context(
            kb_id=kb_id,
            query=query,
            n_results=n_results,
        )

    async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
        """Forward to the delegate's ``fetch_tool_resource``."""
        return await self._delegate.fetch_tool_resource(tool_id)


class HttpBackendAdapter:
    """HTTP implementation of backend integration ports.

    Each call opens its own ``aiohttp.ClientSession`` bounded by the
    configured total timeout. Request failures are logged as warnings and
    turned into a fallback return value instead of being raised.
    """

    def __init__(self, backend_url: str, timeout_sec: int = 10):
        """Store the normalized base URL and the request timeout.

        Args:
            backend_url: Backend base URL; surrounding whitespace and trailing
                slashes are stripped.
            timeout_sec: Total timeout, in seconds, applied to each request.

        Raises:
            ValueError: If *backend_url* is empty after normalization.
        """
        base_url = str(backend_url or "").strip().rstrip("/")
        if not base_url:
            raise ValueError("backend_url is required for HttpBackendAdapter")
        self._base_url = base_url
        self._timeout_sec = timeout_sec

    def _timeout(self) -> aiohttp.ClientTimeout:
        """Build the per-request timeout object from ``timeout_sec``."""
        return aiohttp.ClientTimeout(total=self._timeout_sec)

    async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]:
        """Fetch assistant config payload from backend API.

        Expected response shape:
        {
          "assistant": {...},
          "voice": {...} | null
        }

        Returns:
            The decoded JSON object on success. On failure a dict carrying
            ``"__error_code"`` and ``"assistantId"`` is returned instead:
            ``"assistant.not_found"`` for HTTP 404, and
            ``"assistant.config_unavailable"`` for a non-dict payload or any
            request error.
        """
        url = f"{self._base_url}/api/assistants/{_quote_path_segment(assistant_id)}/config"
        try:
            async with aiohttp.ClientSession(timeout=self._timeout()) as session:
                async with session.get(url) as resp:
                    if resp.status == 404:
                        logger.warning(f"Assistant config not found: {assistant_id}")
                        return {"__error_code": "assistant.not_found", "assistantId": assistant_id}
                    resp.raise_for_status()
                    payload = await resp.json()
                    if not isinstance(payload, dict):
                        logger.warning("Assistant config payload is not a dict; ignoring")
                        return {
                            "__error_code": "assistant.config_unavailable",
                            "assistantId": assistant_id,
                        }
                    return payload
        except Exception as exc:
            logger.warning(f"Failed to fetch assistant config ({assistant_id}): {exc}")
            return {"__error_code": "assistant.config_unavailable", "assistantId": assistant_id}

    async def create_call_record(
        self,
        *,
        user_id: int,
        assistant_id: Optional[str],
        source: str = "debug",
    ) -> Optional[str]:
        """Create a call record via backend history API and return call_id.

        The record is posted with status ``"connected"``.

        Returns:
            The ``"id"`` field of the response as a string, or ``None`` when
            the request fails or the response carries no usable id.
        """
        url = f"{self._base_url}/api/history"
        payload: Dict[str, Any] = {
            "user_id": user_id,
            "assistant_id": assistant_id,
            "source": source,
            "status": "connected",
        }
        try:
            async with aiohttp.ClientSession(timeout=self._timeout()) as session:
                async with session.post(url, json=payload) as resp:
                    resp.raise_for_status()
                    data = await resp.json()
                    call_id = str((data or {}).get("id") or "")
                    return call_id or None
        except Exception as exc:
            logger.warning(f"Failed to create history call record: {exc}")
            return None

    async def add_transcript(
        self,
        *,
        call_id: str,
        turn_index: int,
        speaker: str,
        content: str,
        start_ms: int,
        end_ms: int,
        confidence: Optional[float] = None,
        duration_ms: Optional[int] = None,
    ) -> bool:
        """Append a transcript segment to backend history.

        Returns:
            ``True`` when the backend accepts the segment; ``False`` when
            *call_id* is empty or the request fails.
        """
        if not call_id:
            return False

        url = f"{self._base_url}/api/history/{_quote_path_segment(call_id)}/transcripts"
        payload: Dict[str, Any] = {
            "turn_index": turn_index,
            "speaker": speaker,
            "content": content,
            "confidence": confidence,
            "start_ms": start_ms,
            "end_ms": end_ms,
            "duration_ms": duration_ms,
        }
        try:
            async with aiohttp.ClientSession(timeout=self._timeout()) as session:
                async with session.post(url, json=payload) as resp:
                    resp.raise_for_status()
                    return True
        except Exception as exc:
            logger.warning(f"Failed to append history transcript (call_id={call_id}, turn={turn_index}): {exc}")
            return False

    async def finalize_call_record(
        self,
        *,
        call_id: str,
        status: str,
        duration_seconds: int,
    ) -> bool:
        """Finalize a call record with status and duration.

        Returns:
            ``True`` when the backend accepts the update; ``False`` when
            *call_id* is empty or the request fails.
        """
        if not call_id:
            return False

        url = f"{self._base_url}/api/history/{_quote_path_segment(call_id)}"
        payload: Dict[str, Any] = {
            "status": status,
            "duration_seconds": duration_seconds,
        }
        try:
            async with aiohttp.ClientSession(timeout=self._timeout()) as session:
                async with session.put(url, json=payload) as resp:
                    resp.raise_for_status()
                    return True
        except Exception as exc:
            logger.warning(f"Failed to finalize history call record ({call_id}): {exc}")
            return False

    async def search_knowledge_context(
        self,
        *,
        kb_id: str,
        query: str,
        n_results: int = 5,
    ) -> List[Dict[str, Any]]:
        """Search backend knowledge base and return retrieval results.

        Args:
            kb_id: Knowledge base identifier, sent in the request body.
            query: Search text; a blank query short-circuits to ``[]``.
            n_results: Requested result count. Coerced to an int of at least
                1; falls back to 5 when it cannot be converted.

        Returns:
            The dict entries of the response's ``"results"`` list. An empty
            list is returned for blank input, HTTP 404, an unexpected
            response shape, or any request error.
        """
        if not kb_id or not query.strip():
            return []

        try:
            safe_n_results = max(1, int(n_results))
        except (TypeError, ValueError):
            safe_n_results = 5

        url = f"{self._base_url}/api/knowledge/search"
        payload: Dict[str, Any] = {
            "kb_id": kb_id,
            "query": query,
            "nResults": safe_n_results,
        }
        try:
            async with aiohttp.ClientSession(timeout=self._timeout()) as session:
                async with session.post(url, json=payload) as resp:
                    if resp.status == 404:
                        logger.warning(f"Knowledge base not found for retrieval: {kb_id}")
                        return []
                    resp.raise_for_status()
                    data = await resp.json()
                    if not isinstance(data, dict):
                        return []
                    results = data.get("results", [])
                    if not isinstance(results, list):
                        return []
                    # Drop malformed entries so callers can rely on dict items.
                    return [r for r in results if isinstance(r, dict)]
        except Exception as exc:
            logger.warning(f"Knowledge search failed (kb_id={kb_id}): {exc}")
            return []

    async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]:
        """Fetch tool resource configuration from backend API.

        Returns:
            The decoded JSON object, or ``None`` when *tool_id* is empty, the
            backend answers 404, the payload is not a dict, or the request
            fails.
        """
        if not tool_id:
            return None

        url = f"{self._base_url}/api/tools/resources/{_quote_path_segment(tool_id)}"
        try:
            async with aiohttp.ClientSession(timeout=self._timeout()) as session:
                async with session.get(url) as resp:
                    if resp.status == 404:
                        return None
                    resp.raise_for_status()
                    data = await resp.json()
                    return data if isinstance(data, dict) else None
        except Exception as exc:
            logger.warning(f"Failed to fetch tool resource ({tool_id}): {exc}")
            return None


def build_backend_adapter(
    *,
    backend_url: Optional[str],
    backend_mode: str = "auto",
    history_enabled: bool = True,
    timeout_sec: int = 10,
) -> HttpBackendAdapter | NullBackendAdapter | HistoryDisabledBackendAdapter:
    """Create backend adapter implementation based on runtime settings.

    Args:
        backend_url: Backend base URL; may be ``None`` or blank.
        backend_mode: Compared case-insensitively after stripping. The values
            ``disabled``, ``off``, ``none``, ``null``, ``engine_only`` and
            ``engine-only`` select the no-op adapter. Every other value
            (including ``http`` and ``auto``) selects the HTTP adapter when a
            URL is present and the no-op adapter otherwise.
        history_enabled: When false, the chosen adapter is wrapped in
            ``HistoryDisabledBackendAdapter``.
        timeout_sec: Request timeout passed to ``HttpBackendAdapter``.

    Returns:
        The configured adapter instance.
    """
    mode = str(backend_mode or "auto").strip().lower()
    has_url = bool(str(backend_url or "").strip())

    base_adapter: HttpBackendAdapter | NullBackendAdapter
    if mode in _DISABLED_MODES:
        base_adapter = NullBackendAdapter()
    elif has_url:
        base_adapter = HttpBackendAdapter(backend_url=str(backend_url), timeout_sec=timeout_sec)
    else:
        # Only an explicit "http" request is worth a warning; in auto mode a
        # missing URL is a normal engine-only setup.
        if mode == "http":
            logger.warning("BACKEND_MODE=http but BACKEND_URL is empty; falling back to NullBackendAdapter")
        base_adapter = NullBackendAdapter()

    if not history_enabled:
        return HistoryDisabledBackendAdapter(base_adapter)
    return base_adapter


def build_backend_adapter_from_settings() -> HttpBackendAdapter | NullBackendAdapter | HistoryDisabledBackendAdapter:
    """Create backend adapter using current app settings.

    Reads ``backend_url``, ``backend_mode``, ``history_enabled`` and
    ``backend_timeout_sec`` from ``app.config.settings``.
    """
    return build_backend_adapter(
        backend_url=settings.backend_url,
        backend_mode=settings.backend_mode,
        history_enabled=settings.history_enabled,
        timeout_sec=settings.backend_timeout_sec,
    )