This commit is contained in:
Xin Wang
2026-02-09 19:18:24 +08:00

View File

@@ -13,7 +13,7 @@ event-driven design.
import asyncio import asyncio
import time import time
from typing import Any, Dict, Optional from typing import Any, Dict, Optional, Tuple
import numpy as np import numpy as np
from loguru import logger from loguru import logger
@@ -130,6 +130,11 @@ class DuplexPipeline:
self._asr_final_tail_bytes = int(settings.sample_rate * 2 * (asr_final_tail_ms / 1000.0)) self._asr_final_tail_bytes = int(settings.sample_rate * 2 * (asr_final_tail_ms / 1000.0))
self._last_vad_status: str = "Silence" self._last_vad_status: str = "Silence"
self._process_lock = asyncio.Lock() self._process_lock = asyncio.Lock()
# Priority outbound dispatcher (lower value = higher priority).
self._outbound_q: asyncio.PriorityQueue[Tuple[int, int, str, Any]] = asyncio.PriorityQueue()
self._outbound_seq = 0
self._outbound_task: Optional[asyncio.Task] = None
self._drop_outbound_audio = False
# Interruption handling # Interruption handling
self._interrupt_event = asyncio.Event() self._interrupt_event = asyncio.Event()
@@ -271,6 +276,8 @@ class DuplexPipeline:
await self.asr_service.connect() await self.asr_service.connect()
logger.info("DuplexPipeline services connected") logger.info("DuplexPipeline services connected")
if not self._outbound_task or self._outbound_task.done():
self._outbound_task = asyncio.create_task(self._outbound_loop())
# Speak greeting if configured # Speak greeting if configured
if self.conversation.greeting: if self.conversation.greeting:
@@ -280,6 +287,35 @@ class DuplexPipeline:
logger.error(f"Failed to start pipeline: {e}") logger.error(f"Failed to start pipeline: {e}")
raise raise
async def _enqueue_outbound(self, kind: str, payload: Any, priority: int) -> None:
    """
    Place one outbound message on the priority queue.

    Entries are stored as ``(priority, sequence, kind, payload)``. A lower
    ``priority`` value is dequeued first; the per-pipeline sequence number
    increases on every call, so messages that share a priority leave the
    queue in the order they were enqueued and the payload itself is never
    used for ordering.

    Args:
        kind: Message tag stored with the entry (e.g. "event", "audio", "stop").
        payload: Data handed to the sender for this message.
        priority: Ordering key; smaller values are sent earlier.
    """
    next_seq = self._outbound_seq + 1
    self._outbound_seq = next_seq
    entry = (priority, next_seq, kind, payload)
    await self._outbound_q.put(entry)
async def _send_event(self, event: Dict[str, Any], priority: int = 20) -> None:
    """
    Queue an event message for the outbound sender.

    Args:
        event: Event payload to deliver to the client.
        priority: Queue priority (lower value is sent earlier); defaults to 20.
    """
    await self._enqueue_outbound(kind="event", payload=event, priority=priority)
async def _send_audio(self, pcm_bytes: bytes, priority: int = 50) -> None:
    """
    Queue an audio chunk for the outbound sender.

    Args:
        pcm_bytes: Audio bytes to deliver to the client.
        priority: Queue priority (lower value is sent earlier); defaults to 50.
    """
    await self._enqueue_outbound(kind="audio", payload=pcm_bytes, priority=priority)
async def _outbound_loop(self) -> None:
    """
    Drain the outbound priority queue and forward each message to the transport.

    This is the single consumer of ``_outbound_q``: items are taken in
    priority order, so a low-valued (urgent) event is sent ahead of any
    audio that is still waiting in the queue.

    Per-item handling:
        - "stop": exit the loop.
        - "event": forwarded with ``transport.send_event``.
        - "audio": forwarded with ``transport.send_audio`` unless
          ``_drop_outbound_audio`` is set, in which case it is discarded.
        - anything else: ignored.

    A failing send is logged and the loop moves on to the next item.
    """
    while True:
        _, _, kind, payload = await self._outbound_q.get()
        try:
            if kind == "stop":
                return
            if kind == "event":
                await self.transport.send_event(payload)
            elif kind == "audio" and not self._drop_outbound_audio:
                await self.transport.send_audio(payload)
        except Exception as e:
            logger.error(f"Outbound send error ({kind}): {e}")
        finally:
            # Runs for every dequeued item, including the "stop" sentinel and
            # dropped audio, so the queue's unfinished-task count stays balanced.
            self._outbound_q.task_done()
