"""Full duplex audio pipeline for AI voice conversation. This module implements the core duplex pipeline that orchestrates: - VAD (Voice Activity Detection) - EOU (End of Utterance) Detection - ASR (Automatic Speech Recognition) - optional - LLM (Language Model) - TTS (Text-to-Speech) Inspired by pipecat's frame-based architecture and active-call's event-driven design. """ import asyncio import json import time import uuid from typing import Any, Callable, Dict, List, Optional, Tuple import numpy as np from loguru import logger from app.config import settings from core.conversation import ConversationManager, ConversationState from core.events import get_event_bus from core.tool_executor import execute_server_tool from core.transports import BaseTransport from models.ws_v1 import ev from processors.eou import EouDetector from processors.vad import SileroVAD, VADProcessor from services.asr import BufferedASRService from services.base import BaseASRService, BaseLLMService, BaseTTSService, LLMMessage, LLMStreamEvent from services.llm import MockLLMService, OpenAILLMService from services.openai_compatible_asr import OpenAICompatibleASRService from services.openai_compatible_tts import OpenAICompatibleTTSService from services.streaming_text import extract_tts_sentence, has_spoken_content from services.tts import EdgeTTSService, MockTTSService class DuplexPipeline: """ Full duplex audio pipeline for AI voice conversation. Handles bidirectional audio flow with: - User speech detection and transcription - AI response generation - Text-to-speech synthesis - Barge-in (interruption) support Architecture (inspired by pipecat): User Audio → VAD → EOU → [ASR] → LLM → TTS → Audio Out ↓ Barge-in Detection → Interrupt """ _SENTENCE_END_CHARS = frozenset({"。", "!", "?", ".", "!", "?", "\n"}) _SENTENCE_TRAILING_CHARS = frozenset({"。", "!", "?", ".", "!", "?", "…", "~", "~", "\n"}) _SENTENCE_CLOSERS = frozenset({'"', "'", "”", "’", ")", "]", "}", ")", "】", "」", "』", "》"}) _MIN_SPLIT_SPOKEN_CHARS = 6 _TOOL_WAIT_TIMEOUT_SECONDS = 15.0 _SERVER_TOOL_TIMEOUT_SECONDS = 15.0 TRACK_AUDIO_IN = "audio_in" TRACK_AUDIO_OUT = "audio_out" TRACK_CONTROL = "control" _PCM_FRAME_BYTES = 640 # 16k mono pcm_s16le, 20ms _ASR_DELTA_THROTTLE_MS = 300 _LLM_DELTA_THROTTLE_MS = 80 _DEFAULT_TOOL_SCHEMAS: Dict[str, Dict[str, Any]] = { "current_time": { "name": "current_time", "description": "Get current local time", "parameters": { "type": "object", "properties": {}, "required": [], }, }, } def __init__( self, transport: BaseTransport, session_id: str, llm_service: Optional[BaseLLMService] = None, tts_service: Optional[BaseTTSService] = None, asr_service: Optional[BaseASRService] = None, system_prompt: Optional[str] = None, greeting: Optional[str] = None ): """ Initialize duplex pipeline. Args: transport: Transport for sending audio/events session_id: Session identifier llm_service: LLM service (defaults to OpenAI) tts_service: TTS service (defaults to EdgeTTS) asr_service: ASR service (optional) system_prompt: System prompt for LLM greeting: Optional greeting to speak on start """ self.transport = transport self.session_id = session_id self.event_bus = get_event_bus() self.track_audio_in = self.TRACK_AUDIO_IN self.track_audio_out = self.TRACK_AUDIO_OUT self.track_control = self.TRACK_CONTROL # Initialize VAD self.vad_model = SileroVAD( model_path=settings.vad_model_path, sample_rate=settings.sample_rate ) self.vad_processor = VADProcessor( vad_model=self.vad_model, threshold=settings.vad_threshold ) # Initialize EOU detector self.eou_detector = EouDetector( silence_threshold_ms=settings.vad_eou_threshold_ms, min_speech_duration_ms=settings.vad_min_speech_duration_ms ) # Initialize services self.llm_service = llm_service self.tts_service = tts_service self.asr_service = asr_service # Will be initialized in start() # Track last sent transcript to avoid duplicates self._last_sent_transcript = "" self._pending_transcript_delta: str = "" self._last_transcript_delta_emit_ms: float = 0.0 # Conversation manager self.conversation = ConversationManager( system_prompt=system_prompt, greeting=greeting ) # State self._running = True self._is_bot_speaking = False self._current_turn_task: Optional[asyncio.Task] = None self._audio_buffer: bytes = b"" max_buffer_seconds = settings.max_audio_buffer_seconds self._max_audio_buffer_bytes = int(settings.sample_rate * 2 * max_buffer_seconds) self._asr_start_min_speech_ms: int = settings.asr_start_min_speech_ms self._asr_capture_active: bool = False self._pending_speech_audio: bytes = b"" # Keep a short rolling pre-speech window so VAD transition latency # does not clip the first phoneme/character sent to ASR. pre_speech_ms = settings.asr_pre_speech_ms self._asr_pre_speech_bytes = int(settings.sample_rate * 2 * (pre_speech_ms / 1000.0)) self._pre_speech_buffer: bytes = b"" # Add a tiny trailing silence tail before final ASR to avoid # clipping the last phoneme at utterance boundaries. asr_final_tail_ms = settings.asr_final_tail_ms self._asr_final_tail_bytes = int(settings.sample_rate * 2 * (asr_final_tail_ms / 1000.0)) self._last_vad_status: str = "Silence" self._process_lock = asyncio.Lock() # Priority outbound dispatcher (lower value = higher priority). self._outbound_q: asyncio.PriorityQueue[Tuple[int, int, str, Any]] = asyncio.PriorityQueue() self._outbound_seq = 0 self._outbound_task: Optional[asyncio.Task] = None self._drop_outbound_audio = False self._audio_out_frame_buffer: bytes = b"" # Interruption handling self._interrupt_event = asyncio.Event() # Latency tracking - TTFB (Time to First Byte) self._turn_start_time: Optional[float] = None self._first_audio_sent: bool = False # Barge-in filtering - require minimum speech duration to interrupt self._barge_in_speech_start_time: Optional[float] = None self._barge_in_min_duration_ms: int = settings.barge_in_min_duration_ms self._barge_in_silence_tolerance_ms: int = settings.barge_in_silence_tolerance_ms self._barge_in_speech_frames: int = 0 # Count speech frames self._barge_in_silence_frames: int = 0 # Count silence frames during potential barge-in # Runtime overrides injected from session.start metadata self._runtime_llm: Dict[str, Any] = {} self._runtime_asr: Dict[str, Any] = {} self._runtime_tts: Dict[str, Any] = {} self._runtime_output: Dict[str, Any] = {} self._runtime_system_prompt: Optional[str] = None self._runtime_first_turn_mode: str = "bot_first" self._runtime_greeting: Optional[str] = None self._runtime_generated_opener_enabled: Optional[bool] = None self._runtime_barge_in_enabled: Optional[bool] = None self._runtime_barge_in_min_duration_ms: Optional[int] = None self._runtime_knowledge: Dict[str, Any] = {} self._runtime_knowledge_base_id: Optional[str] = None self._runtime_tools: List[Any] = [] self._runtime_tool_executor: Dict[str, str] = {} self._pending_tool_waiters: Dict[str, asyncio.Future] = {} self._early_tool_results: Dict[str, Dict[str, Any]] = {} self._completed_tool_call_ids: set[str] = set() self._pending_client_tool_call_ids: set[str] = set() self._next_seq: Optional[Callable[[], int]] = None self._local_seq: int = 0 # Cross-service correlation IDs self._turn_count: int = 0 self._response_count: int = 0 self._tts_count: int = 0 self._utterance_count: int = 0 self._current_turn_id: Optional[str] = None self._current_utterance_id: Optional[str] = None self._current_response_id: Optional[str] = None self._current_tts_id: Optional[str] = None self._pending_llm_delta: str = "" self._last_llm_delta_emit_ms: float = 0.0 logger.info(f"DuplexPipeline initialized for session {session_id}") def set_event_sequence_provider(self, provider: Callable[[], int]) -> None: """Use session-scoped monotonic sequence provider for envelope events.""" self._next_seq = provider def apply_runtime_overrides(self, metadata: Optional[Dict[str, Any]]) -> None: """ Apply runtime overrides from WS session.start metadata. Expected metadata shape: { "systemPrompt": "...", "greeting": "...", "services": { "llm": {...}, "asr": {...}, "tts": {...} } } """ if not metadata: return if "systemPrompt" in metadata: self._runtime_system_prompt = str(metadata.get("systemPrompt") or "") if self._runtime_system_prompt: self.conversation.system_prompt = self._runtime_system_prompt if "firstTurnMode" in metadata: raw_mode = str(metadata.get("firstTurnMode") or "").strip().lower() self._runtime_first_turn_mode = "user_first" if raw_mode == "user_first" else "bot_first" if "greeting" in metadata: greeting_payload = metadata.get("greeting") if isinstance(greeting_payload, dict): self._runtime_greeting = str(greeting_payload.get("text") or "") generated_flag = self._coerce_bool(greeting_payload.get("generated")) if generated_flag is not None: self._runtime_generated_opener_enabled = generated_flag else: self._runtime_greeting = str(greeting_payload or "") self.conversation.greeting = self._runtime_greeting or None generated_opener_flag = self._coerce_bool(metadata.get("generatedOpenerEnabled")) if generated_opener_flag is not None: self._runtime_generated_opener_enabled = generated_opener_flag services = metadata.get("services") or {} if isinstance(services, dict): if isinstance(services.get("llm"), dict): self._runtime_llm = services["llm"] if isinstance(services.get("asr"), dict): self._runtime_asr = services["asr"] if isinstance(services.get("tts"), dict): self._runtime_tts = services["tts"] output = metadata.get("output") or {} if isinstance(output, dict): self._runtime_output = output barge_in = metadata.get("bargeIn") if isinstance(barge_in, dict): barge_in_enabled = self._coerce_bool(barge_in.get("enabled")) if barge_in_enabled is not None: self._runtime_barge_in_enabled = barge_in_enabled min_duration = barge_in.get("minDurationMs") if isinstance(min_duration, (int, float, str)): try: self._runtime_barge_in_min_duration_ms = max(0, int(min_duration)) except (TypeError, ValueError): self._runtime_barge_in_min_duration_ms = None knowledge_base_id = metadata.get("knowledgeBaseId") if knowledge_base_id is not None: kb_id = str(knowledge_base_id).strip() self._runtime_knowledge_base_id = kb_id or None knowledge = metadata.get("knowledge") if isinstance(knowledge, dict): self._runtime_knowledge = knowledge kb_id = str(knowledge.get("kbId") or knowledge.get("knowledgeBaseId") or "").strip() if kb_id: self._runtime_knowledge_base_id = kb_id tools_payload = metadata.get("tools") if isinstance(tools_payload, list): self._runtime_tools = tools_payload self._runtime_tool_executor = self._resolved_tool_executor_map() elif "tools" in metadata: self._runtime_tools = [] self._runtime_tool_executor = {} if self.llm_service and hasattr(self.llm_service, "set_knowledge_config"): self.llm_service.set_knowledge_config(self._resolved_knowledge_config()) if self.llm_service and hasattr(self.llm_service, "set_tool_schemas"): self.llm_service.set_tool_schemas(self._resolved_tool_schemas()) def resolved_runtime_config(self) -> Dict[str, Any]: """Return current effective runtime configuration without secrets.""" llm_provider = str(self._runtime_llm.get("provider") or settings.llm_provider).lower() llm_base_url = ( self._runtime_llm.get("baseUrl") or settings.llm_api_url or self._default_llm_base_url(llm_provider) ) tts_provider = str(self._runtime_tts.get("provider") or settings.tts_provider).lower() asr_provider = str(self._runtime_asr.get("provider") or settings.asr_provider).lower() output_mode = str(self._runtime_output.get("mode") or "").strip().lower() if not output_mode: output_mode = "audio" if self._tts_output_enabled() else "text" return { "output": {"mode": output_mode}, "services": { "llm": { "provider": llm_provider, "model": str(self._runtime_llm.get("model") or settings.llm_model), "baseUrl": llm_base_url, }, "asr": { "provider": asr_provider, "model": str(self._runtime_asr.get("model") or settings.asr_model or ""), "interimIntervalMs": int(self._runtime_asr.get("interimIntervalMs") or settings.asr_interim_interval_ms), "minAudioMs": int(self._runtime_asr.get("minAudioMs") or settings.asr_min_audio_ms), }, "tts": { "enabled": self._tts_output_enabled(), "provider": tts_provider, "model": str(self._runtime_tts.get("model") or settings.tts_model or ""), "voice": str(self._runtime_tts.get("voice") or settings.tts_voice), "speed": float(self._runtime_tts.get("speed") or settings.tts_speed), }, }, "tools": { "allowlist": sorted(self._runtime_tool_executor.keys()), }, "tracks": { "audio_in": self.track_audio_in, "audio_out": self.track_audio_out, "control": self.track_control, }, } def _next_event_seq(self) -> int: if self._next_seq: return self._next_seq() self._local_seq += 1 return self._local_seq def _event_source(self, event_type: str) -> str: if event_type.startswith("transcript.") or event_type.startswith("input.speech_"): return "asr" if event_type.startswith("assistant.response."): return "llm" if event_type.startswith("assistant.tool_"): return "tool" if event_type.startswith("output.audio.") or event_type == "metrics.ttfb": return "tts" return "system" def _new_id(self, prefix: str, counter: int) -> str: return f"{prefix}_{counter}_{uuid.uuid4().hex[:8]}" def _start_turn(self) -> str: self._turn_count += 1 self._current_turn_id = self._new_id("turn", self._turn_count) self._current_utterance_id = None self._current_response_id = None self._current_tts_id = None return self._current_turn_id def _start_response(self) -> str: self._response_count += 1 self._current_response_id = self._new_id("resp", self._response_count) self._current_tts_id = None return self._current_response_id def _start_tts(self) -> str: self._tts_count += 1 self._current_tts_id = self._new_id("tts", self._tts_count) return self._current_tts_id def _finalize_utterance(self) -> str: if self._current_utterance_id: return self._current_utterance_id self._utterance_count += 1 self._current_utterance_id = self._new_id("utt", self._utterance_count) if not self._current_turn_id: self._start_turn() return self._current_utterance_id def _envelope_event(self, event: Dict[str, Any]) -> Dict[str, Any]: event_type = str(event.get("type") or "") source = str(event.get("source") or self._event_source(event_type)) track_id = event.get("trackId") if not track_id: if source == "asr": track_id = self.track_audio_in elif source in {"llm", "tts", "tool"}: track_id = self.track_audio_out else: track_id = self.track_control data = event.get("data") if not isinstance(data, dict): data = {} if self._current_turn_id: data.setdefault("turn_id", self._current_turn_id) if self._current_utterance_id: data.setdefault("utterance_id", self._current_utterance_id) if self._current_response_id: data.setdefault("response_id", self._current_response_id) if self._current_tts_id: data.setdefault("tts_id", self._current_tts_id) for k, v in event.items(): if k in {"type", "timestamp", "sessionId", "seq", "source", "trackId", "data"}: continue data.setdefault(k, v) event["sessionId"] = self.session_id event["seq"] = self._next_event_seq() event["source"] = source event["trackId"] = track_id event["data"] = data return event @staticmethod def _coerce_bool(value: Any) -> Optional[bool]: if isinstance(value, bool): return value if isinstance(value, (int, float)): return bool(value) if isinstance(value, str): normalized = value.strip().lower() if normalized in {"1", "true", "yes", "on", "enabled"}: return True if normalized in {"0", "false", "no", "off", "disabled"}: return False return None @staticmethod def _is_openai_compatible_provider(provider: Any) -> bool: normalized = str(provider or "").strip().lower() return normalized in {"openai_compatible", "openai-compatible", "siliconflow"} @staticmethod def _is_llm_provider_supported(provider: Any) -> bool: normalized = str(provider or "").strip().lower() return normalized in {"openai", "openai_compatible", "openai-compatible", "siliconflow"} @staticmethod def _default_llm_base_url(provider: Any) -> Optional[str]: normalized = str(provider or "").strip().lower() if normalized == "siliconflow": return "https://api.siliconflow.cn/v1" return None def _tts_output_enabled(self) -> bool: enabled = self._coerce_bool(self._runtime_tts.get("enabled")) if enabled is not None: return enabled output_mode = str(self._runtime_output.get("mode") or "").strip().lower() if output_mode in {"text", "text_only", "text-only"}: return False return True def _generated_opener_enabled(self) -> bool: return self._runtime_generated_opener_enabled is True def _bot_starts_first(self) -> bool: return self._runtime_first_turn_mode != "user_first" def _barge_in_enabled(self) -> bool: if self._runtime_barge_in_enabled is not None: return self._runtime_barge_in_enabled return True def _resolved_barge_in_min_duration_ms(self) -> int: if self._runtime_barge_in_min_duration_ms is not None: return self._runtime_barge_in_min_duration_ms return self._barge_in_min_duration_ms def _barge_in_silence_tolerance_frames(self) -> int: """Convert silence tolerance from ms to frame count using current chunk size.""" chunk_ms = max(1, settings.chunk_size_ms) return max(1, int(np.ceil(self._barge_in_silence_tolerance_ms / chunk_ms))) async def _generate_runtime_greeting(self) -> Optional[str]: if not self.llm_service: return None prompt_hint = (self._runtime_greeting or "").strip() system_context = (self.conversation.system_prompt or self._runtime_system_prompt or "").strip() # Keep context concise to avoid overloading greeting generation. if len(system_context) > 1200: system_context = system_context[:1200] system_prompt = ( "你是语音通话助手的开场白生成器。" "请只输出一句自然、简洁、友好的中文开场白。" "不要使用引号,不要使用 markdown,不要加解释。" ) user_prompt = "请生成一句中文开场白(不超过25个汉字)。" if system_context: user_prompt += f"\n\n以下是该助手的系统提示词,请据此决定语气、角色和边界:\n{system_context}" if prompt_hint: user_prompt += f"\n\n额外风格提示:{prompt_hint}" try: generated = await self.llm_service.generate( [ LLMMessage(role="system", content=system_prompt), LLMMessage(role="user", content=user_prompt), ], temperature=0.7, max_tokens=64, ) except Exception as exc: logger.warning(f"Failed to generate runtime greeting: {exc}") return None text = (generated or "").strip() if not text: return None return text.strip().strip('"').strip("'") async def start(self) -> None: """Start the pipeline and connect services.""" try: # Connect LLM service if not self.llm_service: llm_provider = (self._runtime_llm.get("provider") or settings.llm_provider).lower() llm_api_key = self._runtime_llm.get("apiKey") or settings.llm_api_key llm_base_url = ( self._runtime_llm.get("baseUrl") or settings.llm_api_url or self._default_llm_base_url(llm_provider) ) llm_model = self._runtime_llm.get("model") or settings.llm_model if self._is_llm_provider_supported(llm_provider) and llm_api_key: self.llm_service = OpenAILLMService( api_key=llm_api_key, base_url=llm_base_url, model=llm_model, knowledge_config=self._resolved_knowledge_config(), ) else: logger.warning("LLM provider unsupported or API key missing - using mock LLM") self.llm_service = MockLLMService() if hasattr(self.llm_service, "set_knowledge_config"): self.llm_service.set_knowledge_config(self._resolved_knowledge_config()) if hasattr(self.llm_service, "set_tool_schemas"): self.llm_service.set_tool_schemas(self._resolved_tool_schemas()) await self.llm_service.connect() tts_output_enabled = self._tts_output_enabled() # Connect TTS service only when audio output is enabled. if tts_output_enabled: if not self.tts_service: tts_provider = (self._runtime_tts.get("provider") or settings.tts_provider).lower() tts_api_key = self._runtime_tts.get("apiKey") or settings.tts_api_key tts_api_url = self._runtime_tts.get("baseUrl") or settings.tts_api_url tts_voice = self._runtime_tts.get("voice") or settings.tts_voice tts_model = self._runtime_tts.get("model") or settings.tts_model tts_speed = float(self._runtime_tts.get("speed") or settings.tts_speed) if self._is_openai_compatible_provider(tts_provider) and tts_api_key: self.tts_service = OpenAICompatibleTTSService( api_key=tts_api_key, api_url=tts_api_url, voice=tts_voice, model=tts_model or "FunAudioLLM/CosyVoice2-0.5B", sample_rate=settings.sample_rate, speed=tts_speed ) logger.info(f"Using OpenAI-compatible TTS service (provider={tts_provider})") else: self.tts_service = EdgeTTSService( voice=tts_voice, sample_rate=settings.sample_rate ) logger.info("Using Edge TTS service") try: await self.tts_service.connect() except Exception as e: logger.warning(f"TTS backend unavailable ({e}); falling back to MockTTS") self.tts_service = MockTTSService( sample_rate=settings.sample_rate ) await self.tts_service.connect() else: self.tts_service = None logger.info("TTS output disabled by runtime metadata") # Connect ASR service if not self.asr_service: asr_provider = (self._runtime_asr.get("provider") or settings.asr_provider).lower() asr_api_key = self._runtime_asr.get("apiKey") or settings.asr_api_key asr_api_url = self._runtime_asr.get("baseUrl") or settings.asr_api_url asr_model = self._runtime_asr.get("model") or settings.asr_model asr_interim_interval = int(self._runtime_asr.get("interimIntervalMs") or settings.asr_interim_interval_ms) asr_min_audio_ms = int(self._runtime_asr.get("minAudioMs") or settings.asr_min_audio_ms) if self._is_openai_compatible_provider(asr_provider) and asr_api_key: self.asr_service = OpenAICompatibleASRService( api_key=asr_api_key, api_url=asr_api_url, model=asr_model or "FunAudioLLM/SenseVoiceSmall", sample_rate=settings.sample_rate, interim_interval_ms=asr_interim_interval, min_audio_for_interim_ms=asr_min_audio_ms, on_transcript=self._on_transcript_callback ) logger.info(f"Using OpenAI-compatible ASR service (provider={asr_provider})") else: self.asr_service = BufferedASRService( sample_rate=settings.sample_rate ) logger.info("Using Buffered ASR service (no real transcription)") await self.asr_service.connect() logger.info("DuplexPipeline services connected") if not self._outbound_task or self._outbound_task.done(): self._outbound_task = asyncio.create_task(self._outbound_loop()) # Resolve greeting once per session start. # Always emit text opener event so text-only sessions can display it. if self._bot_starts_first(): greeting_to_speak = self.conversation.greeting if self._generated_opener_enabled(): generated_greeting = await self._generate_runtime_greeting() if generated_greeting: greeting_to_speak = generated_greeting self.conversation.greeting = generated_greeting if greeting_to_speak: self._start_turn() self._start_response() await self._send_event( ev( "assistant.response.final", text=greeting_to_speak, trackId=self.track_audio_out, ), priority=20, ) await self.conversation.add_assistant_turn(greeting_to_speak) if tts_output_enabled: await self._speak(greeting_to_speak) except Exception as e: logger.error(f"Failed to start pipeline: {e}") raise async def _enqueue_outbound(self, kind: str, payload: Any, priority: int) -> None: """Queue outbound message with priority ordering.""" self._outbound_seq += 1 await self._outbound_q.put((priority, self._outbound_seq, kind, payload)) async def _send_event(self, event: Dict[str, Any], priority: int = 20) -> None: await self._enqueue_outbound("event", self._envelope_event(event), priority) async def _send_audio(self, pcm_bytes: bytes, priority: int = 50) -> None: if not pcm_bytes: return self._audio_out_frame_buffer += pcm_bytes while len(self._audio_out_frame_buffer) >= self._PCM_FRAME_BYTES: frame = self._audio_out_frame_buffer[: self._PCM_FRAME_BYTES] self._audio_out_frame_buffer = self._audio_out_frame_buffer[self._PCM_FRAME_BYTES :] await self._enqueue_outbound("audio", frame, priority) async def _flush_audio_out_frames(self, priority: int = 50) -> None: """Flush remaining outbound audio as one padded 20ms PCM frame.""" if not self._audio_out_frame_buffer: return tail = self._audio_out_frame_buffer self._audio_out_frame_buffer = b"" if len(tail) < self._PCM_FRAME_BYTES: tail = tail + (b"\x00" * (self._PCM_FRAME_BYTES - len(tail))) await self._enqueue_outbound("audio", tail, priority) async def _emit_transcript_delta(self, text: str) -> None: await self._send_event( { **ev( "transcript.delta", trackId=self.track_audio_in, text=text, ) }, priority=30, ) async def _emit_llm_delta(self, text: str) -> None: await self._send_event( { **ev( "assistant.response.delta", trackId=self.track_audio_out, text=text, ) }, priority=20, ) async def _flush_pending_llm_delta(self) -> None: if not self._pending_llm_delta: return chunk = self._pending_llm_delta self._pending_llm_delta = "" self._last_llm_delta_emit_ms = time.monotonic() * 1000.0 await self._emit_llm_delta(chunk) async def _outbound_loop(self) -> None: """Single sender loop that enforces priority for interrupt events.""" while True: _priority, _seq, kind, payload = await self._outbound_q.get() try: if kind == "stop": return if kind == "audio": if self._drop_outbound_audio: continue await self.transport.send_audio(payload) elif kind == "event": await self.transport.send_event(payload) except Exception as e: logger.error(f"Outbound send error ({kind}): {e}") finally: self._outbound_q.task_done() async def process_audio(self, pcm_bytes: bytes) -> None: """ Process incoming audio chunk. This is the main entry point for audio from the user. Args: pcm_bytes: PCM audio data (16-bit, mono, 16kHz) """ if not self._running: return try: async with self._process_lock: if pcm_bytes: self._pre_speech_buffer += pcm_bytes if len(self._pre_speech_buffer) > self._asr_pre_speech_bytes: self._pre_speech_buffer = self._pre_speech_buffer[-self._asr_pre_speech_bytes:] # 1. Process through VAD vad_result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms) vad_status = "Silence" if vad_result: event_type, probability = vad_result vad_status = "Speech" if event_type == "speaking" else "Silence" # Emit VAD event await self.event_bus.publish(event_type, { "trackId": self.track_audio_in, "probability": probability }) await self._send_event( ev( "input.speech_started" if event_type == "speaking" else "input.speech_stopped", trackId=self.track_audio_in, probability=probability, ), priority=30, ) else: # No state change - keep previous status vad_status = self._last_vad_status # Update state based on VAD if vad_status == "Speech" and self._last_vad_status != "Speech": await self._on_speech_start() self._last_vad_status = vad_status # 2. Check for barge-in (user speaking while bot speaking) # Filter false interruptions by requiring minimum speech duration if self._is_bot_speaking and self._barge_in_enabled(): if vad_status == "Speech": # User is speaking while bot is speaking self._barge_in_silence_frames = 0 # Reset silence counter if self._barge_in_speech_start_time is None: # Start tracking speech duration self._barge_in_speech_start_time = time.time() self._barge_in_speech_frames = 1 logger.debug("Potential barge-in detected, tracking duration...") else: self._barge_in_speech_frames += 1 # Check if speech duration exceeds threshold speech_duration_ms = (time.time() - self._barge_in_speech_start_time) * 1000 if speech_duration_ms >= self._resolved_barge_in_min_duration_ms(): logger.info(f"Barge-in confirmed after {speech_duration_ms:.0f}ms of speech ({self._barge_in_speech_frames} frames)") await self._handle_barge_in() else: # Silence frame during potential barge-in if self._barge_in_speech_start_time is not None: self._barge_in_silence_frames += 1 # Allow brief silence gaps (VAD flickering) if self._barge_in_silence_frames > self._barge_in_silence_tolerance_frames(): # Too much silence - reset barge-in tracking logger.debug(f"Barge-in cancelled after {self._barge_in_silence_frames} silence frames") self._barge_in_speech_start_time = None self._barge_in_speech_frames = 0 self._barge_in_silence_frames = 0 elif self._is_bot_speaking and not self._barge_in_enabled(): self._barge_in_speech_start_time = None self._barge_in_speech_frames = 0 self._barge_in_silence_frames = 0 # 3. Buffer audio for ASR. # Gate ASR startup by a short speech-duration threshold to reduce # false positives from micro noises, then always close the turn # by EOU once ASR has started. just_started_asr = False if vad_status == "Speech" and not self._asr_capture_active: self._pending_speech_audio += pcm_bytes pending_ms = (len(self._pending_speech_audio) / (settings.sample_rate * 2)) * 1000.0 if pending_ms >= self._asr_start_min_speech_ms: await self._start_asr_capture() just_started_asr = True if self._asr_capture_active: if not just_started_asr: self._audio_buffer += pcm_bytes if len(self._audio_buffer) > self._max_audio_buffer_bytes: # Keep only the most recent audio to cap memory usage self._audio_buffer = self._audio_buffer[-self._max_audio_buffer_bytes:] await self.asr_service.send_audio(pcm_bytes) # For SiliconFlow ASR, trigger interim transcription periodically # The service handles timing internally via start_interim_transcription() # 4. Check for End of Utterance - this triggers LLM response if self.eou_detector.process(vad_status, force_eligible=self._asr_capture_active): await self._on_end_of_utterance() elif ( vad_status == "Silence" and not self.eou_detector.is_speaking and not self._asr_capture_active and self.conversation.state == ConversationState.LISTENING ): # Speech was too short to pass ASR gate; reset turn so next # utterance can start cleanly. self._pending_speech_audio = b"" self._audio_buffer = b"" self._last_sent_transcript = "" await self.conversation.set_state(ConversationState.IDLE) except Exception as e: logger.error(f"Pipeline audio processing error: {e}", exc_info=True) async def process_text(self, text: str) -> None: """ Process text input (chat command). Allows direct text input to bypass ASR. Args: text: User text input """ if not self._running: return logger.info(f"Processing text input: {text[:50]}...") # Cancel any current speaking await self._stop_current_speech() self._start_turn() self._finalize_utterance() # Start new turn await self.conversation.end_user_turn(text) self._current_turn_task = asyncio.create_task(self._handle_turn(text)) async def interrupt(self) -> None: """Interrupt current bot speech (manual interrupt command).""" await self._handle_barge_in() async def _on_transcript_callback(self, text: str, is_final: bool) -> None: """ Callback for ASR transcription results. Streams transcription to client for display. Args: text: Transcribed text is_final: Whether this is the final transcription """ # Avoid sending duplicate transcripts if text == self._last_sent_transcript and not is_final: return now_ms = time.monotonic() * 1000.0 self._last_sent_transcript = text if is_final: self._pending_transcript_delta = "" self._last_transcript_delta_emit_ms = 0.0 await self._send_event( { **ev( "transcript.final", trackId=self.track_audio_in, text=text, ) }, priority=30, ) logger.debug(f"Sent transcript (final): {text[:50]}...") return self._pending_transcript_delta = text should_emit = ( self._last_transcript_delta_emit_ms <= 0.0 or now_ms - self._last_transcript_delta_emit_ms >= self._ASR_DELTA_THROTTLE_MS ) if should_emit and self._pending_transcript_delta: delta = self._pending_transcript_delta self._pending_transcript_delta = "" self._last_transcript_delta_emit_ms = now_ms await self._emit_transcript_delta(delta) if not is_final: logger.info(f"[ASR] ASR interim: {text[:100]}") logger.debug(f"Sent transcript (interim): {text[:50]}...") async def _on_speech_start(self) -> None: """Handle user starting to speak.""" if self.conversation.state in (ConversationState.IDLE, ConversationState.INTERRUPTED): self._start_turn() self._finalize_utterance() await self.conversation.start_user_turn() self._audio_buffer = b"" self._last_sent_transcript = "" self.eou_detector.reset() self._asr_capture_active = False self._pending_speech_audio = b"" # Clear ASR buffer. Interim starts only after ASR capture is activated. if hasattr(self.asr_service, 'clear_buffer'): self.asr_service.clear_buffer() logger.debug("User speech started") async def _start_asr_capture(self) -> None: """Start ASR capture for the current turn after min speech gate passes.""" if self._asr_capture_active: return if hasattr(self.asr_service, 'start_interim_transcription'): await self.asr_service.start_interim_transcription() # Prime ASR with a short pre-speech context window so the utterance # start isn't lost while waiting for VAD to transition to Speech. pre_roll = self._pre_speech_buffer # _pre_speech_buffer already includes current speech frames; avoid # duplicating onset audio when we append pending speech below. if self._pending_speech_audio and len(pre_roll) > len(self._pending_speech_audio): pre_roll = pre_roll[:-len(self._pending_speech_audio)] elif self._pending_speech_audio: pre_roll = b"" capture_audio = pre_roll + self._pending_speech_audio if capture_audio: await self.asr_service.send_audio(capture_audio) self._audio_buffer = capture_audio[-self._max_audio_buffer_bytes:] self._asr_capture_active = True logger.debug( f"ASR capture started after speech gate ({self._asr_start_min_speech_ms}ms), " f"capture={len(capture_audio)} bytes" ) async def _on_end_of_utterance(self) -> None: """Handle end of user utterance.""" if self.conversation.state not in (ConversationState.LISTENING, ConversationState.INTERRUPTED): return # Add a tiny trailing silence tail to stabilize final-token decoding. if self._asr_final_tail_bytes > 0: final_tail = b"\x00" * self._asr_final_tail_bytes await self.asr_service.send_audio(final_tail) # Stop interim transcriptions if hasattr(self.asr_service, 'stop_interim_transcription'): await self.asr_service.stop_interim_transcription() # Get final transcription from ASR service user_text = "" if hasattr(self.asr_service, 'get_final_transcription'): # SiliconFlow ASR - get final transcription user_text = await self.asr_service.get_final_transcription() elif hasattr(self.asr_service, 'get_and_clear_text'): # Buffered ASR - get accumulated text user_text = self.asr_service.get_and_clear_text() # Skip if no meaningful text if not user_text or not user_text.strip(): logger.debug("[EOU] Detected but no transcription - skipping") # Reset for next utterance self._audio_buffer = b"" self._last_sent_transcript = "" self._asr_capture_active = False self._pending_speech_audio = b"" # Return to idle; don't force LISTENING which causes buffering on silence await self.conversation.set_state(ConversationState.IDLE) return logger.info(f"[EOU] Detected - user said: {user_text[:100]}...") self._finalize_utterance() # For ASR backends that already emitted final via callback, # avoid duplicating transcript.final on EOU. if user_text != self._last_sent_transcript: await self._send_event({ **ev( "transcript.final", trackId=self.track_audio_in, text=user_text, ) }, priority=25) # Clear buffers self._audio_buffer = b"" self._last_sent_transcript = "" self._pending_transcript_delta = "" self._last_transcript_delta_emit_ms = 0.0 self._asr_capture_active = False self._pending_speech_audio = b"" # Process the turn - trigger LLM response # Cancel any existing turn to avoid overlapping assistant responses await self._stop_current_speech() await self.conversation.end_user_turn(user_text) self._current_turn_task = asyncio.create_task(self._handle_turn(user_text)) def _resolved_knowledge_config(self) -> Dict[str, Any]: cfg: Dict[str, Any] = {} if isinstance(self._runtime_knowledge, dict): cfg.update(self._runtime_knowledge) kb_id = self._runtime_knowledge_base_id or str( cfg.get("kbId") or cfg.get("knowledgeBaseId") or "" ).strip() if kb_id: cfg["kbId"] = kb_id cfg.setdefault("enabled", True) return cfg def _resolved_tool_schemas(self) -> List[Dict[str, Any]]: schemas: List[Dict[str, Any]] = [] for item in self._runtime_tools: if isinstance(item, str): base = self._DEFAULT_TOOL_SCHEMAS.get(item) if base: schemas.append( { "type": "function", "function": { "name": base["name"], "description": base.get("description") or "", "parameters": base.get("parameters") or {"type": "object", "properties": {}}, }, } ) continue if not isinstance(item, dict): continue fn = item.get("function") if isinstance(fn, dict) and fn.get("name"): fn_name = str(fn.get("name")) schemas.append( { "type": "function", "function": { "name": str(fn.get("name")), "description": str(fn.get("description") or item.get("description") or ""), "parameters": fn.get("parameters") or {"type": "object", "properties": {}}, }, } ) continue if item.get("name"): schemas.append( { "type": "function", "function": { "name": str(item.get("name")), "description": str(item.get("description") or ""), "parameters": item.get("parameters") or {"type": "object", "properties": {}}, }, } ) return schemas def _resolved_tool_executor_map(self) -> Dict[str, str]: result: Dict[str, str] = {} for item in self._runtime_tools: if not isinstance(item, dict): continue fn = item.get("function") if isinstance(fn, dict) and fn.get("name"): name = str(fn.get("name")) else: name = str(item.get("name") or "").strip() if not name: continue executor = str(item.get("executor") or item.get("run_on") or "").strip().lower() if executor in {"client", "server"}: result[name] = executor return result def _tool_name(self, tool_call: Dict[str, Any]) -> str: fn = tool_call.get("function") if isinstance(fn, dict): return str(fn.get("name") or "").strip() return "" def _tool_executor(self, tool_call: Dict[str, Any]) -> str: name = self._tool_name(tool_call) if name and name in self._runtime_tool_executor: return self._runtime_tool_executor[name] # Default to server execution unless explicitly marked as client. return "server" def _tool_arguments(self, tool_call: Dict[str, Any]) -> Dict[str, Any]: fn = tool_call.get("function") if not isinstance(fn, dict): return {} raw = fn.get("arguments") if isinstance(raw, dict): return raw if isinstance(raw, str) and raw.strip(): try: parsed = json.loads(raw) return parsed if isinstance(parsed, dict) else {"raw": raw} except Exception: return {"raw": raw} return {} def _normalize_tool_result(self, result: Dict[str, Any]) -> Dict[str, Any]: status = result.get("status") if isinstance(result.get("status"), dict) else {} status_code = int(status.get("code") or 0) if status else 0 status_message = str(status.get("message") or "") if status else "" tool_call_id = str(result.get("tool_call_id") or result.get("id") or "") tool_name = str(result.get("name") or "unknown_tool") ok = bool(200 <= status_code < 300) retryable = status_code >= 500 or status_code in {429, 408} error: Optional[Dict[str, Any]] = None if not ok: error = { "code": status_code or 500, "message": status_message or "tool_execution_failed", "retryable": retryable, } return { "tool_call_id": tool_call_id, "tool_name": tool_name, "ok": ok, "error": error, "status": {"code": status_code, "message": status_message}, } async def _emit_tool_result(self, result: Dict[str, Any], source: str) -> None: tool_name = str(result.get("name") or "unknown_tool") call_id = str(result.get("tool_call_id") or result.get("id") or "") status = result.get("status") if isinstance(result.get("status"), dict) else {} status_code = int(status.get("code") or 0) if status else 0 status_message = str(status.get("message") or "") if status else "" logger.info( f"[Tool] emit result source={source} name={tool_name} call_id={call_id} " f"status={status_code} {status_message}".strip() ) normalized = self._normalize_tool_result(result) await self._send_event( { **ev( "assistant.tool_result", trackId=self.track_audio_out, source=source, tool_call_id=normalized["tool_call_id"], tool_name=normalized["tool_name"], ok=normalized["ok"], error=normalized["error"], result=result, ) }, priority=22, ) async def handle_tool_call_results(self, results: List[Dict[str, Any]]) -> None: """Handle client tool execution results.""" if not isinstance(results, list): return for item in results: if not isinstance(item, dict): continue call_id = str(item.get("tool_call_id") or item.get("id") or "").strip() if not call_id: continue if self._pending_client_tool_call_ids and call_id not in self._pending_client_tool_call_ids: logger.warning(f"[Tool] ignore unsolicited client result call_id={call_id}") continue if call_id in self._completed_tool_call_ids: logger.debug(f"[Tool] ignore duplicate client result call_id={call_id}") continue status = item.get("status") if isinstance(item.get("status"), dict) else {} status_code = int(status.get("code") or 0) if status else 0 status_message = str(status.get("message") or "") if status else "" tool_name = str(item.get("name") or "unknown_tool") logger.info( f"[Tool] received client result name={tool_name} call_id={call_id} " f"status={status_code} {status_message}".strip() ) waiter = self._pending_tool_waiters.get(call_id) if waiter and not waiter.done(): waiter.set_result(item) self._completed_tool_call_ids.add(call_id) continue self._early_tool_results[call_id] = item self._completed_tool_call_ids.add(call_id) async def _wait_for_single_tool_result(self, call_id: str) -> Dict[str, Any]: if call_id in self._completed_tool_call_ids and call_id not in self._early_tool_results: return { "tool_call_id": call_id, "status": {"code": 208, "message": "tool_call result already handled"}, "output": "", } if call_id in self._early_tool_results: self._completed_tool_call_ids.add(call_id) return self._early_tool_results.pop(call_id) loop = asyncio.get_running_loop() future = loop.create_future() self._pending_tool_waiters[call_id] = future try: return await asyncio.wait_for(future, timeout=self._TOOL_WAIT_TIMEOUT_SECONDS) except asyncio.TimeoutError: self._completed_tool_call_ids.add(call_id) return { "tool_call_id": call_id, "status": {"code": 504, "message": "tool_call timeout"}, "output": "", } finally: self._pending_tool_waiters.pop(call_id, None) self._pending_client_tool_call_ids.discard(call_id) def _normalize_stream_event(self, item: Any) -> LLMStreamEvent: if isinstance(item, LLMStreamEvent): return item if isinstance(item, str): return LLMStreamEvent(type="text_delta", text=item) if isinstance(item, dict): event_type = str(item.get("type") or "") if event_type in {"text_delta", "tool_call", "done"}: return LLMStreamEvent( type=event_type, # type: ignore[arg-type] text=item.get("text"), tool_call=item.get("tool_call"), ) return LLMStreamEvent(type="done") async def _handle_turn(self, user_text: str) -> None: """ Handle a complete conversation turn. Uses sentence-by-sentence streaming TTS for lower latency. Args: user_text: User's transcribed text """ try: if not self._current_turn_id: self._start_turn() if not self._current_utterance_id: self._finalize_utterance() self._start_response() # Start latency tracking self._turn_start_time = time.time() self._first_audio_sent = False full_response = "" messages = self.conversation.get_messages() max_rounds = 3 await self.conversation.start_assistant_turn() self._is_bot_speaking = True self._interrupt_event.clear() self._drop_outbound_audio = False first_audio_sent = False self._pending_llm_delta = "" self._last_llm_delta_emit_ms = 0.0 for _ in range(max_rounds): if self._interrupt_event.is_set(): break sentence_buffer = "" pending_punctuation = "" round_response = "" tool_calls: List[Dict[str, Any]] = [] allow_text_output = True async for raw_event in self.llm_service.generate_stream(messages): if self._interrupt_event.is_set(): break event = self._normalize_stream_event(raw_event) if event.type == "tool_call": await self._flush_pending_llm_delta() tool_call = event.tool_call if isinstance(event.tool_call, dict) else None if not tool_call: continue allow_text_output = False executor = self._tool_executor(tool_call) enriched_tool_call = dict(tool_call) enriched_tool_call["executor"] = executor tool_name = self._tool_name(enriched_tool_call) or "unknown_tool" call_id = str(enriched_tool_call.get("id") or "").strip() fn_payload = enriched_tool_call.get("function") raw_args = str(fn_payload.get("arguments") or "") if isinstance(fn_payload, dict) else "" args_preview = raw_args if len(raw_args) <= 160 else f"{raw_args[:160]}..." logger.info( f"[Tool] call requested name={tool_name} call_id={call_id} " f"executor={executor} args={args_preview}" ) tool_calls.append(enriched_tool_call) tool_arguments = self._tool_arguments(enriched_tool_call) if executor == "client" and call_id: self._pending_client_tool_call_ids.add(call_id) await self._send_event( { **ev( "assistant.tool_call", trackId=self.track_audio_out, tool_call_id=call_id, tool_name=tool_name, arguments=tool_arguments, executor=executor, timeout_ms=int(self._TOOL_WAIT_TIMEOUT_SECONDS * 1000), tool_call=enriched_tool_call, ) }, priority=22, ) continue if event.type != "text_delta": continue text_chunk = event.text or "" if not text_chunk: continue if not allow_text_output: continue full_response += text_chunk round_response += text_chunk sentence_buffer += text_chunk await self.conversation.update_assistant_text(text_chunk) self._pending_llm_delta += text_chunk now_ms = time.monotonic() * 1000.0 if ( self._last_llm_delta_emit_ms <= 0.0 or now_ms - self._last_llm_delta_emit_ms >= self._LLM_DELTA_THROTTLE_MS ): await self._flush_pending_llm_delta() while True: split_result = extract_tts_sentence( sentence_buffer, end_chars=self._SENTENCE_END_CHARS, trailing_chars=self._SENTENCE_TRAILING_CHARS, closers=self._SENTENCE_CLOSERS, min_split_spoken_chars=self._MIN_SPLIT_SPOKEN_CHARS, hold_trailing_at_buffer_end=True, force=False, ) if not split_result: break sentence, sentence_buffer = split_result if not sentence: continue sentence = f"{pending_punctuation}{sentence}".strip() pending_punctuation = "" if not sentence: continue if not has_spoken_content(sentence): pending_punctuation = sentence continue if self._tts_output_enabled() and not self._interrupt_event.is_set(): if not first_audio_sent: self._start_tts() await self._send_event( { **ev( "output.audio.start", trackId=self.track_audio_out, ) }, priority=10, ) first_audio_sent = True await self._speak_sentence( sentence, fade_in_ms=0, fade_out_ms=8, ) remaining_text = f"{pending_punctuation}{sentence_buffer}".strip() await self._flush_pending_llm_delta() if ( self._tts_output_enabled() and remaining_text and has_spoken_content(remaining_text) and not self._interrupt_event.is_set() ): if not first_audio_sent: self._start_tts() await self._send_event( { **ev( "output.audio.start", trackId=self.track_audio_out, ) }, priority=10, ) first_audio_sent = True await self._speak_sentence( remaining_text, fade_in_ms=0, fade_out_ms=8, ) if not tool_calls: break tool_results: List[Dict[str, Any]] = [] for call in tool_calls: call_id = str(call.get("id") or "").strip() if not call_id: continue executor = str(call.get("executor") or "server").strip().lower() tool_name = self._tool_name(call) or "unknown_tool" logger.info(f"[Tool] execute start name={tool_name} call_id={call_id} executor={executor}") if executor == "client": result = await self._wait_for_single_tool_result(call_id) await self._emit_tool_result(result, source="client") tool_results.append(result) continue try: result = await asyncio.wait_for( execute_server_tool(call), timeout=self._SERVER_TOOL_TIMEOUT_SECONDS, ) except asyncio.TimeoutError: result = { "tool_call_id": call_id, "name": self._tool_name(call) or "unknown_tool", "output": {"message": "server tool timeout"}, "status": {"code": 504, "message": "server_tool_timeout"}, } await self._emit_tool_result(result, source="server") tool_results.append(result) messages = [ *messages, LLMMessage( role="assistant", content=round_response.strip(), ), LLMMessage( role="system", content=( "Tool execution results are available. " "Continue answering the user naturally using these results. " "Do not request the same tool again in this turn.\n" f"tool_calls={json.dumps(tool_calls, ensure_ascii=False)}\n" f"tool_results={json.dumps(tool_results, ensure_ascii=False)}" ), ), ] if full_response and not self._interrupt_event.is_set(): await self._flush_pending_llm_delta() await self._send_event( { **ev( "assistant.response.final", trackId=self.track_audio_out, text=full_response, ) }, priority=20, ) # Send track end if first_audio_sent: await self._flush_audio_out_frames(priority=50) await self._send_event({ **ev( "output.audio.end", trackId=self.track_audio_out, ) }, priority=10) # End assistant turn await self.conversation.end_assistant_turn( was_interrupted=self._interrupt_event.is_set() ) except asyncio.CancelledError: logger.info("Turn handling cancelled") await self.conversation.end_assistant_turn(was_interrupted=True) except Exception as e: logger.error(f"Turn handling error: {e}", exc_info=True) await self.conversation.end_assistant_turn(was_interrupted=True) finally: self._is_bot_speaking = False # Reset barge-in tracking when bot finishes speaking self._barge_in_speech_start_time = None self._barge_in_speech_frames = 0 self._barge_in_silence_frames = 0 self._current_response_id = None self._current_tts_id = None async def _speak_sentence(self, text: str, fade_in_ms: int = 0, fade_out_ms: int = 8) -> None: """ Synthesize and send a single sentence. Args: text: Sentence to speak fade_in_ms: Fade-in duration for sentence start chunks fade_out_ms: Fade-out duration for sentence end chunks """ if not self._tts_output_enabled(): return if not text.strip() or self._interrupt_event.is_set() or not self.tts_service: return logger.info(f"[TTS] split sentence: {text!r}") try: is_first_chunk = True async for chunk in self.tts_service.synthesize_stream(text): # Check interrupt at the start of each iteration if self._interrupt_event.is_set(): logger.debug("TTS sentence interrupted") break # Track and log first audio packet latency (TTFB) if not self._first_audio_sent and self._turn_start_time: ttfb_ms = (time.time() - self._turn_start_time) * 1000 self._first_audio_sent = True logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})") # Send TTFB event to client await self._send_event({ **ev( "metrics.ttfb", trackId=self.track_audio_out, latencyMs=round(ttfb_ms), ) }, priority=25) # Double-check interrupt right before sending audio if self._interrupt_event.is_set(): break smoothed_audio = self._apply_edge_fade( pcm_bytes=chunk.audio, sample_rate=chunk.sample_rate, fade_in=is_first_chunk, fade_out=bool(chunk.is_final), fade_in_ms=fade_in_ms, fade_out_ms=fade_out_ms, ) is_first_chunk = False await self._send_audio(smoothed_audio, priority=50) except asyncio.CancelledError: logger.debug("TTS sentence cancelled") except Exception as e: logger.error(f"TTS sentence error: {e}") def _apply_edge_fade( self, pcm_bytes: bytes, sample_rate: int, fade_in: bool = False, fade_out: bool = False, fade_in_ms: int = 0, fade_out_ms: int = 8, ) -> bytes: """Apply short edge fades to reduce click/pop at sentence boundaries.""" if not pcm_bytes or (not fade_in and not fade_out): return pcm_bytes try: samples = np.frombuffer(pcm_bytes, dtype=" 0: fade_in_samples = int(sample_rate * (fade_in_ms / 1000.0)) fade_in_samples = max(1, min(fade_in_samples, samples.size)) samples[:fade_in_samples] *= np.linspace(0.0, 1.0, fade_in_samples, endpoint=True) if fade_out: fade_out_samples = int(sample_rate * (fade_out_ms / 1000.0)) fade_out_samples = max(1, min(fade_out_samples, samples.size)) samples[-fade_out_samples:] *= np.linspace(1.0, 0.0, fade_out_samples, endpoint=True) return np.clip(samples, -32768, 32767).astype(" None: """ Synthesize and send speech. Args: text: Text to speak """ if not self._tts_output_enabled(): return if not text.strip() or not self.tts_service: return try: self._drop_outbound_audio = False # Start latency tracking for greeting speak_start_time = time.time() first_audio_sent = False # Send track start event self._start_tts() await self._send_event({ **ev( "output.audio.start", trackId=self.track_audio_out, ) }, priority=10) self._is_bot_speaking = True # Stream TTS audio async for chunk in self.tts_service.synthesize_stream(text): if self._interrupt_event.is_set(): logger.info("TTS interrupted by barge-in") break # Track and log first audio packet latency (TTFB) if not first_audio_sent: ttfb_ms = (time.time() - speak_start_time) * 1000 first_audio_sent = True logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})") # Send TTFB event to client await self._send_event({ **ev( "metrics.ttfb", trackId=self.track_audio_out, latencyMs=round(ttfb_ms), ) }, priority=25) # Send audio to client await self._send_audio(chunk.audio, priority=50) # Small delay to prevent flooding await asyncio.sleep(0.01) # Send track end event await self._flush_audio_out_frames(priority=50) await self._send_event({ **ev( "output.audio.end", trackId=self.track_audio_out, ) }, priority=10) except asyncio.CancelledError: logger.info("TTS cancelled") raise except Exception as e: logger.error(f"TTS error: {e}") finally: self._is_bot_speaking = False async def _handle_barge_in(self) -> None: """Handle user barge-in (interruption).""" if not self._is_bot_speaking: return logger.info("Barge-in detected - interrupting bot speech") # Reset barge-in tracking self._barge_in_speech_start_time = None self._barge_in_speech_frames = 0 self._barge_in_silence_frames = 0 # IMPORTANT: Signal interruption FIRST to stop audio sending self._interrupt_event.set() self._is_bot_speaking = False self._drop_outbound_audio = True self._audio_out_frame_buffer = b"" # Send interrupt event to client IMMEDIATELY # This must happen BEFORE canceling services, so client knows to discard in-flight audio await self._send_event({ **ev( "response.interrupted", trackId=self.track_audio_out, ) }, priority=0) # Cancel TTS if self.tts_service: await self.tts_service.cancel() # Cancel LLM if self.llm_service and hasattr(self.llm_service, 'cancel'): self.llm_service.cancel() # Interrupt conversation only if there is no active turn task. # When a turn task exists, it will handle end_assistant_turn() to avoid double callbacks. if not (self._current_turn_task and not self._current_turn_task.done()): await self.conversation.interrupt() # Reset for new user turn await self.conversation.start_user_turn() self._audio_buffer = b"" self.eou_detector.reset() self._asr_capture_active = False self._pending_speech_audio = b"" async def _stop_current_speech(self) -> None: """Stop any current speech task.""" self._drop_outbound_audio = True self._audio_out_frame_buffer = b"" if self._current_turn_task and not self._current_turn_task.done(): self._interrupt_event.set() self._current_turn_task.cancel() try: await self._current_turn_task except asyncio.CancelledError: pass # Ensure underlying services are cancelled to avoid leaking work/audio if self.tts_service: await self.tts_service.cancel() if self.llm_service and hasattr(self.llm_service, 'cancel'): self.llm_service.cancel() self._is_bot_speaking = False self._interrupt_event.clear() async def cleanup(self) -> None: """Cleanup pipeline resources.""" logger.info(f"Cleaning up DuplexPipeline for session {self.session_id}") self._running = False await self._stop_current_speech() if self._outbound_task and not self._outbound_task.done(): await self._enqueue_outbound("stop", None, priority=-1000) await self._outbound_task self._outbound_task = None # Disconnect services if self.llm_service: await self.llm_service.disconnect() if self.tts_service: await self.tts_service.disconnect() if self.asr_service: await self.asr_service.disconnect() def _get_timestamp_ms(self) -> int: """Get current timestamp in milliseconds.""" import time return int(time.time() * 1000) @property def is_speaking(self) -> bool: """Check if bot is currently speaking.""" return self._is_bot_speaking @property def state(self) -> ConversationState: """Get current conversation state.""" return self.conversation.state