"""Backend API client for assistant config and history persistence.""" from __future__ import annotations from typing import Any, Dict, List, Optional import aiohttp from loguru import logger from app.config import settings async def fetch_assistant_config(assistant_id: str) -> Optional[Dict[str, Any]]: """Fetch assistant config payload from backend API. Expected response shape: { "assistant": {...}, "voice": {...} | null } """ if not settings.backend_url: logger.warning("BACKEND_URL not set; skipping assistant config fetch") return None url = f"{settings.backend_url.rstrip('/')}/api/assistants/{assistant_id}/config" timeout = aiohttp.ClientTimeout(total=settings.backend_timeout_sec) try: async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(url) as resp: if resp.status == 404: logger.warning(f"Assistant config not found: {assistant_id}") return None resp.raise_for_status() payload = await resp.json() if not isinstance(payload, dict): logger.warning("Assistant config payload is not a dict; ignoring") return None return payload except Exception as exc: logger.warning(f"Failed to fetch assistant config ({assistant_id}): {exc}") return None def _backend_base_url() -> Optional[str]: if not settings.backend_url: return None return settings.backend_url.rstrip("/") def _timeout() -> aiohttp.ClientTimeout: return aiohttp.ClientTimeout(total=settings.backend_timeout_sec) async def create_history_call_record( *, user_id: int, assistant_id: Optional[str], source: str = "debug", ) -> Optional[str]: """Create a call record via backend history API and return call_id.""" base_url = _backend_base_url() if not base_url: return None url = f"{base_url}/api/history" payload: Dict[str, Any] = { "user_id": user_id, "assistant_id": assistant_id, "source": source, "status": "connected", } try: async with aiohttp.ClientSession(timeout=_timeout()) as session: async with session.post(url, json=payload) as resp: resp.raise_for_status() data = await resp.json() call_id = str((data or {}).get("id") or "") return call_id or None except Exception as exc: logger.warning(f"Failed to create history call record: {exc}") return None async def add_history_transcript( *, call_id: str, turn_index: int, speaker: str, content: str, start_ms: int, end_ms: int, confidence: Optional[float] = None, duration_ms: Optional[int] = None, ) -> bool: """Append a transcript segment to backend history.""" base_url = _backend_base_url() if not base_url or not call_id: return False url = f"{base_url}/api/history/{call_id}/transcripts" payload: Dict[str, Any] = { "turn_index": turn_index, "speaker": speaker, "content": content, "confidence": confidence, "start_ms": start_ms, "end_ms": end_ms, "duration_ms": duration_ms, } try: async with aiohttp.ClientSession(timeout=_timeout()) as session: async with session.post(url, json=payload) as resp: resp.raise_for_status() return True except Exception as exc: logger.warning(f"Failed to append history transcript (call_id={call_id}, turn={turn_index}): {exc}") return False async def finalize_history_call_record( *, call_id: str, status: str, duration_seconds: int, ) -> bool: """Finalize a call record with status and duration.""" base_url = _backend_base_url() if not base_url or not call_id: return False url = f"{base_url}/api/history/{call_id}" payload: Dict[str, Any] = { "status": status, "duration_seconds": duration_seconds, } try: async with aiohttp.ClientSession(timeout=_timeout()) as session: async with session.put(url, json=payload) as resp: resp.raise_for_status() return True except Exception as exc: logger.warning(f"Failed to finalize history call record ({call_id}): {exc}") return False async def search_knowledge_context( *, kb_id: str, query: str, n_results: int = 5, ) -> List[Dict[str, Any]]: """Search backend knowledge base and return retrieval results.""" base_url = _backend_base_url() if not base_url: return [] if not kb_id or not query.strip(): return [] try: safe_n_results = max(1, int(n_results)) except (TypeError, ValueError): safe_n_results = 5 url = f"{base_url}/api/knowledge/search" payload: Dict[str, Any] = { "kb_id": kb_id, "query": query, "nResults": safe_n_results, } try: async with aiohttp.ClientSession(timeout=_timeout()) as session: async with session.post(url, json=payload) as resp: if resp.status == 404: logger.warning(f"Knowledge base not found for retrieval: {kb_id}") return [] resp.raise_for_status() data = await resp.json() if not isinstance(data, dict): return [] results = data.get("results", []) if not isinstance(results, list): return [] return [r for r in results if isinstance(r, dict)] except Exception as exc: logger.warning(f"Knowledge search failed (kb_id={kb_id}): {exc}") return [] async def fetch_tool_resource(tool_id: str) -> Optional[Dict[str, Any]]: """Fetch tool resource configuration from backend API.""" base_url = _backend_base_url() if not base_url or not tool_id: return None url = f"{base_url}/api/tools/resources/{tool_id}" try: async with aiohttp.ClientSession(timeout=_timeout()) as session: async with session.get(url) as resp: if resp.status == 404: return None resp.raise_for_status() data = await resp.json() return data if isinstance(data, dict) else None except Exception as exc: logger.warning(f"Failed to fetch tool resource ({tool_id}): {exc}") return None