Compare commits

...

2 Commits

Author SHA1 Message Date
Xin Wang
e511cf9077 Fix Potential state duplication on barge-in. 2026-02-06 08:30:37 +08:00
Xin Wang
0576231d8d Fix Race risks if process_audio is called concurrently. 2026-02-06 08:26:56 +08:00

View File

@@ -111,6 +111,7 @@ class DuplexPipeline:
max_buffer_seconds = settings.max_audio_buffer_seconds if hasattr(settings, "max_audio_buffer_seconds") else 30 max_buffer_seconds = settings.max_audio_buffer_seconds if hasattr(settings, "max_audio_buffer_seconds") else 30
self._max_audio_buffer_bytes = int(settings.sample_rate * 2 * max_buffer_seconds) self._max_audio_buffer_bytes = int(settings.sample_rate * 2 * max_buffer_seconds)
self._last_vad_status: str = "Silence" self._last_vad_status: str = "Silence"
self._process_lock = asyncio.Lock()
# Interruption handling # Interruption handling
self._interrupt_event = asyncio.Event() self._interrupt_event = asyncio.Event()
@@ -208,6 +209,7 @@ class DuplexPipeline:
return return
try: try:
async with self._process_lock:
# 1. Process through VAD # 1. Process through VAD
vad_result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms) vad_result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms)
@@ -657,7 +659,9 @@ class DuplexPipeline:
if self.llm_service and hasattr(self.llm_service, 'cancel'): if self.llm_service and hasattr(self.llm_service, 'cancel'):
self.llm_service.cancel() self.llm_service.cancel()
# Interrupt conversation # Interrupt conversation only if there is no active turn task.
# When a turn task exists, it will handle end_assistant_turn() to avoid double callbacks.
if not (self._current_turn_task and not self._current_turn_task.done()):
await self.conversation.interrupt() await self.conversation.interrupt()
# Reset for new user turn # Reset for new user turn