diff --git a/core/duplex_pipeline.py b/core/duplex_pipeline.py
index b6ae930..396ca29 100644
--- a/core/duplex_pipeline.py
+++ b/core/duplex_pipeline.py
@@ -111,6 +111,7 @@ class DuplexPipeline:
         max_buffer_seconds = settings.max_audio_buffer_seconds if hasattr(settings, "max_audio_buffer_seconds") else 30
         self._max_audio_buffer_bytes = int(settings.sample_rate * 2 * max_buffer_seconds)
         self._last_vad_status: str = "Silence"
+        self._process_lock = asyncio.Lock()
 
         # Interruption handling
         self._interrupt_event = asyncio.Event()
@@ -208,8 +209,9 @@ class DuplexPipeline:
             return
 
         try:
-            # 1. Process through VAD
-            vad_result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms)
+            async with self._process_lock:
+                # 1. Process through VAD
+                vad_result = self.vad_processor.process(pcm_bytes, settings.chunk_size_ms)
 
             vad_status = "Silence"
             if vad_result:
@@ -231,51 +233,51 @@ class DuplexPipeline:
 
             self._last_vad_status = vad_status
 
-            # 2. Check for barge-in (user speaking while bot speaking)
-            # Filter false interruptions by requiring minimum speech duration
-            if self._is_bot_speaking:
-                if vad_status == "Speech":
-                    # User is speaking while bot is speaking
-                    self._barge_in_silence_frames = 0  # Reset silence counter
-
-                    if self._barge_in_speech_start_time is None:
-                        # Start tracking speech duration
-                        self._barge_in_speech_start_time = time.time()
-                        self._barge_in_speech_frames = 1
-                        logger.debug("Potential barge-in detected, tracking duration...")
+                # 2. Check for barge-in (user speaking while bot speaking)
+                # Filter false interruptions by requiring minimum speech duration
+                if self._is_bot_speaking:
+                    if vad_status == "Speech":
+                        # User is speaking while bot is speaking
+                        self._barge_in_silence_frames = 0  # Reset silence counter
+
+                        if self._barge_in_speech_start_time is None:
+                            # Start tracking speech duration
+                            self._barge_in_speech_start_time = time.time()
+                            self._barge_in_speech_frames = 1
+                            logger.debug("Potential barge-in detected, tracking duration...")
+                        else:
+                            self._barge_in_speech_frames += 1
+                            # Check if speech duration exceeds threshold
+                            speech_duration_ms = (time.time() - self._barge_in_speech_start_time) * 1000
+                            if speech_duration_ms >= self._barge_in_min_duration_ms:
+                                logger.info(f"Barge-in confirmed after {speech_duration_ms:.0f}ms of speech ({self._barge_in_speech_frames} frames)")
+                                await self._handle_barge_in()
                     else:
-                        self._barge_in_speech_frames += 1
-                        # Check if speech duration exceeds threshold
-                        speech_duration_ms = (time.time() - self._barge_in_speech_start_time) * 1000
-                        if speech_duration_ms >= self._barge_in_min_duration_ms:
-                            logger.info(f"Barge-in confirmed after {speech_duration_ms:.0f}ms of speech ({self._barge_in_speech_frames} frames)")
-                            await self._handle_barge_in()
-                else:
-                    # Silence frame during potential barge-in
-                    if self._barge_in_speech_start_time is not None:
-                        self._barge_in_silence_frames += 1
-                        # Allow brief silence gaps (VAD flickering)
-                        if self._barge_in_silence_frames > self._barge_in_silence_tolerance:
-                            # Too much silence - reset barge-in tracking
-                            logger.debug(f"Barge-in cancelled after {self._barge_in_silence_frames} silence frames")
-                            self._barge_in_speech_start_time = None
-                            self._barge_in_speech_frames = 0
-                            self._barge_in_silence_frames = 0
-
-            # 3. Buffer audio for ASR
-            if vad_status == "Speech" or self.conversation.state == ConversationState.LISTENING:
-                self._audio_buffer += pcm_bytes
-                if len(self._audio_buffer) > self._max_audio_buffer_bytes:
-                    # Keep only the most recent audio to cap memory usage
-                    self._audio_buffer = self._audio_buffer[-self._max_audio_buffer_bytes:]
-                await self.asr_service.send_audio(pcm_bytes)
+                        # Silence frame during potential barge-in
+                        if self._barge_in_speech_start_time is not None:
+                            self._barge_in_silence_frames += 1
+                            # Allow brief silence gaps (VAD flickering)
+                            if self._barge_in_silence_frames > self._barge_in_silence_tolerance:
+                                # Too much silence - reset barge-in tracking
+                                logger.debug(f"Barge-in cancelled after {self._barge_in_silence_frames} silence frames")
+                                self._barge_in_speech_start_time = None
+                                self._barge_in_speech_frames = 0
+                                self._barge_in_silence_frames = 0
 
-            # For SiliconFlow ASR, trigger interim transcription periodically
-            # The service handles timing internally via start_interim_transcription()
-
-            # 4. Check for End of Utterance - this triggers LLM response
-            if self.eou_detector.process(vad_status):
-                await self._on_end_of_utterance()
+                # 3. Buffer audio for ASR
+                if vad_status == "Speech" or self.conversation.state == ConversationState.LISTENING:
+                    self._audio_buffer += pcm_bytes
+                    if len(self._audio_buffer) > self._max_audio_buffer_bytes:
+                        # Keep only the most recent audio to cap memory usage
+                        self._audio_buffer = self._audio_buffer[-self._max_audio_buffer_bytes:]
+                    await self.asr_service.send_audio(pcm_bytes)
+
+                # For SiliconFlow ASR, trigger interim transcription periodically
+                # The service handles timing internally via start_interim_transcription()
+
+                # 4. Check for End of Utterance - this triggers LLM response
+                if self.eou_detector.process(vad_status):
+                    await self._on_end_of_utterance()
 
         except Exception as e:
             logger.error(f"Pipeline audio processing error: {e}", exc_info=True)
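
Note on the locking pattern above: asyncio.Lock is the right tool here because every audio chunk is handled by a coroutine on a single event loop, so the hazard is not thread preemption but interleaving at await points, where one chunk's VAD and barge-in bookkeeping can run halfway, yield at an await, and then race with the next chunk. The sketch below is a minimal, self-contained illustration of that failure mode and of the fix; ChunkProcessor, frames_seen, and process_chunk are hypothetical stand-ins for this class and its counters, not names from this repo.

    import asyncio

    class ChunkProcessor:
        """Illustrative stand-in for DuplexPipeline; only the locking pattern matches."""

        def __init__(self) -> None:
            self._process_lock = asyncio.Lock()
            self.frames_seen = 0  # stands in for shared VAD/barge-in counters

        async def process_chunk(self, chunk: bytes) -> None:
            async with self._process_lock:
                # Read-modify-write spanning an await: without the lock, two
                # overlapping calls can both read the same value and lose updates.
                current = self.frames_seen
                await asyncio.sleep(0)  # yield point, like an awaited ASR/VAD call
                self.frames_seen = current + 1

    async def main() -> None:
        proc = ChunkProcessor()
        # 100 chunks arriving concurrently; the lock serializes their processing.
        await asyncio.gather(*(proc.process_chunk(b"\x00" * 640) for _ in range(100)))
        print(proc.frames_seen)  # 100 with the lock; drops to 1 without it

    asyncio.run(main())

One trade-off worth noting: holding the lock for the whole chunk body means barge-in detection waits behind any in-flight chunk, so the locked section should stay short relative to the chunk interval (chunk_size_ms).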