"""Voice Activity Detection using Silero VAD."""

import asyncio
import os
import time
from typing import Optional, Tuple

import numpy as np
from loguru import logger

from processors.eou import EouDetector

# Try to import onnxruntime (optional for VAD functionality)
try:
    import onnxruntime as ort
    ONNX_AVAILABLE = True
except ImportError:
    ONNX_AVAILABLE = False
    ort = None
    logger.warning("onnxruntime not available - VAD will be disabled")


def _monotonic_now() -> float:
    """
    Return a monotonic timestamp in seconds.

    Uses the running asyncio loop's clock when called from inside one, so
    timestamps stay comparable with ``loop.time()``. Falls back to
    ``time.monotonic()`` when no loop is running, instead of relying on the
    deprecated implicit-loop behaviour of ``asyncio.get_event_loop()``.
    """
    try:
        return asyncio.get_running_loop().time()
    except RuntimeError:
        # No running event loop in this thread (plain synchronous caller).
        return time.monotonic()


class SileroVAD:
    """
    Voice Activity Detection using Silero VAD model.

    Detects speech in audio chunks using the Silero VAD ONNX model.
    Returns "Speech" or "Silence" for each audio chunk.

    When the model file or onnxruntime is unavailable, ``session`` is None and
    ``process_audio`` falls back to a simple RMS-energy detector.
    """

    def __init__(self, model_path: str = "data/vad/silero_vad.onnx", sample_rate: int = 16000):
        """
        Initialize Silero VAD.

        Args:
            model_path: Path to Silero VAD ONNX model
            sample_rate: Audio sample rate (must be 16kHz for Silero VAD)
        """
        self.sample_rate = sample_rate
        self.model_path = model_path
        self.session = None

        # Initialise all streaming state up front so the instance is fully
        # formed even when we bail out below and run in fallback mode.
        self.min_chunk_size = 512
        self.buffer = np.array([], dtype=np.float32)
        self.last_label = "Silence"
        self.last_probability = 0.0
        self._reset_state()

        # Check if model exists
        if not os.path.exists(model_path):
            logger.warning(f"VAD model not found at {model_path}. VAD will be disabled.")
            return

        # Check if onnxruntime is available
        if not ONNX_AVAILABLE:
            logger.warning("onnxruntime not available - VAD will be disabled")
            return

        # Load ONNX model
        try:
            self.session = ort.InferenceSession(model_path)
            logger.info(f"Loaded Silero VAD model from {model_path}")
        except Exception as e:
            logger.error(f"Failed to load VAD model: {e}")
            self.session = None

    def _reset_state(self):
        """Zero the recurrent model state and rebuild the sample-rate tensor."""
        # Silero VAD V4+ expects state shape [2, 1, 128]
        self._state = np.zeros((2, 1, 128), dtype=np.float32)
        self._sr = np.array([self.sample_rate], dtype=np.int64)

    @staticmethod
    def _pcm_to_float(pcm_bytes: bytes) -> np.ndarray:
        """Decode 16-bit PCM bytes into float32 samples in [-1.0, 1.0)."""
        audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16)
        return audio_int16.astype(np.float32) / 32768.0

    def _energy_fallback(self, pcm_bytes: bytes) -> Tuple[str, float]:
        """Classify a chunk by RMS energy when the ONNX model is unavailable."""
        if not pcm_bytes:
            return "Silence", 0.0

        audio_float = self._pcm_to_float(pcm_bytes)
        if audio_float.size == 0:
            return "Silence", 0.0

        rms = float(np.sqrt(np.mean(audio_float * audio_float)))
        # Typical speech RMS is ~0.02-0.05 at 16-bit normalized scale.
        # Normalize so threshold=0.5 roughly corresponds to ~0.025 RMS.
        probability = min(1.0, rms / 0.05)
        label = "Speech" if probability >= 0.5 else "Silence"
        return label, probability

    def process_audio(self, pcm_bytes: bytes, chunk_size_ms: int = 20) -> Tuple[str, float]:
        """
        Process audio chunk and detect speech.

        Incoming samples are appended to an internal buffer and consumed in
        windows of ``min_chunk_size`` samples; any remainder is kept for the
        next call. If no full window was available, the result of the most
        recent inference is returned.

        Args:
            pcm_bytes: PCM audio data (16-bit, mono, 16kHz)
            chunk_size_ms: Chunk duration in milliseconds (ignored for buffering logic)

        Returns:
            Tuple of (label, probability) where label is "Speech" or "Silence"

        Raises:
            ValueError: If ``pcm_bytes`` has an odd length (not whole int16 samples).
        """
        if self.session is None or not ONNX_AVAILABLE:
            # Fallback energy-based VAD when model isn't available.
            # Map RMS energy to a pseudo-probability so the existing threshold works.
            return self._energy_fallback(pcm_bytes)

        # Convert to normalized float32 and add to buffer
        self.buffer = np.concatenate((self.buffer, self._pcm_to_float(pcm_bytes)))

        # Process all complete chunks in the buffer
        while len(self.buffer) >= self.min_chunk_size:
            # Slice exactly min_chunk_size samples
            chunk = self.buffer[:self.min_chunk_size]
            self.buffer = self.buffer[self.min_chunk_size:]

            # Input tensor shape: [batch, samples] -> [1, 512]
            input_tensor = chunk.reshape(1, -1)

            # Run inference
            try:
                ort_inputs = {
                    'input': input_tensor,
                    'state': self._state,
                    'sr': self._sr,
                }
                # Outputs: probability, state
                out, self._state = self.session.run(None, ort_inputs)

                # Get probability
                self.last_probability = float(out[0][0])
                self.last_label = "Speech" if self.last_probability >= 0.5 else "Silence"
            except Exception as e:
                logger.error(f"VAD inference error: {e}")
                # Try to determine if it's an input name issue
                try:
                    inputs = [x.name for x in self.session.get_inputs()]
                    logger.error(f"Model expects inputs: {inputs}")
                except Exception as diag_err:
                    # Diagnostics are best-effort; never mask the original error path.
                    logger.debug(f"Could not query VAD model inputs: {diag_err}")
                # NOTE(review): fails open by reporting speech on inference
                # errors - presumably so audio is not dropped; confirm intent.
                return "Speech", 1.0

        return self.last_label, self.last_probability

    def reset(self) -> None:
        """Reset VAD internal state."""
        self._reset_state()
        self.buffer = np.array([], dtype=np.float32)
        self.last_label = "Silence"
        self.last_probability = 0.0