async def process_audio(self, pcm_bytes: bytes) -> None: async def process_audio(self, pcm_bytes: bytes) -> None:
""" """
Process incoming audio chunk. Process incoming audio chunk.
@@ -312,12 +348,13 @@ class DuplexPipeline:
"trackId": self.session_id, "trackId": self.session_id,
"probability": probability "probability": probability
}) })
await self.transport.send_event( await self._send_event(
ev( ev(
"input.speech_started" if event_type == "speaking" else "input.speech_stopped", "input.speech_started" if event_type == "speaking" else "input.speech_stopped",
trackId=self.session_id, trackId=self.session_id,
probability=probability, probability=probability,
) ),
priority=30,
) )
else: else:
# No state change - keep previous status # No state change - keep previous status
@@ -420,13 +457,13 @@ class DuplexPipeline:
self._last_sent_transcript = text self._last_sent_transcript = text
# Send transcript event to client # Send transcript event to client
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"transcript.final" if is_final else "transcript.delta", "transcript.final" if is_final else "transcript.delta",
trackId=self.session_id, trackId=self.session_id,
text=text, text=text,
) )
}) }, priority=30)
if not is_final: if not is_final:
logger.info(f"[ASR] ASR interim: {text[:100]}") logger.info(f"[ASR] ASR interim: {text[:100]}")
@@ -497,13 +534,13 @@ class DuplexPipeline:
# For ASR backends that already emitted final via callback, # For ASR backends that already emitted final via callback,
# avoid duplicating transcript.final on EOU. # avoid duplicating transcript.final on EOU.
if user_text != self._last_sent_transcript: if user_text != self._last_sent_transcript:
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"transcript.final", "transcript.final",
trackId=self.session_id, trackId=self.session_id,
text=user_text, text=user_text,
) )
}) }, priority=25)
# Clear buffers # Clear buffers
self._audio_buffer = b"" self._audio_buffer = b""
@@ -536,6 +573,7 @@ class DuplexPipeline:
await self.conversation.start_assistant_turn() await self.conversation.start_assistant_turn()
self._is_bot_speaking = True self._is_bot_speaking = True
self._interrupt_event.clear() self._interrupt_event.clear()
self._drop_outbound_audio = False
# Sentence buffer for streaming TTS # Sentence buffer for streaming TTS
sentence_buffer = "" sentence_buffer = ""
@@ -553,13 +591,13 @@ class DuplexPipeline:
await self.conversation.update_assistant_text(text_chunk) await self.conversation.update_assistant_text(text_chunk)
# Send LLM response streaming event to client # Send LLM response streaming event to client
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"assistant.response.delta", "assistant.response.delta",
trackId=self.session_id, trackId=self.session_id,
text=text_chunk, text=text_chunk,
) )
}) }, priority=40)
# Check for sentence completion - synthesize immediately for low latency # Check for sentence completion - synthesize immediately for low latency
while True: while True:
@@ -591,12 +629,12 @@ class DuplexPipeline:
if not self._interrupt_event.is_set(): if not self._interrupt_event.is_set():
# Send track start on first audio # Send track start on first audio
if not first_audio_sent: if not first_audio_sent:
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"output.audio.start", "output.audio.start",
trackId=self.session_id, trackId=self.session_id,
) )
}) }, priority=10)
first_audio_sent = True first_audio_sent = True
await self._speak_sentence( await self._speak_sentence(
@@ -608,24 +646,24 @@ class DuplexPipeline:
# Send final LLM response event # Send final LLM response event
if full_response and not self._interrupt_event.is_set(): if full_response and not self._interrupt_event.is_set():
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"assistant.response.final", "assistant.response.final",
trackId=self.session_id, trackId=self.session_id,
text=full_response, text=full_response,
) )
}) }, priority=20)
# Speak any remaining text # Speak any remaining text
remaining_text = f"{pending_punctuation}{sentence_buffer}".strip() remaining_text = f"{pending_punctuation}{sentence_buffer}".strip()
if remaining_text and has_spoken_content(remaining_text) and not self._interrupt_event.is_set(): if remaining_text and has_spoken_content(remaining_text) and not self._interrupt_event.is_set():
if not first_audio_sent: if not first_audio_sent:
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"output.audio.start", "output.audio.start",
trackId=self.session_id, trackId=self.session_id,
) )
}) }, priority=10)
first_audio_sent = True first_audio_sent = True
await self._speak_sentence( await self._speak_sentence(
remaining_text, remaining_text,
@@ -635,12 +673,12 @@ class DuplexPipeline:
# Send track end # Send track end
if first_audio_sent: if first_audio_sent:
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"output.audio.end", "output.audio.end",
trackId=self.session_id, trackId=self.session_id,
) )
}) }, priority=10)
# End assistant turn # End assistant turn
await self.conversation.end_assistant_turn( await self.conversation.end_assistant_turn(
@@ -689,13 +727,13 @@ class DuplexPipeline:
logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})") logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client # Send TTFB event to client
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"metrics.ttfb", "metrics.ttfb",
trackId=self.session_id, trackId=self.session_id,
latencyMs=round(ttfb_ms), latencyMs=round(ttfb_ms),
) )
}) }, priority=25)
# Double-check interrupt right before sending audio # Double-check interrupt right before sending audio
if self._interrupt_event.is_set(): if self._interrupt_event.is_set():
@@ -711,7 +749,7 @@ class DuplexPipeline:
) )
is_first_chunk = False is_first_chunk = False
await self.transport.send_audio(smoothed_audio) await self._send_audio(smoothed_audio, priority=50)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.debug("TTS sentence cancelled") logger.debug("TTS sentence cancelled")
except Exception as e: except Exception as e:
@@ -760,17 +798,18 @@ class DuplexPipeline:
return return
try: try:
self._drop_outbound_audio = False
# Start latency tracking for greeting # Start latency tracking for greeting
speak_start_time = time.time() speak_start_time = time.time()
first_audio_sent = False first_audio_sent = False
# Send track start event # Send track start event
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"output.audio.start", "output.audio.start",
trackId=self.session_id, trackId=self.session_id,
) )
}) }, priority=10)
self._is_bot_speaking = True self._is_bot_speaking = True
@@ -787,27 +826,27 @@ class DuplexPipeline:
logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})") logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client # Send TTFB event to client
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"metrics.ttfb", "metrics.ttfb",
trackId=self.session_id, trackId=self.session_id,
latencyMs=round(ttfb_ms), latencyMs=round(ttfb_ms),
) )
}) }, priority=25)
# Send audio to client # Send audio to client
await self.transport.send_audio(chunk.audio) await self._send_audio(chunk.audio, priority=50)
# Small delay to prevent flooding # Small delay to prevent flooding
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
# Send track end event # Send track end event
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"output.audio.end", "output.audio.end",
trackId=self.session_id, trackId=self.session_id,
) )
}) }, priority=10)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("TTS cancelled") logger.info("TTS cancelled")
@@ -832,15 +871,16 @@ class DuplexPipeline:
# IMPORTANT: Signal interruption FIRST to stop audio sending # IMPORTANT: Signal interruption FIRST to stop audio sending
self._interrupt_event.set() self._interrupt_event.set()
self._is_bot_speaking = False self._is_bot_speaking = False
self._drop_outbound_audio = True
# Send interrupt event to client IMMEDIATELY # Send interrupt event to client IMMEDIATELY
# This must happen BEFORE canceling services, so client knows to discard in-flight audio # This must happen BEFORE canceling services, so client knows to discard in-flight audio
await self.transport.send_event({ await self._send_event({
**ev( **ev(
"response.interrupted", "response.interrupted",
trackId=self.session_id, trackId=self.session_id,
) )
}) }, priority=0)
# Cancel TTS # Cancel TTS
if self.tts_service: if self.tts_service:
@@ -862,6 +902,7 @@ class DuplexPipeline:
async def _stop_current_speech(self) -> None: async def _stop_current_speech(self) -> None:
"""Stop any current speech task.""" """Stop any current speech task."""
self._drop_outbound_audio = True
if self._current_turn_task and not self._current_turn_task.done(): if self._current_turn_task and not self._current_turn_task.done():
self._interrupt_event.set() self._interrupt_event.set()
self._current_turn_task.cancel() self._current_turn_task.cancel()
@@ -885,6 +926,10 @@ class DuplexPipeline:
self._running = False self._running = False
await self._stop_current_speech() await self._stop_current_speech()
if self._outbound_task and not self._outbound_task.done():
await self._enqueue_outbound("stop", None, priority=-1000)
await self._outbound_task
self._outbound_task = None
# Disconnect services # Disconnect services
if self.llm_service: if self.llm_service: