"""LLM (Large Language Model) Service implementations. Provides OpenAI-compatible LLM integration with streaming support for real-time voice conversation. """ import os import asyncio from typing import AsyncIterator, Optional, List, Dict, Any from loguru import logger from app.backend_client import search_knowledge_context from services.base import BaseLLMService, LLMMessage, ServiceState # Try to import openai try: from openai import AsyncOpenAI OPENAI_AVAILABLE = True except ImportError: OPENAI_AVAILABLE = False logger.warning("openai package not available - LLM service will be disabled") class OpenAILLMService(BaseLLMService): """ OpenAI-compatible LLM service. Supports streaming responses for low-latency voice conversation. Works with OpenAI API, Azure OpenAI, and compatible APIs. """ def __init__( self, model: str = "gpt-4o-mini", api_key: Optional[str] = None, base_url: Optional[str] = None, system_prompt: Optional[str] = None, knowledge_config: Optional[Dict[str, Any]] = None, ): """ Initialize OpenAI LLM service. Args: model: Model name (e.g., "gpt-4o-mini", "gpt-4o") api_key: OpenAI API key (defaults to OPENAI_API_KEY env var) base_url: Custom API base URL (for Azure or compatible APIs) system_prompt: Default system prompt for conversations """ super().__init__(model=model) self.api_key = api_key or os.getenv("OPENAI_API_KEY") self.base_url = base_url or os.getenv("OPENAI_API_URL") self.system_prompt = system_prompt or ( "You are a helpful, friendly voice assistant. " "Keep your responses concise and conversational. " "Respond naturally as if having a phone conversation." ) self.client: Optional[AsyncOpenAI] = None self._cancel_event = asyncio.Event() self._knowledge_config: Dict[str, Any] = knowledge_config or {} _RAG_DEFAULT_RESULTS = 5 _RAG_MAX_RESULTS = 8 _RAG_MAX_CONTEXT_CHARS = 4000 async def connect(self) -> None: """Initialize OpenAI client.""" if not OPENAI_AVAILABLE: raise RuntimeError("openai package not installed") if not self.api_key: raise ValueError("OpenAI API key not provided") self.client = AsyncOpenAI( api_key=self.api_key, base_url=self.base_url ) self.state = ServiceState.CONNECTED logger.info(f"OpenAI LLM service connected: model={self.model}") async def disconnect(self) -> None: """Close OpenAI client.""" if self.client: await self.client.close() self.client = None self.state = ServiceState.DISCONNECTED logger.info("OpenAI LLM service disconnected") def _prepare_messages(self, messages: List[LLMMessage]) -> List[Dict[str, Any]]: """Prepare messages list with system prompt.""" result = [] # Add system prompt if not already present has_system = any(m.role == "system" for m in messages) if not has_system and self.system_prompt: result.append({"role": "system", "content": self.system_prompt}) # Add all messages for msg in messages: result.append(msg.to_dict()) return result def set_knowledge_config(self, config: Optional[Dict[str, Any]]) -> None: """Update runtime knowledge retrieval config.""" self._knowledge_config = config or {} @staticmethod def _coerce_int(value: Any, default: int) -> int: try: return int(value) except (TypeError, ValueError): return default def _resolve_kb_id(self) -> Optional[str]: cfg = self._knowledge_config if isinstance(self._knowledge_config, dict) else {} kb_id = str( cfg.get("kbId") or cfg.get("knowledgeBaseId") or cfg.get("knowledge_base_id") or "" ).strip() return kb_id or None def _build_knowledge_prompt(self, results: List[Dict[str, Any]]) -> Optional[str]: if not results: return None lines = [ "You have retrieved the following knowledge base snippets.", "Use them only when relevant to the latest user request.", "If snippets are insufficient, say you are not sure instead of guessing.", "", ] used_chars = 0 used_count = 0 for item in results: content = str(item.get("content") or "").strip() if not content: continue if used_chars >= self._RAG_MAX_CONTEXT_CHARS: break metadata = item.get("metadata") if isinstance(item.get("metadata"), dict) else {} doc_id = metadata.get("document_id") chunk_index = metadata.get("chunk_index") distance = item.get("distance") source_parts = [] if doc_id: source_parts.append(f"doc={doc_id}") if chunk_index is not None: source_parts.append(f"chunk={chunk_index}") source = f" ({', '.join(source_parts)})" if source_parts else "" distance_text = "" try: if distance is not None: distance_text = f", distance={float(distance):.4f}" except (TypeError, ValueError): distance_text = "" remaining = self._RAG_MAX_CONTEXT_CHARS - used_chars snippet = content[:remaining].strip() if not snippet: continue used_count += 1 lines.append(f"[{used_count}{source}{distance_text}] {snippet}") used_chars += len(snippet) if used_count == 0: return None return "\n".join(lines) async def _with_knowledge_context(self, messages: List[LLMMessage]) -> List[LLMMessage]: cfg = self._knowledge_config if isinstance(self._knowledge_config, dict) else {} enabled = cfg.get("enabled", True) if isinstance(enabled, str): enabled = enabled.strip().lower() not in {"false", "0", "off", "no"} if not enabled: return messages kb_id = self._resolve_kb_id() if not kb_id: return messages latest_user = "" for msg in reversed(messages): if msg.role == "user": latest_user = (msg.content or "").strip() break if not latest_user: return messages n_results = self._coerce_int(cfg.get("nResults"), self._RAG_DEFAULT_RESULTS) n_results = max(1, min(n_results, self._RAG_MAX_RESULTS)) results = await search_knowledge_context( kb_id=kb_id, query=latest_user, n_results=n_results, ) prompt = self._build_knowledge_prompt(results) if not prompt: return messages logger.debug(f"RAG context injected (kb_id={kb_id}, chunks={len(results)})") rag_system = LLMMessage(role="system", content=prompt) if messages and messages[0].role == "system": return [messages[0], rag_system, *messages[1:]] return [rag_system, *messages] async def generate( self, messages: List[LLMMessage], temperature: float = 0.7, max_tokens: Optional[int] = None ) -> str: """ Generate a complete response. Args: messages: Conversation history temperature: Sampling temperature max_tokens: Maximum tokens to generate Returns: Complete response text """ if not self.client: raise RuntimeError("LLM service not connected") rag_messages = await self._with_knowledge_context(messages) prepared = self._prepare_messages(rag_messages) try: response = await self.client.chat.completions.create( model=self.model, messages=prepared, temperature=temperature, max_tokens=max_tokens ) content = response.choices[0].message.content or "" logger.debug(f"LLM response: {content[:100]}...") return content except Exception as e: logger.error(f"LLM generation error: {e}") raise async def generate_stream( self, messages: List[LLMMessage], temperature: float = 0.7, max_tokens: Optional[int] = None ) -> AsyncIterator[str]: """ Generate response in streaming mode. Args: messages: Conversation history temperature: Sampling temperature max_tokens: Maximum tokens to generate Yields: Text chunks as they are generated """ if not self.client: raise RuntimeError("LLM service not connected") rag_messages = await self._with_knowledge_context(messages) prepared = self._prepare_messages(rag_messages) self._cancel_event.clear() try: stream = await self.client.chat.completions.create( model=self.model, messages=prepared, temperature=temperature, max_tokens=max_tokens, stream=True ) async for chunk in stream: # Check for cancellation if self._cancel_event.is_set(): logger.info("LLM stream cancelled") break if chunk.choices and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content yield content except asyncio.CancelledError: logger.info("LLM stream cancelled via asyncio") raise except Exception as e: logger.error(f"LLM streaming error: {e}") raise def cancel(self) -> None: """Cancel ongoing generation.""" self._cancel_event.set() class MockLLMService(BaseLLMService): """ Mock LLM service for testing without API calls. """ def __init__(self, response_delay: float = 0.5): super().__init__(model="mock") self.response_delay = response_delay self.responses = [ "Hello! How can I help you today?", "That's an interesting question. Let me think about it.", "I understand. Is there anything else you'd like to know?", "Great! I'm here if you need anything else.", ] self._response_index = 0 async def connect(self) -> None: self.state = ServiceState.CONNECTED logger.info("Mock LLM service connected") async def disconnect(self) -> None: self.state = ServiceState.DISCONNECTED logger.info("Mock LLM service disconnected") async def generate( self, messages: List[LLMMessage], temperature: float = 0.7, max_tokens: Optional[int] = None ) -> str: await asyncio.sleep(self.response_delay) response = self.responses[self._response_index % len(self.responses)] self._response_index += 1 return response async def generate_stream( self, messages: List[LLMMessage], temperature: float = 0.7, max_tokens: Optional[int] = None ) -> AsyncIterator[str]: response = await self.generate(messages, temperature, max_tokens) # Stream word by word words = response.split() for i, word in enumerate(words): if i > 0: yield " " yield word await asyncio.sleep(0.05) # Simulate streaming delay