class VADProcessor:
    """
    High-level VAD processor with state management.

    Tracks speech/silence state and emits events on transitions.
    """

    def __init__(self, vad_model: SileroVAD, threshold: float = 0.5,
                 silence_threshold_ms: int = 1000, min_speech_duration_ms: int = 250):
        """
        Initialize VAD processor.

        Args:
            vad_model: Silero VAD model instance
            threshold: Speech detection threshold
            silence_threshold_ms: EOU silence threshold in ms (longer = one EOU across short pauses)
            min_speech_duration_ms: EOU min speech duration in ms (ignore very short noises)
        """
        self.vad = vad_model
        self.threshold = threshold
        # Kept so reset() can rebuild the EOU detector with the same settings.
        self._eou_silence_ms = silence_threshold_ms
        self._eou_min_speech_ms = min_speech_duration_ms
        self.is_speaking = False
        self.speech_start_time: Optional[float] = None
        self.silence_start_time: Optional[float] = None
        self.eou_detector = EouDetector(silence_threshold_ms, min_speech_duration_ms)

    def process(self, pcm_bytes: bytes, chunk_size_ms: int = 20) -> Optional[Tuple[str, float]]:
        """
        Process audio chunk and detect state changes.

        An end-of-utterance signal from the EOU detector takes priority over
        speaking/silence transitions and is returned without updating
        ``is_speaking``.

        Args:
            pcm_bytes: PCM audio data
            chunk_size_ms: Chunk duration in milliseconds

        Returns:
            Tuple of (event_type, probability) if state changed, None otherwise.
            event_type is one of "eou", "speaking" or "silence".
        """
        # The model's own label is ignored; this processor applies its own threshold.
        _label, probability = self.vad.process_audio(pcm_bytes, chunk_size_ms)

        # Check if this is speech based on threshold
        is_speech = probability >= self.threshold

        # Check EOU
        if self.eou_detector.process("Speech" if is_speech else "Silence"):
            return ("eou", probability)

        # State transition: Silence -> Speech
        if is_speech and not self.is_speaking:
            self.is_speaking = True
            self.speech_start_time = _monotonic_now()
            self.silence_start_time = None
            return ("speaking", probability)

        # State transition: Speech -> Silence
        if not is_speech and self.is_speaking:
            self.is_speaking = False
            self.silence_start_time = _monotonic_now()
            self.speech_start_time = None
            return ("silence", probability)

        return None

    def reset(self) -> None:
        """Reset VAD state."""
        self.vad.reset()
        self.is_speaking = False
        self.speech_start_time = None
        self.silence_start_time = None
        self.eou_detector = EouDetector(self._eou_silence_ms, self._eou_min_speech_ms)