Use a priority queue for VAD events

This commit is contained in:
Xin Wang
2026-02-09 19:17:50 +08:00
parent 0135f718f3
commit b3e1c3d380

View File

@@ -13,7 +13,7 @@ event-driven design.
import asyncio
import time
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, Tuple
import numpy as np
from loguru import logger
@@ -130,6 +130,11 @@ class DuplexPipeline:
self._asr_final_tail_bytes = int(settings.sample_rate * 2 * (asr_final_tail_ms / 1000.0))
self._last_vad_status: str = "Silence"
self._process_lock = asyncio.Lock()
# Priority outbound dispatcher (lower value = higher priority).
self._outbound_q: asyncio.PriorityQueue[Tuple[int, int, str, Any]] = asyncio.PriorityQueue()
self._outbound_seq = 0
self._outbound_task: Optional[asyncio.Task] = None
self._drop_outbound_audio = False
# Interruption handling
self._interrupt_event = asyncio.Event()
@@ -271,6 +276,8 @@ class DuplexPipeline:
await self.asr_service.connect()
logger.info("DuplexPipeline services connected")
if not self._outbound_task or self._outbound_task.done():
self._outbound_task = asyncio.create_task(self._outbound_loop())
# Speak greeting if configured
if self.conversation.greeting:
@@ -280,6 +287,35 @@ class DuplexPipeline:
logger.error(f"Failed to start pipeline: {e}")
raise
async def _enqueue_outbound(self, kind: str, payload: Any, priority: int) -> None:
"""Queue outbound message with priority ordering."""
self._outbound_seq += 1
await self._outbound_q.put((priority, self._outbound_seq, kind, payload))
async def _send_event(self, event: Dict[str, Any], priority: int = 20) -> None:
await self._enqueue_outbound("event", event, priority)
async def _send_audio(self, pcm_bytes: bytes, priority: int = 50) -> None:
await self._enqueue_outbound("audio", pcm_bytes, priority)
async def _outbound_loop(self) -> None:
"""Single sender loop that enforces priority for interrupt events."""
while True:
_priority, _seq, kind, payload = await self._outbound_q.get()
try:
if kind == "stop":
return
if kind == "audio":
if self._drop_outbound_audio:
continue
await self.transport.send_audio(payload)
elif kind == "event":
await self.transport.send_event(payload)
except Exception as e:
logger.error(f"Outbound send error ({kind}): {e}")
finally:
self._outbound_q.task_done()
async def process_audio(self, pcm_bytes: bytes) -> None:
"""
Process incoming audio chunk.
@@ -312,12 +348,13 @@ class DuplexPipeline:
"trackId": self.session_id,
"probability": probability
})
await self.transport.send_event(
await self._send_event(
ev(
"input.speech_started" if event_type == "speaking" else "input.speech_stopped",
trackId=self.session_id,
probability=probability,
)
),
priority=30,
)
else:
# No state change - keep previous status
@@ -420,13 +457,13 @@ class DuplexPipeline:
self._last_sent_transcript = text
# Send transcript event to client
await self.transport.send_event({
await self._send_event({
**ev(
"transcript.final" if is_final else "transcript.delta",
trackId=self.session_id,
text=text,
)
})
}, priority=30)
if not is_final:
logger.info(f"ASR interim: {text[:100]}")
@@ -497,13 +534,13 @@ class DuplexPipeline:
# For ASR backends that already emitted final via callback,
# avoid duplicating transcript.final on EOU.
if user_text != self._last_sent_transcript:
await self.transport.send_event({
await self._send_event({
**ev(
"transcript.final",
trackId=self.session_id,
text=user_text,
)
})
}, priority=25)
# Clear buffers
self._audio_buffer = b""
@@ -536,6 +573,7 @@ class DuplexPipeline:
await self.conversation.start_assistant_turn()
self._is_bot_speaking = True
self._interrupt_event.clear()
self._drop_outbound_audio = False
# Sentence buffer for streaming TTS
sentence_buffer = ""
@@ -553,13 +591,13 @@ class DuplexPipeline:
await self.conversation.update_assistant_text(text_chunk)
# Send LLM response streaming event to client
await self.transport.send_event({
await self._send_event({
**ev(
"assistant.response.delta",
trackId=self.session_id,
text=text_chunk,
)
})
}, priority=40)
# Check for sentence completion - synthesize immediately for low latency
while True:
@@ -591,12 +629,12 @@ class DuplexPipeline:
if not self._interrupt_event.is_set():
# Send track start on first audio
if not first_audio_sent:
await self.transport.send_event({
await self._send_event({
**ev(
"output.audio.start",
trackId=self.session_id,
)
})
}, priority=10)
first_audio_sent = True
await self._speak_sentence(
@@ -608,24 +646,24 @@ class DuplexPipeline:
# Send final LLM response event
if full_response and not self._interrupt_event.is_set():
await self.transport.send_event({
await self._send_event({
**ev(
"assistant.response.final",
trackId=self.session_id,
text=full_response,
)
})
}, priority=20)
# Speak any remaining text
remaining_text = f"{pending_punctuation}{sentence_buffer}".strip()
if remaining_text and has_spoken_content(remaining_text) and not self._interrupt_event.is_set():
if not first_audio_sent:
await self.transport.send_event({
await self._send_event({
**ev(
"output.audio.start",
trackId=self.session_id,
)
})
}, priority=10)
first_audio_sent = True
await self._speak_sentence(
remaining_text,
@@ -635,12 +673,12 @@ class DuplexPipeline:
# Send track end
if first_audio_sent:
await self.transport.send_event({
await self._send_event({
**ev(
"output.audio.end",
trackId=self.session_id,
)
})
}, priority=10)
# End assistant turn
await self.conversation.end_assistant_turn(
@@ -689,13 +727,13 @@ class DuplexPipeline:
logger.info(f"[TTFB] Server first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client
await self.transport.send_event({
await self._send_event({
**ev(
"metrics.ttfb",
trackId=self.session_id,
latencyMs=round(ttfb_ms),
)
})
}, priority=25)
# Double-check interrupt right before sending audio
if self._interrupt_event.is_set():
@@ -711,7 +749,7 @@ class DuplexPipeline:
)
is_first_chunk = False
await self.transport.send_audio(smoothed_audio)
await self._send_audio(smoothed_audio, priority=50)
except asyncio.CancelledError:
logger.debug("TTS sentence cancelled")
except Exception as e:
@@ -760,17 +798,18 @@ class DuplexPipeline:
return
try:
self._drop_outbound_audio = False
# Start latency tracking for greeting
speak_start_time = time.time()
first_audio_sent = False
# Send track start event
await self.transport.send_event({
await self._send_event({
**ev(
"output.audio.start",
trackId=self.session_id,
)
})
}, priority=10)
self._is_bot_speaking = True
@@ -787,27 +826,27 @@ class DuplexPipeline:
logger.info(f"[TTFB] Greeting first audio packet latency: {ttfb_ms:.0f}ms (session {self.session_id})")
# Send TTFB event to client
await self.transport.send_event({
await self._send_event({
**ev(
"metrics.ttfb",
trackId=self.session_id,
latencyMs=round(ttfb_ms),
)
})
}, priority=25)
# Send audio to client
await self.transport.send_audio(chunk.audio)
await self._send_audio(chunk.audio, priority=50)
# Small delay to prevent flooding
await asyncio.sleep(0.01)
# Send track end event
await self.transport.send_event({
await self._send_event({
**ev(
"output.audio.end",
trackId=self.session_id,
)
})
}, priority=10)
except asyncio.CancelledError:
logger.info("TTS cancelled")
@@ -832,15 +871,16 @@ class DuplexPipeline:
# IMPORTANT: Signal interruption FIRST to stop audio sending
self._interrupt_event.set()
self._is_bot_speaking = False
self._drop_outbound_audio = True
# Send interrupt event to client IMMEDIATELY
# This must happen BEFORE canceling services, so client knows to discard in-flight audio
await self.transport.send_event({
await self._send_event({
**ev(
"response.interrupted",
trackId=self.session_id,
)
})
}, priority=0)
# Cancel TTS
if self.tts_service:
@@ -862,6 +902,7 @@ class DuplexPipeline:
async def _stop_current_speech(self) -> None:
"""Stop any current speech task."""
self._drop_outbound_audio = True
if self._current_turn_task and not self._current_turn_task.done():
self._interrupt_event.set()
self._current_turn_task.cancel()
@@ -885,6 +926,10 @@ class DuplexPipeline:
self._running = False
await self._stop_current_speech()
if self._outbound_task and not self._outbound_task.done():
await self._enqueue_outbound("stop", None, priority=-1000)
await self._outbound_task
self._outbound_task = None
# Disconnect services
if self.llm_service: