From 4e2450e8004d99c6e23c88e4ffc85ff5dba55c05 Mon Sep 17 00:00:00 2001 From: Xin Wang Date: Fri, 6 Mar 2026 09:00:43 +0800 Subject: [PATCH] Refactor backend integration and service architecture - Removed the backend client compatibility wrapper and associated methods to streamline backend integration. - Updated session management to utilize control plane gateways and runtime configuration providers. - Adjusted TTS service implementations to remove the EdgeTTS service and simplify service dependencies. - Enhanced documentation to reflect changes in backend integration and service architecture. - Updated configuration files to remove deprecated TTS provider options and clarify available settings. --- engine/app/backend_client.py | 87 ------ engine/app/config.py | 2 +- engine/app/main.py | 6 +- engine/app/service_factory.py | 112 ++++++++ engine/config/agents/example.yaml | 2 +- engine/config/agents/tools.yaml | 2 +- engine/core/duplex_pipeline.py | 129 ++++----- engine/core/history_bridge.py | 6 +- engine/core/ports/__init__.py | 37 ++- engine/core/ports/asr.py | 64 +++++ .../ports/{backend.py => control_plane.py} | 27 +- engine/core/ports/llm.py | 67 +++++ engine/core/ports/service_factory.py | 22 ++ engine/core/ports/tts.py | 41 +++ engine/core/session.py | 36 ++- engine/docs/backend_integration.md | 3 +- engine/docs/extension_ports.md | 47 ++++ engine/docs/high_level_architecture.md | 129 +++++++++ engine/requirements.txt | 4 - engine/services/__init__.py | 3 +- engine/services/tts.py | 254 ++---------------- .../tests/test_ws_protocol_session_start.py | 4 +- 22 files changed, 632 insertions(+), 452 deletions(-) delete mode 100644 engine/app/backend_client.py create mode 100644 engine/app/service_factory.py create mode 100644 engine/core/ports/asr.py rename engine/core/ports/{backend.py => control_plane.py} (75%) create mode 100644 engine/core/ports/llm.py create mode 100644 engine/core/ports/service_factory.py create mode 100644 engine/core/ports/tts.py create mode 100644 engine/docs/extension_ports.md create mode 100644 engine/docs/high_level_architecture.md diff --git a/engine/app/backend_client.py b/engine/app/backend_client.py deleted file mode 100644 index 93ea183..0000000 --- a/engine/app/backend_client.py +++ /dev/null @@ -1,87 +0,0 @@ -"""Compatibility wrappers around backend adapter implementations.""" - -from __future__ import annotations - -from typing import Any, Dict, List, Optional - -from app.backend_adapters import build_backend_adapter_from_settings - - -def _adapter(): - return build_backend_adapter_from_settings() - - -async def fetch_assistant_config(assistant_id: str) -> Optional[Dict[str, Any]]: - """Fetch assistant config payload from backend adapter.""" - return await _adapter().fetch_assistant_config(assistant_id) - - -async def create_history_call_record( - *, - user_id: int, - assistant_id: Optional[str], - source: str = "debug", -) -> Optional[str]: - """Create a call record via backend history API and return call_id.""" - return await _adapter().create_call_record( - user_id=user_id, - assistant_id=assistant_id, - source=source, - ) - - -async def add_history_transcript( - *, - call_id: str, - turn_index: int, - speaker: str, - content: str, - start_ms: int, - end_ms: int, - confidence: Optional[float] = None, - duration_ms: Optional[int] = None, -) -> bool: - """Append a transcript segment to backend history.""" - return await _adapter().add_transcript( - call_id=call_id, - turn_index=turn_index, - speaker=speaker, - content=content, - start_ms=start_ms, - 
end_ms=end_ms, - confidence=confidence, - duration_ms=duration_ms, - ) - - -async def finalize_history_call_record( - *, - call_id: str, - status: str, - duration_seconds: int, -) -> bool: - """Finalize a call record with status and duration.""" - return await _adapter().finalize_call_record( - call_id=call_id, - status=status, - duration_seconds=duration_seconds, - ) - - -async def search_knowledge_context( - *, - kb_id: str, - query: str, - n_results: int = 5, -) -> List[Dict[str, Any]]: - """Search backend knowledge base and return retrieval results.""" - return await _adapter().search_knowledge_context( - kb_id=kb_id, - query=query, - n_results=n_results, - ) - - -async def fetch_tool_resource(tool_id: str) -> Optional[Dict[str, Any]]: - """Fetch tool resource configuration from backend API.""" - return await _adapter().fetch_tool_resource(tool_id) diff --git a/engine/app/config.py b/engine/app/config.py index d1ac72f..233ba75 100644 --- a/engine/app/config.py +++ b/engine/app/config.py @@ -71,7 +71,7 @@ class Settings(BaseSettings): # TTS Configuration tts_provider: str = Field( default="openai_compatible", - description="TTS provider (edge, openai_compatible, siliconflow, dashscope)" + description="TTS provider (openai_compatible, siliconflow, dashscope)" ) tts_api_url: Optional[str] = Field(default=None, description="TTS provider API URL") tts_model: Optional[str] = Field(default=None, description="TTS model name") diff --git a/engine/app/main.py b/engine/app/main.py index bd513f6..5625061 100644 --- a/engine/app/main.py +++ b/engine/app/main.py @@ -76,7 +76,7 @@ app.add_middleware( # Active sessions storage active_sessions: Dict[str, Session] = {} -backend_gateway = build_backend_adapter_from_settings() +control_plane_gateway = build_backend_adapter_from_settings() # Configure logging logger.remove() @@ -187,7 +187,7 @@ async def websocket_endpoint(websocket: WebSocket): session = Session( session_id, transport, - backend_gateway=backend_gateway, + control_plane_gateway=control_plane_gateway, assistant_id=assistant_id, ) active_sessions[session_id] = session @@ -272,7 +272,7 @@ async def webrtc_endpoint(websocket: WebSocket): session = Session( session_id, transport, - backend_gateway=backend_gateway, + control_plane_gateway=control_plane_gateway, assistant_id=assistant_id, ) active_sessions[session_id] = session diff --git a/engine/app/service_factory.py b/engine/app/service_factory.py new file mode 100644 index 0000000..6bdb64c --- /dev/null +++ b/engine/app/service_factory.py @@ -0,0 +1,112 @@ +"""Default runtime service factory implementing core extension ports.""" + +from __future__ import annotations + +from typing import Any + +from loguru import logger + +from core.ports import ( + ASRPort, + ASRServiceSpec, + LLMPort, + LLMServiceSpec, + RealtimeServiceFactory, + TTSPort, + TTSServiceSpec, +) +from services.asr import BufferedASRService +from services.dashscope_tts import DashScopeTTSService +from services.llm import MockLLMService, OpenAILLMService +from services.openai_compatible_asr import OpenAICompatibleASRService +from services.openai_compatible_tts import OpenAICompatibleTTSService +from services.tts import MockTTSService + +_OPENAI_COMPATIBLE_PROVIDERS = {"openai_compatible", "openai-compatible", "siliconflow"} +_SUPPORTED_LLM_PROVIDERS = {"openai", *_OPENAI_COMPATIBLE_PROVIDERS} + + +class DefaultRealtimeServiceFactory(RealtimeServiceFactory): + """Build concrete runtime services from normalized specs.""" + + _DEFAULT_DASHSCOPE_TTS_REALTIME_URL = 
"wss://dashscope.aliyuncs.com/api-ws/v1/realtime" + _DEFAULT_DASHSCOPE_TTS_MODEL = "qwen3-tts-flash-realtime" + _DEFAULT_OPENAI_COMPATIBLE_TTS_MODEL = "FunAudioLLM/CosyVoice2-0.5B" + _DEFAULT_OPENAI_COMPATIBLE_ASR_MODEL = "FunAudioLLM/SenseVoiceSmall" + + @staticmethod + def _normalize_provider(provider: Any) -> str: + return str(provider or "").strip().lower() + + @staticmethod + def _resolve_dashscope_mode(raw_mode: Any) -> str: + mode = str(raw_mode or "commit").strip().lower() + if mode in {"commit", "server_commit"}: + return mode + return "commit" + + def create_llm_service(self, spec: LLMServiceSpec) -> LLMPort: + provider = self._normalize_provider(spec.provider) + if provider in _SUPPORTED_LLM_PROVIDERS and spec.api_key: + return OpenAILLMService( + api_key=spec.api_key, + base_url=spec.base_url, + model=spec.model, + system_prompt=spec.system_prompt, + knowledge_config=spec.knowledge_config, + knowledge_searcher=spec.knowledge_searcher, + ) + + logger.warning( + "LLM provider unsupported or API key missing (provider={}); using mock LLM", + provider or "-", + ) + return MockLLMService() + + def create_tts_service(self, spec: TTSServiceSpec) -> TTSPort: + provider = self._normalize_provider(spec.provider) + + if provider == "dashscope" and spec.api_key: + return DashScopeTTSService( + api_key=spec.api_key, + api_url=spec.api_url or self._DEFAULT_DASHSCOPE_TTS_REALTIME_URL, + voice=spec.voice, + model=spec.model or self._DEFAULT_DASHSCOPE_TTS_MODEL, + mode=self._resolve_dashscope_mode(spec.mode), + sample_rate=spec.sample_rate, + speed=spec.speed, + ) + + if provider in _OPENAI_COMPATIBLE_PROVIDERS and spec.api_key: + return OpenAICompatibleTTSService( + api_key=spec.api_key, + api_url=spec.api_url, + voice=spec.voice, + model=spec.model or self._DEFAULT_OPENAI_COMPATIBLE_TTS_MODEL, + sample_rate=spec.sample_rate, + speed=spec.speed, + ) + + logger.warning( + "TTS provider unsupported or API key missing (provider={}); using mock TTS", + provider or "-", + ) + return MockTTSService(sample_rate=spec.sample_rate) + + def create_asr_service(self, spec: ASRServiceSpec) -> ASRPort: + provider = self._normalize_provider(spec.provider) + + if provider in _OPENAI_COMPATIBLE_PROVIDERS and spec.api_key: + return OpenAICompatibleASRService( + api_key=spec.api_key, + api_url=spec.api_url, + model=spec.model or self._DEFAULT_OPENAI_COMPATIBLE_ASR_MODEL, + sample_rate=spec.sample_rate, + language=spec.language, + interim_interval_ms=spec.interim_interval_ms, + min_audio_for_interim_ms=spec.min_audio_for_interim_ms, + on_transcript=spec.on_transcript, + ) + + logger.info("Using buffered ASR service (provider={})", provider or "-") + return BufferedASRService(sample_rate=spec.sample_rate, language=spec.language) diff --git a/engine/config/agents/example.yaml b/engine/config/agents/example.yaml index dd0e927..70f4933 100644 --- a/engine/config/agents/example.yaml +++ b/engine/config/agents/example.yaml @@ -21,7 +21,7 @@ agent: api_url: https://api.qnaigc.com/v1 tts: - # provider: edge | openai_compatible | siliconflow | dashscope + # provider: openai_compatible | siliconflow | dashscope # dashscope defaults (if omitted): # api_url: wss://dashscope.aliyuncs.com/api-ws/v1/realtime # model: qwen3-tts-flash-realtime diff --git a/engine/config/agents/tools.yaml b/engine/config/agents/tools.yaml index 4d8bd72..e2968bb 100644 --- a/engine/config/agents/tools.yaml +++ b/engine/config/agents/tools.yaml @@ -18,7 +18,7 @@ agent: api_url: https://api.qnaigc.com/v1 tts: - # provider: edge | openai_compatible | 
siliconflow | dashscope + # provider: openai_compatible | siliconflow | dashscope # dashscope defaults (if omitted): # api_url: wss://dashscope.aliyuncs.com/api-ws/v1/realtime # model: qwen3-tts-flash-realtime diff --git a/engine/core/duplex_pipeline.py b/engine/core/duplex_pipeline.py index d6c81ee..bbf3d47 100644 --- a/engine/core/duplex_pipeline.py +++ b/engine/core/duplex_pipeline.py @@ -26,21 +26,25 @@ import aiohttp from loguru import logger from app.config import settings +from app.service_factory import DefaultRealtimeServiceFactory from core.conversation import ConversationManager, ConversationState from core.events import get_event_bus +from core.ports import ( + ASRPort, + ASRServiceSpec, + LLMPort, + LLMServiceSpec, + RealtimeServiceFactory, + TTSPort, + TTSServiceSpec, +) from core.tool_executor import execute_server_tool from core.transports import BaseTransport from models.ws_v1 import ev from processors.eou import EouDetector from processors.vad import SileroVAD, VADProcessor -from services.asr import BufferedASRService -from services.base import BaseASRService, BaseLLMService, BaseTTSService, LLMMessage, LLMStreamEvent -from services.dashscope_tts import DashScopeTTSService -from services.llm import MockLLMService, OpenAILLMService -from services.openai_compatible_asr import OpenAICompatibleASRService -from services.openai_compatible_tts import OpenAICompatibleTTSService +from services.base import LLMMessage, LLMStreamEvent from services.streaming_text import extract_tts_sentence, has_spoken_content -from services.tts import EdgeTTSService, MockTTSService class DuplexPipeline: @@ -258,9 +262,9 @@ class DuplexPipeline: self, transport: BaseTransport, session_id: str, - llm_service: Optional[BaseLLMService] = None, - tts_service: Optional[BaseTTSService] = None, - asr_service: Optional[BaseASRService] = None, + llm_service: Optional[LLMPort] = None, + tts_service: Optional[TTSPort] = None, + asr_service: Optional[ASRPort] = None, system_prompt: Optional[str] = None, greeting: Optional[str] = None, knowledge_searcher: Optional[ @@ -272,6 +276,7 @@ class DuplexPipeline: server_tool_executor: Optional[ Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]] ] = None, + service_factory: Optional[RealtimeServiceFactory] = None, ): """ Initialize duplex pipeline. 
@@ -279,8 +284,8 @@ class DuplexPipeline: Args: transport: Transport for sending audio/events session_id: Session identifier - llm_service: LLM service (defaults to OpenAI) - tts_service: TTS service (defaults to EdgeTTS) + llm_service: Optional injected LLM port implementation + tts_service: Optional injected TTS port implementation asr_service: ASR service (optional) system_prompt: System prompt for LLM greeting: Optional greeting to speak on start @@ -312,6 +317,7 @@ class DuplexPipeline: self.llm_service = llm_service self.tts_service = tts_service self.asr_service = asr_service # Will be initialized in start() + self._service_factory = service_factory or DefaultRealtimeServiceFactory() self._knowledge_searcher = knowledge_searcher self._tool_resource_resolver = tool_resource_resolver self._server_tool_executor = server_tool_executor @@ -776,21 +782,11 @@ class DuplexPipeline: return False return None - @staticmethod - def _is_openai_compatible_provider(provider: Any) -> bool: - normalized = str(provider or "").strip().lower() - return normalized in {"openai_compatible", "openai-compatible", "siliconflow"} - @staticmethod def _is_dashscope_tts_provider(provider: Any) -> bool: normalized = str(provider or "").strip().lower() return normalized == "dashscope" - @staticmethod - def _is_llm_provider_supported(provider: Any) -> bool: - normalized = str(provider or "").strip().lower() - return normalized in {"openai", "openai_compatible", "openai-compatible", "siliconflow"} - @staticmethod def _default_llm_base_url(provider: Any) -> Optional[str]: normalized = str(provider or "").strip().lower() @@ -798,10 +794,6 @@ class DuplexPipeline: return "https://api.siliconflow.cn/v1" return None - @staticmethod - def _default_dashscope_tts_realtime_url() -> str: - return "wss://dashscope.aliyuncs.com/api-ws/v1/realtime" - @staticmethod def _default_dashscope_tts_model() -> str: return "qwen3-tts-flash-realtime" @@ -900,18 +892,18 @@ class DuplexPipeline: or self._default_llm_base_url(llm_provider) ) llm_model = self._runtime_llm.get("model") or settings.llm_model - - if self._is_llm_provider_supported(llm_provider) and llm_api_key: - self.llm_service = OpenAILLMService( - api_key=llm_api_key, - base_url=llm_base_url, - model=llm_model, + self.llm_service = self._service_factory.create_llm_service( + LLMServiceSpec( + provider=llm_provider, + model=str(llm_model), + api_key=str(llm_api_key).strip() if llm_api_key else None, + base_url=str(llm_base_url).strip() if llm_base_url else None, + system_prompt=self.conversation.system_prompt, + temperature=settings.llm_temperature, knowledge_config=self._resolved_knowledge_config(), knowledge_searcher=self._knowledge_searcher, ) - else: - logger.warning("LLM provider unsupported or API key missing - using mock LLM") - self.llm_service = MockLLMService() + ) if hasattr(self.llm_service, "set_knowledge_config"): self.llm_service.set_knowledge_config(self._resolved_knowledge_config()) @@ -938,41 +930,29 @@ class DuplexPipeline: "services.tts.mode is DashScope-only and will be ignored " f"for provider={tts_provider}" ) - - if self._is_dashscope_tts_provider(tts_provider) and tts_api_key: - self.tts_service = DashScopeTTSService( - api_key=tts_api_key, - api_url=tts_api_url or self._default_dashscope_tts_realtime_url(), - voice=tts_voice, - model=tts_model or self._default_dashscope_tts_model(), + self.tts_service = self._service_factory.create_tts_service( + TTSServiceSpec( + provider=tts_provider, + api_key=str(tts_api_key).strip() if tts_api_key else None, + 
api_url=str(tts_api_url).strip() if tts_api_url else None, + voice=str(tts_voice), + model=str(tts_model).strip() if tts_model else None, + sample_rate=settings.sample_rate, + speed=tts_speed, mode=str(tts_mode), - sample_rate=settings.sample_rate, - speed=tts_speed ) - logger.info("Using DashScope realtime TTS service") - elif self._is_openai_compatible_provider(tts_provider) and tts_api_key: - self.tts_service = OpenAICompatibleTTSService( - api_key=tts_api_key, - api_url=tts_api_url, - voice=tts_voice, - model=tts_model or "FunAudioLLM/CosyVoice2-0.5B", - sample_rate=settings.sample_rate, - speed=tts_speed - ) - logger.info(f"Using OpenAI-compatible TTS service (provider={tts_provider})") - else: - self.tts_service = EdgeTTSService( - voice=tts_voice, - sample_rate=settings.sample_rate - ) - logger.info("Using Edge TTS service") + ) try: await self.tts_service.connect() except Exception as e: - logger.warning(f"TTS backend unavailable ({e}); falling back to MockTTS") - self.tts_service = MockTTSService( - sample_rate=settings.sample_rate + logger.warning(f"TTS backend unavailable ({e}); falling back to default TTS adapter") + self.tts_service = self._service_factory.create_tts_service( + TTSServiceSpec( + provider="mock", + voice="mock", + sample_rate=settings.sample_rate, + ) ) await self.tts_service.connect() else: @@ -988,22 +968,19 @@ class DuplexPipeline: asr_interim_interval = int(self._runtime_asr.get("interimIntervalMs") or settings.asr_interim_interval_ms) asr_min_audio_ms = int(self._runtime_asr.get("minAudioMs") or settings.asr_min_audio_ms) - if self._is_openai_compatible_provider(asr_provider) and asr_api_key: - self.asr_service = OpenAICompatibleASRService( - api_key=asr_api_key, - api_url=asr_api_url, - model=asr_model or "FunAudioLLM/SenseVoiceSmall", + self.asr_service = self._service_factory.create_asr_service( + ASRServiceSpec( + provider=asr_provider, sample_rate=settings.sample_rate, + language="auto", + api_key=str(asr_api_key).strip() if asr_api_key else None, + api_url=str(asr_api_url).strip() if asr_api_url else None, + model=str(asr_model).strip() if asr_model else None, interim_interval_ms=asr_interim_interval, min_audio_for_interim_ms=asr_min_audio_ms, - on_transcript=self._on_transcript_callback + on_transcript=self._on_transcript_callback, ) - logger.info(f"Using OpenAI-compatible ASR service (provider={asr_provider})") - else: - self.asr_service = BufferedASRService( - sample_rate=settings.sample_rate - ) - logger.info("Using Buffered ASR service (no real transcription)") + ) await self.asr_service.connect() diff --git a/engine/core/history_bridge.py b/engine/core/history_bridge.py index ead9a3b..70a681b 100644 --- a/engine/core/history_bridge.py +++ b/engine/core/history_bridge.py @@ -5,10 +5,12 @@ from __future__ import annotations import asyncio import time from dataclasses import dataclass -from typing import Any, Optional +from typing import Optional from loguru import logger +from core.ports import ConversationHistoryStore + @dataclass class _HistoryTranscriptJob: @@ -29,7 +31,7 @@ class SessionHistoryBridge: def __init__( self, *, - history_writer: Any, + history_writer: ConversationHistoryStore | None, enabled: bool, queue_max_size: int, retry_max_attempts: int, diff --git a/engine/core/ports/__init__.py b/engine/core/ports/__init__.py index 7d7c9dd..2ae96c4 100644 --- a/engine/core/ports/__init__.py +++ b/engine/core/ports/__init__.py @@ -1,17 +1,32 @@ """Port interfaces for engine-side integration boundaries.""" -from core.ports.backend import ( 
- AssistantConfigProvider, - BackendGateway, - HistoryWriter, - KnowledgeSearcher, - ToolResourceResolver, +from core.ports.asr import ASRBufferControl, ASRInterimControl, ASRPort, ASRServiceSpec +from core.ports.control_plane import ( + AssistantRuntimeConfigProvider, + ControlPlaneGateway, + ConversationHistoryStore, + KnowledgeRetriever, + ToolCatalog, ) +from core.ports.llm import LLMCancellable, LLMPort, LLMRuntimeConfigurable, LLMServiceSpec +from core.ports.service_factory import RealtimeServiceFactory +from core.ports.tts import TTSPort, TTSServiceSpec __all__ = [ - "AssistantConfigProvider", - "BackendGateway", - "HistoryWriter", - "KnowledgeSearcher", - "ToolResourceResolver", + "ASRPort", + "ASRServiceSpec", + "ASRInterimControl", + "ASRBufferControl", + "AssistantRuntimeConfigProvider", + "ControlPlaneGateway", + "ConversationHistoryStore", + "KnowledgeRetriever", + "ToolCatalog", + "LLMCancellable", + "LLMPort", + "LLMRuntimeConfigurable", + "LLMServiceSpec", + "RealtimeServiceFactory", + "TTSPort", + "TTSServiceSpec", ] diff --git a/engine/core/ports/asr.py b/engine/core/ports/asr.py new file mode 100644 index 0000000..fa302cd --- /dev/null +++ b/engine/core/ports/asr.py @@ -0,0 +1,64 @@ +"""ASR extension port contracts.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import AsyncIterator, Awaitable, Callable, Optional, Protocol + +from services.base import ASRResult + +TranscriptCallback = Callable[[str, bool], Awaitable[None]] + + +@dataclass(frozen=True) +class ASRServiceSpec: + """Resolved runtime configuration for ASR service creation.""" + + provider: str + sample_rate: int + language: str = "auto" + api_key: Optional[str] = None + api_url: Optional[str] = None + model: Optional[str] = None + interim_interval_ms: int = 500 + min_audio_for_interim_ms: int = 300 + on_transcript: Optional[TranscriptCallback] = None + + +class ASRPort(Protocol): + """Port for speech recognition providers.""" + + async def connect(self) -> None: + """Establish connection to ASR provider.""" + + async def disconnect(self) -> None: + """Release ASR resources.""" + + async def send_audio(self, audio: bytes) -> None: + """Push one PCM audio chunk for recognition.""" + + async def receive_transcripts(self) -> AsyncIterator[ASRResult]: + """Stream partial/final recognition results.""" + + +class ASRInterimControl(Protocol): + """Optional extension for explicit interim transcription control.""" + + async def start_interim_transcription(self) -> None: + """Start interim transcription loop if supported.""" + + async def stop_interim_transcription(self) -> None: + """Stop interim transcription loop if supported.""" + + +class ASRBufferControl(Protocol): + """Optional extension for explicit ASR buffer lifecycle control.""" + + def clear_buffer(self) -> None: + """Clear provider-side ASR buffer.""" + + async def get_final_transcription(self) -> str: + """Return final transcription for the current utterance.""" + + def get_and_clear_text(self) -> str: + """Return buffered text and clear internal state.""" diff --git a/engine/core/ports/backend.py b/engine/core/ports/control_plane.py similarity index 75% rename from engine/core/ports/backend.py rename to engine/core/ports/control_plane.py index 227c743..c50d642 100644 --- a/engine/core/ports/backend.py +++ b/engine/core/ports/control_plane.py @@ -1,7 +1,7 @@ -"""Backend integration ports. +"""Control-plane integration ports. 
These interfaces define the boundary between engine runtime logic and -backend-side capabilities (config lookup, history persistence, retrieval, +control-plane capabilities (config lookup, history persistence, retrieval, and tool resource discovery). """ @@ -10,14 +10,14 @@ from __future__ import annotations from typing import Any, Dict, List, Optional, Protocol -class AssistantConfigProvider(Protocol): +class AssistantRuntimeConfigProvider(Protocol): """Port for loading trusted assistant runtime configuration.""" async def fetch_assistant_config(self, assistant_id: str) -> Optional[Dict[str, Any]]: """Fetch assistant configuration payload.""" -class HistoryWriter(Protocol): +class ConversationHistoryStore(Protocol): """Port for persisting call and transcript history.""" async def create_call_record( @@ -27,7 +27,7 @@ class HistoryWriter(Protocol): assistant_id: Optional[str], source: str = "debug", ) -> Optional[str]: - """Create a call record and return backend call ID.""" + """Create a call record and return control-plane call ID.""" async def add_transcript( self, @@ -53,7 +53,7 @@ class HistoryWriter(Protocol): """Finalize a call record.""" -class KnowledgeSearcher(Protocol): +class KnowledgeRetriever(Protocol): """Port for RAG / knowledge retrieval operations.""" async def search_knowledge_context( @@ -66,19 +66,18 @@ class KnowledgeSearcher(Protocol): """Search a knowledge source and return ranked snippets.""" -class ToolResourceResolver(Protocol): +class ToolCatalog(Protocol): """Port for resolving tool metadata/configuration.""" async def fetch_tool_resource(self, tool_id: str) -> Optional[Dict[str, Any]]: """Fetch tool resource configuration.""" -class BackendGateway( - AssistantConfigProvider, - HistoryWriter, - KnowledgeSearcher, - ToolResourceResolver, +class ControlPlaneGateway( + AssistantRuntimeConfigProvider, + ConversationHistoryStore, + KnowledgeRetriever, + ToolCatalog, Protocol, ): - """Composite backend gateway interface used by engine services.""" - + """Composite control-plane gateway used by engine services.""" diff --git a/engine/core/ports/llm.py b/engine/core/ports/llm.py new file mode 100644 index 0000000..ca515ac --- /dev/null +++ b/engine/core/ports/llm.py @@ -0,0 +1,67 @@ +"""LLM extension port contracts.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional, Protocol + +from services.base import LLMMessage, LLMStreamEvent + +KnowledgeRetrieverFn = Callable[..., Awaitable[List[Dict[str, Any]]]] + + +@dataclass(frozen=True) +class LLMServiceSpec: + """Resolved runtime configuration for LLM service creation.""" + + provider: str + model: str + api_key: Optional[str] = None + base_url: Optional[str] = None + system_prompt: Optional[str] = None + temperature: float = 0.7 + knowledge_config: Dict[str, Any] = field(default_factory=dict) + knowledge_searcher: Optional[KnowledgeRetrieverFn] = None + + +class LLMPort(Protocol): + """Port for LLM providers.""" + + async def connect(self) -> None: + """Establish connection to LLM provider.""" + + async def disconnect(self) -> None: + """Release LLM resources.""" + + async def generate( + self, + messages: List[LLMMessage], + temperature: float = 0.7, + max_tokens: Optional[int] = None, + ) -> str: + """Generate a complete assistant response.""" + + async def generate_stream( + self, + messages: List[LLMMessage], + temperature: float = 0.7, + max_tokens: Optional[int] = None, + ) -> 
AsyncIterator[LLMStreamEvent]: + """Generate streaming assistant response events.""" + + +class LLMCancellable(Protocol): + """Optional extension for interrupting in-flight LLM generation.""" + + def cancel(self) -> None: + """Cancel an in-flight generation request.""" + + +class LLMRuntimeConfigurable(Protocol): + """Optional extension for runtime config updates.""" + + def set_knowledge_config(self, config: Optional[Dict[str, Any]]) -> None: + """Apply runtime knowledge retrieval settings.""" + + def set_tool_schemas(self, schemas: Optional[List[Dict[str, Any]]]) -> None: + """Apply runtime tool schemas used for tool calling.""" diff --git a/engine/core/ports/service_factory.py b/engine/core/ports/service_factory.py new file mode 100644 index 0000000..d1d8476 --- /dev/null +++ b/engine/core/ports/service_factory.py @@ -0,0 +1,22 @@ +"""Factory port for creating runtime ASR/LLM/TTS services.""" + +from __future__ import annotations + +from typing import Protocol + +from core.ports.asr import ASRPort, ASRServiceSpec +from core.ports.llm import LLMPort, LLMServiceSpec +from core.ports.tts import TTSPort, TTSServiceSpec + + +class RealtimeServiceFactory(Protocol): + """Port for provider-specific service construction.""" + + def create_llm_service(self, spec: LLMServiceSpec) -> LLMPort: + """Create an LLM service instance from a resolved spec.""" + + def create_tts_service(self, spec: TTSServiceSpec) -> TTSPort: + """Create a TTS service instance from a resolved spec.""" + + def create_asr_service(self, spec: ASRServiceSpec) -> ASRPort: + """Create an ASR service instance from a resolved spec.""" diff --git a/engine/core/ports/tts.py b/engine/core/ports/tts.py new file mode 100644 index 0000000..0693cdb --- /dev/null +++ b/engine/core/ports/tts.py @@ -0,0 +1,41 @@ +"""TTS extension port contracts.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import AsyncIterator, Optional, Protocol + +from services.base import TTSChunk + + +@dataclass(frozen=True) +class TTSServiceSpec: + """Resolved runtime configuration for TTS service creation.""" + + provider: str + voice: str + sample_rate: int + speed: float = 1.0 + api_key: Optional[str] = None + api_url: Optional[str] = None + model: Optional[str] = None + mode: str = "commit" + + +class TTSPort(Protocol): + """Port for speech synthesis providers.""" + + async def connect(self) -> None: + """Establish connection to TTS provider.""" + + async def disconnect(self) -> None: + """Release TTS resources.""" + + async def synthesize(self, text: str) -> bytes: + """Synthesize complete PCM payload for text.""" + + async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]: + """Stream synthesized PCM chunks for text.""" + + async def cancel(self) -> None: + """Cancel an in-flight synthesis request.""" diff --git a/engine/core/session.py b/engine/core/session.py index bc8bab8..eff77a6 100644 --- a/engine/core/session.py +++ b/engine/core/session.py @@ -11,6 +11,13 @@ from loguru import logger from app.backend_adapters import build_backend_adapter_from_settings from core.transports import BaseTransport +from core.ports import ( + AssistantRuntimeConfigProvider, + ControlPlaneGateway, + ConversationHistoryStore, + KnowledgeRetriever, + ToolCatalog, +) from core.duplex_pipeline import DuplexPipeline from core.conversation import ConversationTurn from core.history_bridge import SessionHistoryBridge @@ -97,7 +104,11 @@ class Session: session_id: str, transport: BaseTransport, use_duplex: bool = None, - 
backend_gateway: Optional[Any] = None, + control_plane_gateway: Optional[ControlPlaneGateway] = None, + runtime_config_provider: Optional[AssistantRuntimeConfigProvider] = None, + history_store: Optional[ConversationHistoryStore] = None, + knowledge_retriever: Optional[KnowledgeRetriever] = None, + tool_catalog: Optional[ToolCatalog] = None, assistant_id: Optional[str] = None, ): """ @@ -107,15 +118,24 @@ class Session: session_id: Unique session identifier transport: Transport instance for communication use_duplex: Whether to use duplex pipeline (defaults to settings.duplex_enabled) + control_plane_gateway: Optional composite control-plane dependency + runtime_config_provider: Optional assistant runtime config provider + history_store: Optional conversation history store + knowledge_retriever: Optional knowledge retrieval dependency + tool_catalog: Optional tool resource catalog """ self.id = session_id self.transport = transport self.use_duplex = use_duplex if use_duplex is not None else settings.duplex_enabled self.audio_frame_bytes = self._compute_audio_frame_bytes() self._assistant_id = str(assistant_id or "").strip() or None - self._backend_gateway = backend_gateway or build_backend_adapter_from_settings() + self._control_plane_gateway = control_plane_gateway or build_backend_adapter_from_settings() + self._runtime_config_provider = runtime_config_provider or self._control_plane_gateway + self._history_store = history_store or self._control_plane_gateway + self._knowledge_retriever = knowledge_retriever or self._control_plane_gateway + self._tool_catalog = tool_catalog or self._control_plane_gateway self._history_bridge = SessionHistoryBridge( - history_writer=self._backend_gateway, + history_writer=self._history_store, enabled=settings.history_enabled, queue_max_size=settings.history_queue_max_size, retry_max_attempts=settings.history_retry_max_attempts, @@ -128,8 +148,8 @@ class Session: session_id=session_id, system_prompt=settings.duplex_system_prompt, greeting=settings.duplex_greeting, - knowledge_searcher=getattr(self._backend_gateway, "search_knowledge_context", None), - tool_resource_resolver=getattr(self._backend_gateway, "fetch_tool_resource", None), + knowledge_searcher=getattr(self._knowledge_retriever, "search_knowledge_context", None), + tool_resource_resolver=getattr(self._tool_catalog, "fetch_tool_resource", None), ) # Session state @@ -935,18 +955,18 @@ class Session: self, assistant_id: str, ) -> tuple[Dict[str, Any], Optional[Dict[str, str]]]: - """Load trusted runtime metadata from backend assistant config.""" + """Load trusted runtime metadata from control-plane assistant config.""" if not assistant_id: return {}, { "code": "protocol.assistant_id_required", "message": "Missing required query parameter assistant_id", } - provider = getattr(self._backend_gateway, "fetch_assistant_config", None) + provider = getattr(self._runtime_config_provider, "fetch_assistant_config", None) if not callable(provider): return {}, { "code": "assistant.config_unavailable", - "message": "Assistant config backend unavailable", + "message": "Assistant config control plane unavailable", } payload = await provider(str(assistant_id).strip()) diff --git a/engine/docs/backend_integration.md b/engine/docs/backend_integration.md index e8165fd..bcd44fc 100644 --- a/engine/docs/backend_integration.md +++ b/engine/docs/backend_integration.md @@ -27,9 +27,8 @@ Assistant config source behavior: ## Architecture -- Ports: `core/ports/backend.py` +- Ports: `core/ports/control_plane.py` - Adapters: 
`app/backend_adapters.py`
-- Compatibility wrappers: `app/backend_client.py`
 
 `Session` and `DuplexPipeline` receive backend capabilities via injected adapter
 methods instead of hard-coding backend client imports.
diff --git a/engine/docs/extension_ports.md b/engine/docs/extension_ports.md
new file mode 100644
index 0000000..47f596a
--- /dev/null
+++ b/engine/docs/extension_ports.md
@@ -0,0 +1,47 @@
+# Engine Extension Ports (Draft)
+
+This document defines the draft port set used to keep the core runtime extensible.
+
+## Port Modules
+
+- `core/ports/control_plane.py`
+  - `AssistantRuntimeConfigProvider`
+  - `ConversationHistoryStore`
+  - `KnowledgeRetriever`
+  - `ToolCatalog`
+  - `ControlPlaneGateway`
+- `core/ports/llm.py`
+  - `LLMServiceSpec`
+  - `LLMPort`
+  - optional extensions: `LLMCancellable`, `LLMRuntimeConfigurable`
+- `core/ports/tts.py`
+  - `TTSServiceSpec`
+  - `TTSPort`
+- `core/ports/asr.py`
+  - `ASRServiceSpec`
+  - `ASRPort`
+  - optional extensions: `ASRInterimControl`, `ASRBufferControl`
+- `core/ports/service_factory.py`
+  - `RealtimeServiceFactory`
+
+## Adapter Layer
+
+- `app/service_factory.py` provides `DefaultRealtimeServiceFactory`.
+- It maps resolved provider specs to concrete adapters.
+- Core orchestration (`core/duplex_pipeline.py`) depends on the factory port/specs, not concrete provider classes.
+
+## Provider Behavior (Current)
+
+- LLM:
+  - supported providers: `openai`, `openai_compatible`, `openai-compatible`, `siliconflow`
+  - fallback: `MockLLMService`
+- TTS:
+  - supported providers: `dashscope`, `openai_compatible`, `openai-compatible`, `siliconflow`
+  - fallback: `MockTTSService`
+- ASR:
+  - supported providers: `openai_compatible`, `openai-compatible`, `siliconflow`
+  - fallback: `BufferedASRService`
+
+## Notes
+
+- This is a draft contract set; follow-up work can add explicit capability negotiation and contract-version fields.
diff --git a/engine/docs/high_level_architecture.md b/engine/docs/high_level_architecture.md
new file mode 100644
index 0000000..91e6845
--- /dev/null
+++ b/engine/docs/high_level_architecture.md
@@ -0,0 +1,129 @@
+# Engine High-Level Architecture
+
+This document describes the runtime architecture of `engine` for realtime voice/text assistant interactions.
+
+## Goals
+
+- Low-latency duplex interaction (user speaks while assistant can respond)
+- Clear separation between transport, orchestration, and model/service integrations
+- Backend-optional runtime (works with or without external backend)
+- Protocol-first interoperability through strict WS v1 control messages
+
+## Top-Level Components
+
+```mermaid
+flowchart LR
+    C[Client\nWeb / Mobile / Device] <-- WS v1 + PCM --> A[FastAPI App\napp/main.py]
+    A --> S[Session\ncore/session.py]
+    S --> D[Duplex Pipeline\ncore/duplex_pipeline.py]
+
+    D --> P[Processors\nVAD / EOU / Tracks]
+    D --> R[Workflow Runner\ncore/workflow_runner.py]
+    D --> E[Event Bus + Models\ncore/events.py + models/*]
+
+    R --> SV[Service Layer\nservices/asr.py\nservices/llm.py\nservices/tts.py]
+    R --> TE[Tool Executor\ncore/tool_executor.py]
+
+    S --> HB[History Bridge\ncore/history_bridge.py]
+    S --> BA[Control Plane Port\ncore/ports/control_plane.py]
+    BA --> AD[Adapters\napp/backend_adapters.py]
+
+    AD --> B[(External Backend API\noptional)]
+    SV --> M[(ASR/LLM/TTS Providers)]
+```
+
+## Request Lifecycle (Simplified)
+
+1. Client connects to `/ws?assistant_id=<assistant_id>` and sends `session.start`.
+2. App creates a `Session` with resolved assistant config (backend or local YAML).
+3. Binary PCM frames enter the duplex pipeline.
+4. `VAD`/`EOU` processors detect speech segments and trigger ASR finalization.
+5. ASR text is routed into workflow + LLM generation.
+6. Optional tool calls are executed (server-side, or via client-side result return).
+7. LLM output streams as text deltas; TTS produces audio chunks for playback.
+8. Session emits structured events (`transcript.*`, `assistant.*`, `output.audio.*`, `error`).
+9. History bridge persists conversation data asynchronously.
+10. On `session.stop` (or disconnect), session finalizes and drains pending writes.
+
+## Layering and Responsibilities
+
+### 1) Transport / API Layer
+
+- Entry point: `app/main.py`
+- Responsibilities:
+  - WebSocket lifecycle management
+  - WS v1 message validation and order guarantees
+  - Session creation and teardown
+  - Converting raw WS frames into internal events
+
+### 2) Session + Orchestration Layer
+
+- Core: `core/session.py`, `core/duplex_pipeline.py`, `core/conversation.py`
+- Responsibilities:
+  - Per-session state machine
+  - Turn boundaries and interruption/cancel handling
+  - Event sequencing (`seq`) and envelope consistency
+  - Bridging input/output tracks (`audio_in`, `audio_out`, `control`)
+
+### 3) Processing Layer
+
+- Modules: `processors/vad.py`, `processors/eou.py`, `processors/tracks.py`
+- Responsibilities:
+  - Speech activity detection
+  - End-of-utterance decisioning
+  - Track-oriented routing and timing-sensitive pre/post processing
+
+### 4) Workflow + Tooling Layer
+
+- Modules: `core/workflow_runner.py`, `core/tool_executor.py`
+- Responsibilities:
+  - Assistant workflow execution
+  - Tool call planning/execution and timeout handling
+  - Tool result normalization into protocol events
+
+### 5) Service Integration Layer
+
+- Modules: `services/*`
+- Responsibilities:
+  - Abstracting ASR/LLM/TTS provider differences
+  - Streaming token/audio adaptation
+  - Provider-specific adapters (OpenAI-compatible, DashScope, SiliconFlow, etc.)
+
+### 6) Backend Integration Layer (Optional)
+
+- Port: `core/ports/control_plane.py`
+- Adapters: `app/backend_adapters.py`
+- Responsibilities:
+  - Fetching assistant runtime config
+  - Persisting call/session metadata and history
+  - Supporting `BACKEND_MODE=auto|http|disabled`
+
+### 7) Persistence / Reliability Layer
+
+- Module: `core/history_bridge.py`
+- Responsibilities:
+  - Non-blocking queue-based history writes
+  - Retry with backoff on backend failures
+  - Best-effort drain on session finalize
+
+## Key Design Principles
+
+- Dependency inversion for backend: session/pipeline depend on port interfaces, not concrete clients (see the sketch below).
+- Streaming-first: text/audio are emitted incrementally to minimize perceived latency.
+- Fail-soft behavior: backend/history failures should not block realtime interaction paths.
+- Protocol strictness: WS v1 rejects malformed/out-of-order control traffic early.
+- Explicit event model: all client-observable state changes are represented as typed events.
+
+## Configuration Boundaries
+
+- Runtime environment settings live in `app/config.py`.
+- Assistant-specific behavior is loaded by `assistant_id`:
+  - backend mode: from backend API
+  - engine-only mode: local `engine/config/agents/<assistant_id>.yaml`
+- Client-provided `metadata.overrides` and `dynamicVariables` can alter runtime behavior within protocol constraints.
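+
+## Extension Sketch: Custom Provider Factory
+
+The snippet below is a minimal sketch of the dependency-inversion principle above, not a shipped adapter. `MyCloudTTSService` and the `"mycloud"` provider id are hypothetical; only the port/spec types (`TTSPort`, `TTSServiceSpec`, `TTSChunk`), `DefaultRealtimeServiceFactory`, and the `DuplexPipeline(service_factory=...)` hook come from this codebase.
+
+```python
+from typing import AsyncIterator
+
+from app.service_factory import DefaultRealtimeServiceFactory
+from core.ports import TTSPort, TTSServiceSpec
+from services.base import TTSChunk
+
+
+class MyCloudTTSService:
+    """Hypothetical TTS adapter; shows only the shape TTSPort expects."""
+
+    def __init__(self, api_key: str, voice: str, sample_rate: int) -> None:
+        self.api_key = api_key
+        self.voice = voice
+        self.sample_rate = sample_rate
+
+    async def connect(self) -> None: ...
+    async def disconnect(self) -> None: ...
+    async def cancel(self) -> None: ...
+
+    async def synthesize(self, text: str) -> bytes:
+        # A real adapter would call the provider API here.
+        return b""
+
+    async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]:
+        # A real adapter would stream provider audio; this yields one
+        # silent final chunk so the port contract is satisfied.
+        yield TTSChunk(audio=b"", sample_rate=self.sample_rate, is_final=True)
+
+
+class MyRealtimeServiceFactory(DefaultRealtimeServiceFactory):
+    """Overrides TTS creation; LLM/ASR fall through to the defaults."""
+
+    def create_tts_service(self, spec: TTSServiceSpec) -> TTSPort:
+        if spec.provider == "mycloud" and spec.api_key:
+            return MyCloudTTSService(
+                api_key=spec.api_key,
+                voice=spec.voice,
+                sample_rate=spec.sample_rate,
+            )
+        return super().create_tts_service(spec)
+```
+
+A deployment would pass `service_factory=MyRealtimeServiceFactory()` when constructing `DuplexPipeline`; core orchestration keeps depending only on the port and spec types, never on the concrete adapter.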
+ +## Related Docs + +- WS protocol: `engine/docs/ws_v1_schema.md` +- Backend integration details: `engine/docs/backend_integration.md` +- Duplex interaction diagram: `engine/docs/duplex_interaction.svg` diff --git a/engine/requirements.txt b/engine/requirements.txt index a32b7d2..2818030 100644 --- a/engine/requirements.txt +++ b/engine/requirements.txt @@ -29,10 +29,6 @@ aiohttp>=3.9.1 openai>=1.0.0 dashscope>=1.25.11 -# AI Services - TTS -edge-tts>=6.1.0 -pydub>=0.25.0 # For audio format conversion - # Microphone client dependencies sounddevice>=0.4.6 soundfile>=0.12.1 diff --git a/engine/services/__init__.py b/engine/services/__init__.py index 0e46834..f64ef05 100644 --- a/engine/services/__init__.py +++ b/engine/services/__init__.py @@ -14,7 +14,7 @@ from services.base import ( ) from services.llm import OpenAILLMService, MockLLMService from services.dashscope_tts import DashScopeTTSService -from services.tts import EdgeTTSService, MockTTSService +from services.tts import MockTTSService from services.asr import BufferedASRService, MockASRService from services.openai_compatible_asr import OpenAICompatibleASRService, SiliconFlowASRService from services.openai_compatible_tts import OpenAICompatibleTTSService, SiliconFlowTTSService @@ -35,7 +35,6 @@ __all__ = [ "MockLLMService", # TTS "DashScopeTTSService", - "EdgeTTSService", "MockTTSService", # ASR "BufferedASRService", diff --git a/engine/services/tts.py b/engine/services/tts.py index e838f08..0ed629d 100644 --- a/engine/services/tts.py +++ b/engine/services/tts.py @@ -1,271 +1,49 @@ -"""TTS (Text-to-Speech) Service implementations. +"""TTS service implementations used by the engine runtime.""" -Provides multiple TTS backend options including edge-tts (free) -and placeholder for cloud services. -""" - -import os -import io import asyncio -import struct -from typing import AsyncIterator, Optional +from typing import AsyncIterator + from loguru import logger from services.base import BaseTTSService, TTSChunk, ServiceState -# Try to import edge-tts -try: - import edge_tts - EDGE_TTS_AVAILABLE = True -except ImportError: - EDGE_TTS_AVAILABLE = False - logger.warning("edge-tts not available - EdgeTTS service will be disabled") - - -class EdgeTTSService(BaseTTSService): - """ - Microsoft Edge TTS service. - - Uses edge-tts library for free, high-quality speech synthesis. - Supports streaming for low-latency playback. - """ - - # Voice mapping for common languages - VOICE_MAP = { - "en": "en-US-JennyNeural", - "en-US": "en-US-JennyNeural", - "en-GB": "en-GB-SoniaNeural", - "zh": "zh-CN-XiaoxiaoNeural", - "zh-CN": "zh-CN-XiaoxiaoNeural", - "zh-TW": "zh-TW-HsiaoChenNeural", - "ja": "ja-JP-NanamiNeural", - "ko": "ko-KR-SunHiNeural", - "fr": "fr-FR-DeniseNeural", - "de": "de-DE-KatjaNeural", - "es": "es-ES-ElviraNeural", - } - - def __init__( - self, - voice: str = "en-US-JennyNeural", - sample_rate: int = 16000, - speed: float = 1.0 - ): - """ - Initialize Edge TTS service. 
- - Args: - voice: Voice name (e.g., "en-US-JennyNeural") or language code (e.g., "en") - sample_rate: Target sample rate (will be resampled) - speed: Speech speed multiplier - """ - # Resolve voice from language code if needed - if voice in self.VOICE_MAP: - voice = self.VOICE_MAP[voice] - - super().__init__(voice=voice, sample_rate=sample_rate, speed=speed) - self._cancel_event = asyncio.Event() - - async def connect(self) -> None: - """Edge TTS doesn't require explicit connection.""" - if not EDGE_TTS_AVAILABLE: - raise RuntimeError("edge-tts package not installed") - self.state = ServiceState.CONNECTED - logger.info(f"Edge TTS service ready: voice={self.voice}") - - async def disconnect(self) -> None: - """Edge TTS doesn't require explicit disconnection.""" - self.state = ServiceState.DISCONNECTED - logger.info("Edge TTS service disconnected") - - def _get_rate_string(self) -> str: - """Convert speed to rate string for edge-tts.""" - # edge-tts uses percentage format: "+0%", "-10%", "+20%" - percentage = int((self.speed - 1.0) * 100) - if percentage >= 0: - return f"+{percentage}%" - return f"{percentage}%" - - async def synthesize(self, text: str) -> bytes: - """ - Synthesize complete audio for text. - - Args: - text: Text to synthesize - - Returns: - PCM audio data (16-bit, mono, 16kHz) - """ - if not EDGE_TTS_AVAILABLE: - raise RuntimeError("edge-tts not available") - - # Collect all chunks - audio_data = b"" - async for chunk in self.synthesize_stream(text): - audio_data += chunk.audio - - return audio_data - - async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]: - """ - Synthesize audio in streaming mode. - - Args: - text: Text to synthesize - - Yields: - TTSChunk objects with PCM audio - """ - if not EDGE_TTS_AVAILABLE: - raise RuntimeError("edge-tts not available") - - self._cancel_event.clear() - - try: - communicate = edge_tts.Communicate( - text, - voice=self.voice, - rate=self._get_rate_string() - ) - - # edge-tts outputs MP3, we need to decode to PCM - # For now, collect MP3 chunks and yield after conversion - mp3_data = b"" - - async for chunk in communicate.stream(): - # Check for cancellation - if self._cancel_event.is_set(): - logger.info("TTS synthesis cancelled") - return - - if chunk["type"] == "audio": - mp3_data += chunk["data"] - - # Convert MP3 to PCM - if mp3_data: - pcm_data = await self._convert_mp3_to_pcm(mp3_data) - if pcm_data: - # Yield in chunks for streaming playback - chunk_size = self.sample_rate * 2 // 10 # 100ms chunks - for i in range(0, len(pcm_data), chunk_size): - if self._cancel_event.is_set(): - return - - chunk_data = pcm_data[i:i + chunk_size] - yield TTSChunk( - audio=chunk_data, - sample_rate=self.sample_rate, - is_final=(i + chunk_size >= len(pcm_data)) - ) - - except asyncio.CancelledError: - logger.info("TTS synthesis cancelled via asyncio") - raise - except Exception as e: - logger.error(f"TTS synthesis error: {e}") - raise - - async def _convert_mp3_to_pcm(self, mp3_data: bytes) -> bytes: - """ - Convert MP3 audio to PCM. - - Uses pydub or ffmpeg for conversion. 
- """ - try: - # Try using pydub (requires ffmpeg) - from pydub import AudioSegment - - # Load MP3 from bytes - audio = AudioSegment.from_mp3(io.BytesIO(mp3_data)) - - # Convert to target format - audio = audio.set_frame_rate(self.sample_rate) - audio = audio.set_channels(1) - audio = audio.set_sample_width(2) # 16-bit - - # Export as raw PCM - return audio.raw_data - - except ImportError: - logger.warning("pydub not available, trying fallback") - # Fallback: Use subprocess to call ffmpeg directly - return await self._ffmpeg_convert(mp3_data) - except Exception as e: - logger.error(f"Audio conversion error: {e}") - return b"" - - async def _ffmpeg_convert(self, mp3_data: bytes) -> bytes: - """Convert MP3 to PCM using ffmpeg subprocess.""" - try: - process = await asyncio.create_subprocess_exec( - "ffmpeg", - "-i", "pipe:0", - "-f", "s16le", - "-acodec", "pcm_s16le", - "-ar", str(self.sample_rate), - "-ac", "1", - "pipe:1", - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.DEVNULL - ) - - stdout, _ = await process.communicate(input=mp3_data) - return stdout - - except Exception as e: - logger.error(f"ffmpeg conversion error: {e}") - return b"" - - async def cancel(self) -> None: - """Cancel ongoing synthesis.""" - self._cancel_event.set() - class MockTTSService(BaseTTSService): - """ - Mock TTS service for testing without actual synthesis. - - Generates silence or simple tones. - """ - + """Mock TTS service for tests and no-provider fallback.""" + def __init__( self, voice: str = "mock", sample_rate: int = 16000, - speed: float = 1.0 + speed: float = 1.0, ): super().__init__(voice=voice, sample_rate=sample_rate, speed=speed) - + async def connect(self) -> None: self.state = ServiceState.CONNECTED logger.info("Mock TTS service connected") - + async def disconnect(self) -> None: self.state = ServiceState.DISCONNECTED logger.info("Mock TTS service disconnected") - + async def synthesize(self, text: str) -> bytes: """Generate silence based on text length.""" - # Approximate: 100ms per word word_count = len(text.split()) duration_ms = word_count * 100 samples = int(self.sample_rate * duration_ms / 1000) - - # Generate silence (zeros) - return bytes(samples * 2) # 16-bit = 2 bytes per sample - + return bytes(samples * 2) + async def synthesize_stream(self, text: str) -> AsyncIterator[TTSChunk]: - """Generate silence chunks.""" + """Generate silence chunks to emulate streaming synthesis.""" audio = await self.synthesize(text) - - # Yield in 100ms chunks + chunk_size = self.sample_rate * 2 // 10 for i in range(0, len(audio), chunk_size): - chunk_data = audio[i:i + chunk_size] + chunk_data = audio[i : i + chunk_size] yield TTSChunk( audio=chunk_data, sample_rate=self.sample_rate, - is_final=(i + chunk_size >= len(audio)) + is_final=(i + chunk_size >= len(audio)), ) - await asyncio.sleep(0.05) # Simulate processing time + await asyncio.sleep(0.05) diff --git a/engine/tests/test_ws_protocol_session_start.py b/engine/tests/test_ws_protocol_session_start.py index ac15b16..ad68674 100644 --- a/engine/tests/test_ws_protocol_session_start.py +++ b/engine/tests/test_ws_protocol_session_start.py @@ -139,7 +139,7 @@ async def test_load_server_runtime_metadata_returns_not_found_error(): _ = assistant_id return {"__error_code": "assistant.not_found"} - session._backend_gateway = _Gateway() + session._runtime_config_provider = _Gateway() runtime, error = await session._load_server_runtime_metadata("assistant_demo") assert runtime == {} assert error is not None @@ 
-155,7 +155,7 @@ async def test_load_server_runtime_metadata_returns_config_unavailable_error(): _ = assistant_id return None - session._backend_gateway = _Gateway() + session._runtime_config_provider = _Gateway() runtime, error = await session._load_server_runtime_metadata("assistant_demo") assert runtime == {} assert error is not None