diff --git a/app/config.py b/app/config.py
index c728c45..9ae41af 100644
--- a/app/config.py
+++ b/app/config.py
@@ -31,7 +31,7 @@ class Settings(BaseSettings):
     vad_model_path: str = Field(default="data/vad/silero_vad.onnx", description="Path to VAD model")
     vad_threshold: float = Field(default=0.5, description="VAD detection threshold")
     vad_min_speech_duration_ms: int = Field(default=250, description="Minimum speech duration in milliseconds")
-    vad_eou_threshold_ms: int = Field(default=400, description="End of utterance (silence) threshold in milliseconds")
+    vad_eou_threshold_ms: int = Field(default=800, description="End of utterance (silence) threshold in milliseconds")
 
     # OpenAI / LLM Configuration
     openai_api_key: Optional[str] = Field(default=None, description="OpenAI API key")
@@ -61,6 +61,12 @@ class Settings(BaseSettings):
         default="You are a helpful, friendly voice assistant. Keep your responses concise and conversational.",
         description="System prompt for LLM"
     )
+
+    # Barge-in (interruption) Configuration
+    barge_in_min_duration_ms: int = Field(
+        default=50,
+        description="Minimum speech duration (ms) required to trigger barge-in. 50-100ms recommended."
+    )
 
     # Logging
     log_level: str = Field(default="INFO", description="Logging level")
diff --git a/core/duplex_pipeline.py b/core/duplex_pipeline.py
index db2ed81..4714a1d 100644
--- a/core/duplex_pipeline.py
+++ b/core/duplex_pipeline.py
@@ -12,6 +12,7 @@ event-driven design.
 """
 
 import asyncio
+import time
 from typing import Optional, Callable, Awaitable
 
 from loguru import logger
@@ -112,6 +113,13 @@ class DuplexPipeline:
         # Interruption handling
         self._interrupt_event = asyncio.Event()
 
+        # Barge-in filtering - require minimum speech duration to interrupt
+        self._barge_in_speech_start_time: Optional[float] = None
+        self._barge_in_min_duration_ms: int = settings.barge_in_min_duration_ms if hasattr(settings, 'barge_in_min_duration_ms') else 50
+        self._barge_in_speech_frames: int = 0  # Count speech frames
+        self._barge_in_silence_frames: int = 0  # Count silence frames during potential barge-in
+        self._barge_in_silence_tolerance: int = 3  # Allow up to 3 silence frames (60ms at 20ms chunks)
+
         logger.info(f"DuplexPipeline initialized for session {session_id}")
 
     async def start(self) -> None:
@@ -218,8 +226,35 @@ class DuplexPipeline:
             self._last_vad_status = vad_status
 
         # 2. Check for barge-in (user speaking while bot speaking)
-        if self._is_bot_speaking and vad_status == "Speech":
-            await self._handle_barge_in()
+        # Filter false interruptions by requiring minimum speech duration
+        if self._is_bot_speaking:
+            if vad_status == "Speech":
+                # User is speaking while bot is speaking
+                self._barge_in_silence_frames = 0  # Reset silence counter
+
+                if self._barge_in_speech_start_time is None:
+                    # Start tracking speech duration
+                    self._barge_in_speech_start_time = time.time()
+                    self._barge_in_speech_frames = 1
+                    logger.debug("Potential barge-in detected, tracking duration...")
+                else:
+                    self._barge_in_speech_frames += 1
+                    # Check if speech duration exceeds threshold
+                    speech_duration_ms = (time.time() - self._barge_in_speech_start_time) * 1000
+                    if speech_duration_ms >= self._barge_in_min_duration_ms:
+                        logger.info(f"Barge-in confirmed after {speech_duration_ms:.0f}ms of speech ({self._barge_in_speech_frames} frames)")
+                        await self._handle_barge_in()
+            else:
+                # Silence frame during potential barge-in
+                if self._barge_in_speech_start_time is not None:
+                    self._barge_in_silence_frames += 1
+                    # Allow brief silence gaps (VAD flickering)
+                    if self._barge_in_silence_frames > self._barge_in_silence_tolerance:
+                        # Too much silence - reset barge-in tracking
+                        logger.debug(f"Barge-in cancelled after {self._barge_in_silence_frames} silence frames")
+                        self._barge_in_speech_start_time = None
+                        self._barge_in_speech_frames = 0
+                        self._barge_in_silence_frames = 0
 
         # 3. Buffer audio for ASR
         if vad_status == "Speech" or self.conversation.state == ConversationState.LISTENING:
@@ -334,6 +369,15 @@ class DuplexPipeline:
 
             logger.info(f"EOU detected - user said: {user_text[:100]}...")
 
+            # Send final transcription to client
+            await self.transport.send_event({
+                "event": "transcript",
+                "trackId": self.session_id,
+                "text": user_text,
+                "isFinal": True,
+                "timestamp": self._get_timestamp_ms()
+            })
+
             # Clear buffers
             self._audio_buffer = b""
             self._last_sent_transcript = ""
@@ -434,6 +478,10 @@ class DuplexPipeline:
                 await self.conversation.end_assistant_turn(was_interrupted=True)
         finally:
             self._is_bot_speaking = False
+            # Reset barge-in tracking when bot finishes speaking
+            self._barge_in_speech_start_time = None
+            self._barge_in_speech_frames = 0
+            self._barge_in_silence_frames = 0
 
     async def _speak_sentence(self, text: str) -> None:
         """
@@ -508,6 +556,11 @@ class DuplexPipeline:
 
         logger.info("Barge-in detected - interrupting bot speech")
 
+        # Reset barge-in tracking
+        self._barge_in_speech_start_time = None
+        self._barge_in_speech_frames = 0
+        self._barge_in_silence_frames = 0
+
         # Signal interruption
         self._interrupt_event.set()
 
diff --git a/examples/mic_client.py b/examples/mic_client.py
index e3941ff..b7b95cb 100644
--- a/examples/mic_client.py
+++ b/examples/mic_client.py
@@ -151,53 +151,57 @@ class MicrophoneClient:
         with self.audio_output_lock:
             self.audio_output_buffer += audio_data
 
-    async def _playback_task(self):
-        """Background task to play buffered audio smoothly using output stream."""
-        # Use a continuous output stream for smooth playback
-        chunk_samples = int(self.sample_rate * 0.05)  # 50ms chunks
-        chunk_bytes = chunk_samples * 2  # 16-bit = 2 bytes per sample
+    def _playback_thread_func(self):
+        """Thread function for continuous audio playback."""
+        import time
 
-        def output_callback(outdata, frames, time_info, status):
-            """Audio output callback."""
-            if status:
-                print(f"Output status: {status}")
-
-            bytes_needed = frames * 2
-            with self.audio_output_lock:
-                if len(self.audio_output_buffer) >= bytes_needed:
-                    audio_data = self.audio_output_buffer[:bytes_needed]
-                    self.audio_output_buffer = self.audio_output_buffer[bytes_needed:]
-                    samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32) / 32767.0
-                    outdata[:, 0] = samples
-                else:
-                    outdata.fill(0)
+        # Chunk size: 50ms of audio
+        chunk_samples = int(self.sample_rate * 0.05)
+        chunk_bytes = chunk_samples * 2
+
+        print(f"Audio playback thread started (device: {self.output_device or 'default'})")
 
-        # Create and start output stream
         try:
-            output_stream = sd.OutputStream(
+            # Create output stream for blocking writes (callback API no longer used)
+            with sd.OutputStream(
                 samplerate=self.sample_rate,
                 channels=1,
-                dtype=np.float32,
+                dtype='int16',
                 blocksize=chunk_samples,
                 device=self.output_device,
-                callback=output_callback,
                 latency='low'
-            )
-            output_stream.start()
-            print(f"Audio output stream started (device: {self.output_device or 'default'})")
-
-            # Keep stream running while client is active
-            while self.running:
-                await asyncio.sleep(0.1)
-
-            output_stream.stop()
-            output_stream.close()
-
+            ) as stream:
+                while self.running:
+                    # Get audio from buffer
+                    with self.audio_output_lock:
+                        if len(self.audio_output_buffer) >= chunk_bytes:
+                            audio_data = self.audio_output_buffer[:chunk_bytes]
+                            self.audio_output_buffer = self.audio_output_buffer[chunk_bytes:]
+                        else:
+                            # Not enough audio - output silence
+                            audio_data = b'\x00' * chunk_bytes
+
+                    # Convert to numpy array and write to stream
+                    samples = np.frombuffer(audio_data, dtype=np.int16).reshape(-1, 1)
+                    stream.write(samples)
+
         except Exception as e:
-            print(f"Playback error: {e}")
+            print(f"Playback thread error: {e}")
             import traceback
             traceback.print_exc()
 
+    async def _playback_task(self):
+        """Start playback thread and monitor it."""
+        # Run playback in a dedicated thread for reliable timing
+        playback_thread = threading.Thread(target=self._playback_thread_func, daemon=True)
+        playback_thread.start()
+
+        # Wait for client to stop
+        while self.running and playback_thread.is_alive():
+            await asyncio.sleep(0.1)
+
+        print("Audio playback stopped")
+
     async def audio_sender(self) -> None:
         """Send audio from microphone to server."""
         while self.running:
@@ -274,10 +278,13 @@ class MicrophoneClient:
                 text = event.get("text", "")
                 is_final = event.get("isFinal", False)
                 if is_final:
-                    print(f"← You said: {text}")
+                    # Clear the interim line and print final
+                    print(" " * 80, end="\r")  # Clear previous interim text
+                    print(f"→ You: {text}")
                 else:
-                    # Interim result - show with indicator
-                    print(f"← [listening] {text}", end="\r")
+                    # Interim result - show with indicator (overwrite same line)
+                    display_text = text[:60] + "..." if len(text) > 60 else text
+                    print(f" [listening] {display_text}".ljust(80), end="\r")
             elif event_type == "trackStart":
                 print("← Bot started speaking")
                 # Clear any old audio in buffer
@@ -287,6 +294,11 @@ class MicrophoneClient:
                 print("← Bot finished speaking")
             elif event_type == "interrupt":
                 print("← Bot interrupted!")
+                # IMPORTANT: Clear audio buffer immediately on interrupt
+                with self.audio_output_lock:
+                    buffer_ms = len(self.audio_output_buffer) / (self.sample_rate * 2) * 1000
+                    self.audio_output_buffer = b""
+                    print(f" (cleared {buffer_ms:.0f}ms of buffered audio)")
             elif event_type == "error":
                 print(f"← Error: {event.get('error')}")
             elif event_type == "hangup":
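The barge-in debounce added to DuplexPipeline is easiest to reason about in isolation. Below is a minimal sketch of the same state machine as a standalone class; the BargeInDebouncer name and the one-call-per-frame driver are hypothetical, not part of this change. With 20ms VAD frames, the default 50ms threshold confirms on roughly the third or fourth consecutive speech frame, and up to three isolated silence frames (about 60ms) are tolerated before tracking resets, mirroring _barge_in_silence_tolerance.

# Hypothetical standalone sketch of the diff's barge-in debounce logic.
import time
from typing import Optional


class BargeInDebouncer:
    def __init__(self, min_duration_ms: int = 50, silence_tolerance: int = 3):
        self.min_duration_ms = min_duration_ms
        self.silence_tolerance = silence_tolerance   # VAD flicker frames allowed
        self._speech_start: Optional[float] = None   # wall-clock start of candidate barge-in
        self._silence_frames = 0

    def update(self, is_speech: bool) -> bool:
        """Feed one VAD frame; returns True once a barge-in is confirmed."""
        if is_speech:
            self._silence_frames = 0
            if self._speech_start is None:
                self._speech_start = time.time()     # start tracking duration
                return False
            elapsed_ms = (time.time() - self._speech_start) * 1000
            return elapsed_ms >= self.min_duration_ms
        if self._speech_start is not None:
            self._silence_frames += 1
            if self._silence_frames > self.silence_tolerance:
                self.reset()                         # too much silence - cancel tracking
        return False

    def reset(self) -> None:
        """Call after handling a barge-in, as _handle_barge_in does in the diff."""
        self._speech_start = None
        self._silence_frames = 0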
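The playback rewrite trades sounddevice's callback API for blocking writes on a dedicated thread: OutputStream.write() does not return until the device accepts the chunk, so the loop paces itself at real time instead of depending on asyncio scheduling, which is what caused the choppy playback the old _playback_task exhibited. A self-contained sketch of that pattern, assuming the 16kHz mono rate and 50ms chunks used in the diff (the sine tone is illustrative only):

# Sketch of the blocking-write playback pattern; plays one second of a 440Hz tone.
import numpy as np
import sounddevice as sd

SAMPLE_RATE = 16000              # assumption: matches the client's sample rate
CHUNK = int(SAMPLE_RATE * 0.05)  # 50ms per write, as in the diff

t = np.arange(SAMPLE_RATE) / SAMPLE_RATE
tone = (0.2 * 32767 * np.sin(2 * np.pi * 440 * t)).astype(np.int16).reshape(-1, 1)

with sd.OutputStream(samplerate=SAMPLE_RATE, channels=1, dtype='int16',
                     blocksize=CHUNK) as stream:
    for start in range(0, len(tone), CHUNK):
        # write() blocks for ~50ms per chunk, pacing the loop at real time
        stream.write(np.ascontiguousarray(tone[start:start + CHUNK]))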
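The client-side flush on "interrupt" is what makes barge-in feel immediate: the server stops sending audio, but anything already queued locally would otherwise keep playing. For 16-bit mono PCM the queued bytes convert to milliseconds as ms = bytes / (sample_rate * 2) * 1000, which is the figure the new interrupt branch logs. A worked check, assuming a 16kHz rate (the client actually reads self.sample_rate):

# Worked example of the buffered-audio math used in the interrupt handler.
def buffered_ms(buffer: bytes, sample_rate: int = 16000) -> float:
    # 2 bytes per sample for 16-bit mono PCM
    return len(buffer) / (sample_rate * 2) * 1000

assert buffered_ms(b"\x00" * 32000) == 1000.0  # 32000 bytes at 16kHz = 1 second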