"""Voice Activity Detection using Silero VAD.""" import asyncio import os from typing import Tuple, Optional import numpy as np from loguru import logger # Try to import onnxruntime (optional for VAD functionality) try: import onnxruntime as ort ONNX_AVAILABLE = True except ImportError: ONNX_AVAILABLE = False ort = None logger.warning("onnxruntime not available - VAD will be disabled") class SileroVAD: """ Voice Activity Detection using Silero VAD model. Detects speech in audio chunks using the Silero VAD ONNX model. Returns "Speech" or "Silence" for each audio chunk. """ def __init__(self, model_path: str = "data/vad/silero_vad.onnx", sample_rate: int = 16000): """ Initialize Silero VAD. Args: model_path: Path to Silero VAD ONNX model sample_rate: Audio sample rate (must be 16kHz for Silero VAD) """ self.sample_rate = sample_rate self.model_path = model_path # Check if model exists if not os.path.exists(model_path): logger.warning(f"VAD model not found at {model_path}. VAD will be disabled.") self.session = None return # Check if onnxruntime is available if not ONNX_AVAILABLE: logger.warning("onnxruntime not available - VAD will be disabled") self.session = None return # Load ONNX model try: self.session = ort.InferenceSession(model_path) logger.info(f"Loaded Silero VAD model from {model_path}") except Exception as e: logger.error(f"Failed to load VAD model: {e}") self.session = None return # Internal state for VAD self._reset_state() self.buffer = np.array([], dtype=np.float32) self.min_chunk_size = 512 self.last_label = "Silence" self.last_probability = 0.0 self._energy_noise_floor = 1e-4 def _reset_state(self): # Silero VAD V4+ expects state shape [2, 1, 128] self._state = np.zeros((2, 1, 128), dtype=np.float32) self._sr = np.array([self.sample_rate], dtype=np.int64) def process_audio(self, pcm_bytes: bytes, chunk_size_ms: int = 20) -> Tuple[str, float]: """ Process audio chunk and detect speech. Args: pcm_bytes: PCM audio data (16-bit, mono, 16kHz) chunk_size_ms: Chunk duration in milliseconds (ignored for buffering logic) Returns: Tuple of (label, probability) where label is "Speech" or "Silence" """ if self.session is None or not ONNX_AVAILABLE: # Fallback energy-based VAD with adaptive noise floor. if not pcm_bytes: return "Silence", 0.0 audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16) if audio_int16.size == 0: return "Silence", 0.0 audio_float = audio_int16.astype(np.float32) / 32768.0 rms = float(np.sqrt(np.mean(audio_float * audio_float))) # Update adaptive noise floor (slowly rises, faster to fall) if rms < self._energy_noise_floor: self._energy_noise_floor = 0.95 * self._energy_noise_floor + 0.05 * rms else: self._energy_noise_floor = 0.995 * self._energy_noise_floor + 0.005 * rms # Compute SNR-like ratio and map to probability denom = max(self._energy_noise_floor, 1e-6) snr = max(0.0, (rms - denom) / denom) probability = min(1.0, snr / 3.0) # ~3x above noise => strong speech label = "Speech" if probability >= 0.5 else "Silence" return label, probability # Convert bytes to numpy array of int16 audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16) # Normalize to float32 (-1.0 to 1.0) audio_float = audio_int16.astype(np.float32) / 32768.0 # Add to buffer self.buffer = np.concatenate((self.buffer, audio_float)) # Process all complete chunks in the buffer processed_any = False while len(self.buffer) >= self.min_chunk_size: # Slice exactly 512 samples chunk = self.buffer[:self.min_chunk_size] self.buffer = self.buffer[self.min_chunk_size:] # Prepare inputs # Input tensor shape: [batch, samples] -> [1, 512] input_tensor = chunk.reshape(1, -1) # Run inference try: ort_inputs = { 'input': input_tensor, 'state': self._state, 'sr': self._sr } # Outputs: probability, state out, self._state = self.session.run(None, ort_inputs) # Get probability self.last_probability = float(out[0][0]) self.last_label = "Speech" if self.last_probability >= 0.5 else "Silence" processed_any = True except Exception as e: logger.error(f"VAD inference error: {e}") # Try to determine if it's an input name issue try: inputs = [x.name for x in self.session.get_inputs()] logger.error(f"Model expects inputs: {inputs}") except: pass return "Speech", 1.0 return self.last_label, self.last_probability def reset(self) -> None: """Reset VAD internal state.""" self._reset_state() self.buffer = np.array([], dtype=np.float32) self.last_label = "Silence" self.last_probability = 0.0 class VADProcessor: """ High-level VAD processor with state management. Tracks speech/silence state and emits events on transitions. """ def __init__(self, vad_model: SileroVAD, threshold: float = 0.5): """ Initialize VAD processor. Args: vad_model: Silero VAD model instance threshold: Speech detection threshold """ self.vad = vad_model self.threshold = threshold self.is_speaking = False self.speech_start_time: Optional[float] = None self.silence_start_time: Optional[float] = None def process(self, pcm_bytes: bytes, chunk_size_ms: int = 20) -> Optional[Tuple[str, float]]: """ Process audio chunk and detect state changes. Args: pcm_bytes: PCM audio data chunk_size_ms: Chunk duration in milliseconds Returns: Tuple of (event_type, probability) if state changed, None otherwise """ label, probability = self.vad.process_audio(pcm_bytes, chunk_size_ms) # Check if this is speech based on threshold is_speech = probability >= self.threshold # State transition: Silence -> Speech if is_speech and not self.is_speaking: self.is_speaking = True self.speech_start_time = asyncio.get_event_loop().time() self.silence_start_time = None return ("speaking", probability) # State transition: Speech -> Silence elif not is_speech and self.is_speaking: self.is_speaking = False self.silence_start_time = asyncio.get_event_loop().time() self.speech_start_time = None return ("silence", probability) return None def reset(self) -> None: """Reset VAD state.""" self.vad.reset() self.is_speaking = False self.speech_start_time = None self.silence_start_time = None