diff --git a/engine/core/duplex_pipeline.py b/engine/core/duplex_pipeline.py index 2a6c121..1b7ed58 100644 --- a/engine/core/duplex_pipeline.py +++ b/engine/core/duplex_pipeline.py @@ -13,7 +13,7 @@ event-driven design. import asyncio import time -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Tuple import numpy as np from loguru import logger @@ -130,6 +130,11 @@ class DuplexPipeline: self._asr_final_tail_bytes = int(settings.sample_rate * 2 * (asr_final_tail_ms / 1000.0)) self._last_vad_status: str = "Silence" self._process_lock = asyncio.Lock() + # Priority outbound dispatcher (lower value = higher priority). + self._outbound_q: asyncio.PriorityQueue[Tuple[int, int, str, Any]] = asyncio.PriorityQueue() + self._outbound_seq = 0 + self._outbound_task: Optional[asyncio.Task] = None + self._drop_outbound_audio = False # Interruption handling self._interrupt_event = asyncio.Event() @@ -271,6 +276,8 @@ class DuplexPipeline: await self.asr_service.connect() logger.info("DuplexPipeline services connected") + if not self._outbound_task or self._outbound_task.done(): + self._outbound_task = asyncio.create_task(self._outbound_loop()) # Speak greeting if configured if self.conversation.greeting: @@ -280,6 +287,35 @@ class DuplexPipeline: logger.error(f"Failed to start pipeline: {e}") raise + async def _enqueue_outbound(self, kind: str, payload: Any, priority: int) -> None: + """Queue outbound message with priority ordering.""" + self._outbound_seq += 1 + await self._outbound_q.put((priority, self._outbound_seq, kind, payload)) + + async def _send_event(self, event: Dict[str, Any], priority: int = 20) -> None: + await self._enqueue_outbound("event", event, priority) + + async def _send_audio(self, pcm_bytes: bytes, priority: int = 50) -> None: + await self._enqueue_outbound("audio", pcm_bytes, priority) + + async def _outbound_loop(self) -> None: + """Single sender loop that enforces priority for interrupt events.""" + while True: + 
_priority, _seq, kind, payload = await self._outbound_q.get() + try: + if kind == "stop": + return + if kind == "audio": + if self._drop_outbound_audio: + continue + await self.transport.send_audio(payload) + elif kind == "event": + await self.transport.send_event(payload) + except Exception as e: + logger.error(f"Outbound send error ({kind}): {e}") + finally: + self._outbound_q.task_done() + async def process_audio(self, pcm_bytes: bytes) -> None: """ Process incoming audio chunk. @@ -312,12 +348,13 @@ class DuplexPipeline: "trackId": self.session_id, "probability": probability }) - await self.transport.send_event( + await self._send_event( ev( "input.speech_started" if event_type == "speaking" else "input.speech_stopped", trackId=self.session_id, probability=probability, - ) + ), + priority=30, ) else: # No state change - keep previous status @@ -420,13 +457,13 @@ class DuplexPipeline: self._last_sent_transcript = text # Send transcript event to client - await self.transport.send_event({ + await self._send_event({ **ev( "transcript.final" if is_final else "transcript.delta", trackId=self.session_id, text=text, ) - }) + }, priority=30) if not is_final: logger.info(f"[ASR] ASR interim: {text[:100]}") @@ -497,13 +534,13 @@ class DuplexPipeline: # For ASR backends that already emitted final via callback, # avoid duplicating transcript.final on EOU. 
if user_text != self._last_sent_transcript: - await self.transport.send_event({ + await self._send_event({ **ev( "transcript.final", trackId=self.session_id, text=user_text, ) - }) + }, priority=30) # Clear buffers self._audio_buffer = b"" @@ -536,6 +573,7 @@ class DuplexPipeline: await self.conversation.start_assistant_turn() self._is_bot_speaking = True self._interrupt_event.clear() + self._drop_outbound_audio = False # Sentence buffer for streaming TTS sentence_buffer = "" @@ -553,13 +591,13 @@ class DuplexPipeline: await self.conversation.update_assistant_text(text_chunk) # Send LLM response streaming event to client - await self.transport.send_event({ + await self._send_event({ **ev( "assistant.response.delta", trackId=self.session_id, text=text_chunk, ) - }) + }, priority=40) # Check for sentence completion - synthesize immediately for low latency while True: @@ -591,12 +629,12 @@ class DuplexPipeline: if not self._interrupt_event.is_set(): # Send track start on first audio if not first_audio_sent: - await self.transport.send_event({ + await self._send_event({ **ev( "output.audio.start", trackId=self.session_id, ) - }) + }, priority=10) first_audio_sent = True await self._speak_sentence( @@ -608,24 +646,24 @@ class DuplexPipeline: # Send final LLM response event if full_response and not self._interrupt_event.is_set(): - await self.transport.send_event({ + await self._send_event({ **ev( "assistant.response.final", trackId=self.session_id, text=full_response, ) - }) + }, priority=40) # Speak any remaining text remaining_text = f"{pending_punctuation}{sentence_buffer}".strip() if remaining_text and has_spoken_content(remaining_text) and not self._interrupt_event.is_set(): if not first_audio_sent: - await self.transport.send_event({ + await self._send_event({ **ev( "output.audio.start", trackId=self.session_id, ) - }) + }, priority=10) first_audio_sent = True await self._speak_sentence( remaining_text, @@ -635,12 +673,12 @@ class DuplexPipeline: # Send track end 
if first_audio_sent: - await self.transport.send_event({ + await self._send_event({ **ev( "output.audio.end", trackId=self.session_id, ) - }) + }, priority=50) # End assistant turn await self.conversation.end_assistant_turn( @@ -689,13 +727,13 @@ class DuplexPipeline: logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})") # Send TTFB event to client - await self.transport.send_event({ + await self._send_event({ **ev( "metrics.ttfb", trackId=self.session_id, latencyMs=round(ttfb_ms), ) - }) + }, priority=25) # Double-check interrupt right before sending audio if self._interrupt_event.is_set(): @@ -711,7 +749,7 @@ class DuplexPipeline: ) is_first_chunk = False - await self.transport.send_audio(smoothed_audio) + await self._send_audio(smoothed_audio, priority=50) except asyncio.CancelledError: logger.debug("TTS sentence cancelled") except Exception as e: @@ -760,17 +798,18 @@ class DuplexPipeline: return try: + self._drop_outbound_audio = False # Start latency tracking for greeting speak_start_time = time.time() first_audio_sent = False # Send track start event - await self.transport.send_event({ + await self._send_event({ **ev( "output.audio.start", trackId=self.session_id, ) - }) + }, priority=10) self._is_bot_speaking = True @@ -787,27 +826,27 @@ class DuplexPipeline: logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})") # Send TTFB event to client - await self.transport.send_event({ + await self._send_event({ **ev( "metrics.ttfb", trackId=self.session_id, latencyMs=round(ttfb_ms), ) - }) + }, priority=25) # Send audio to client - await self.transport.send_audio(chunk.audio) + await self._send_audio(chunk.audio, priority=50) # Small delay to prevent flooding await asyncio.sleep(0.01) # Send track end event - await self.transport.send_event({ + await self._send_event({ **ev( "output.audio.end", trackId=self.session_id, ) - }) + }, priority=50) except 
asyncio.CancelledError: logger.info("TTS cancelled") @@ -832,15 +871,16 @@ class DuplexPipeline: # IMPORTANT: Signal interruption FIRST to stop audio sending self._interrupt_event.set() self._is_bot_speaking = False + self._drop_outbound_audio = True # Send interrupt event to client IMMEDIATELY # This must happen BEFORE canceling services, so client knows to discard in-flight audio - await self.transport.send_event({ + await self._send_event({ **ev( "response.interrupted", trackId=self.session_id, ) - }) + }, priority=0) # Cancel TTS if self.tts_service: @@ -862,6 +902,7 @@ class DuplexPipeline: async def _stop_current_speech(self) -> None: """Stop any current speech task.""" + self._drop_outbound_audio = True if self._current_turn_task and not self._current_turn_task.done(): self._interrupt_event.set() self._current_turn_task.cancel() @@ -885,6 +926,10 @@ class DuplexPipeline: self._running = False await self._stop_current_speech() + if self._outbound_task and not self._outbound_task.done(): + await self._enqueue_outbound("stop", None, priority=-1000) + await self._outbound_task + self._outbound_task = None # Disconnect services if self.llm_service: