222 lines
7.6 KiB
Python
222 lines
7.6 KiB
Python
"""Voice Activity Detection using Silero VAD."""
|
|
|
|
import asyncio
|
|
import os
|
|
from typing import Tuple, Optional
|
|
import numpy as np
|
|
from loguru import logger
|
|
|
|
|
|
# Try to import onnxruntime (optional for VAD functionality).
# When it is missing, ONNX_AVAILABLE/ort act as sentinels so the rest of
# the module can degrade gracefully instead of failing at import time.
try:
    import onnxruntime as ort
    ONNX_AVAILABLE = True
except ImportError:
    ONNX_AVAILABLE = False
    ort = None  # keep the name bound so later references don't NameError
    logger.warning("onnxruntime not available - VAD will be disabled")
|
|
|
|
|
|
class SileroVAD:
    """
    Voice Activity Detection using Silero VAD model.

    Detects speech in audio chunks using the Silero VAD ONNX model.
    Returns "Speech" or "Silence" for each audio chunk. When the model
    file or onnxruntime is unavailable, an adaptive energy-based
    fallback detector is used instead, so callers always get an answer.
    """

    def __init__(self, model_path: str = "data/vad/silero_vad.onnx", sample_rate: int = 16000):
        """
        Initialize Silero VAD.

        Args:
            model_path: Path to Silero VAD ONNX model
            sample_rate: Audio sample rate (must be 16kHz for Silero VAD)
        """
        self.sample_rate = sample_rate
        self.model_path = model_path
        # ONNX session; stays None whenever the model cannot be used.
        self.session = None

        # BUGFIX: this state must be initialized unconditionally.
        # Previously it was only set after a successful model load, so on
        # every early-return path below (missing model, no onnxruntime,
        # load failure) the energy fallback in process_audio() and
        # reset() raised AttributeError on these attributes.
        self._reset_state()
        self.buffer = np.array([], dtype=np.float32)
        self.min_chunk_size = 512  # Silero VAD consumes 512-sample windows at 16 kHz
        self.last_label = "Silence"
        self.last_probability = 0.0
        self._energy_noise_floor = 1e-4  # adaptive floor for the energy fallback

        # Check if model exists
        if not os.path.exists(model_path):
            logger.warning(f"VAD model not found at {model_path}. VAD will be disabled.")
            return

        # Check if onnxruntime is available
        if not ONNX_AVAILABLE:
            logger.warning("onnxruntime not available - VAD will be disabled")
            return

        # Load ONNX model
        try:
            self.session = ort.InferenceSession(model_path)
            logger.info(f"Loaded Silero VAD model from {model_path}")
        except Exception as e:
            logger.error(f"Failed to load VAD model: {e}")
            self.session = None

    def _reset_state(self) -> None:
        """Reset the recurrent model state and the sample-rate tensor."""
        # Silero VAD V4+ expects state shape [2, 1, 128]
        self._state = np.zeros((2, 1, 128), dtype=np.float32)
        self._sr = np.array([self.sample_rate], dtype=np.int64)

    def _energy_vad(self, pcm_bytes: bytes) -> Tuple[str, float]:
        """
        Fallback energy-based VAD with an adaptive noise floor.

        Used whenever the ONNX model is unavailable. Tracks a slowly
        adapting noise floor and maps an SNR-like ratio to a pseudo
        probability.

        Args:
            pcm_bytes: PCM audio data (16-bit, mono)

        Returns:
            Tuple of (label, probability) where label is "Speech" or "Silence"
        """
        if not pcm_bytes:
            return "Silence", 0.0
        audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16)
        if audio_int16.size == 0:
            return "Silence", 0.0
        audio_float = audio_int16.astype(np.float32) / 32768.0
        rms = float(np.sqrt(np.mean(audio_float * audio_float)))

        # Update adaptive noise floor (slowly rises, faster to fall)
        if rms < self._energy_noise_floor:
            self._energy_noise_floor = 0.95 * self._energy_noise_floor + 0.05 * rms
        else:
            self._energy_noise_floor = 0.995 * self._energy_noise_floor + 0.005 * rms

        # Compute SNR-like ratio and map to probability
        denom = max(self._energy_noise_floor, 1e-6)
        snr = max(0.0, (rms - denom) / denom)
        probability = min(1.0, snr / 3.0)  # ~3x above noise => strong speech
        label = "Speech" if probability >= 0.5 else "Silence"
        return label, probability

    def process_audio(self, pcm_bytes: bytes, chunk_size_ms: int = 20) -> Tuple[str, float]:
        """
        Process audio chunk and detect speech.

        Audio is accumulated in an internal buffer and inference runs on
        every complete 512-sample window; the most recent result is
        returned (possibly a carry-over from a previous call when the
        buffer holds less than one window).

        Args:
            pcm_bytes: PCM audio data (16-bit, mono, 16kHz)
            chunk_size_ms: Chunk duration in milliseconds (ignored for buffering logic)

        Returns:
            Tuple of (label, probability) where label is "Speech" or "Silence"
        """
        if self.session is None or not ONNX_AVAILABLE:
            # Fallback energy-based VAD with adaptive noise floor.
            return self._energy_vad(pcm_bytes)

        # Convert bytes to numpy array of int16
        audio_int16 = np.frombuffer(pcm_bytes, dtype=np.int16)

        # Normalize to float32 (-1.0 to 1.0)
        audio_float = audio_int16.astype(np.float32) / 32768.0

        # Add to buffer
        self.buffer = np.concatenate((self.buffer, audio_float))

        # Process all complete chunks in the buffer
        while len(self.buffer) >= self.min_chunk_size:
            # Slice exactly 512 samples
            chunk = self.buffer[:self.min_chunk_size]
            self.buffer = self.buffer[self.min_chunk_size:]

            # Input tensor shape: [batch, samples] -> [1, 512]
            input_tensor = chunk.reshape(1, -1)

            # Run inference
            try:
                ort_inputs = {
                    'input': input_tensor,
                    'state': self._state,
                    'sr': self._sr
                }

                # Outputs: probability, state
                out, self._state = self.session.run(None, ort_inputs)

                # Get probability
                self.last_probability = float(out[0][0])
                self.last_label = "Speech" if self.last_probability >= 0.5 else "Silence"

            except Exception as e:
                logger.error(f"VAD inference error: {e}")
                # Try to determine if it's an input name issue
                try:
                    inputs = [x.name for x in self.session.get_inputs()]
                    logger.error(f"Model expects inputs: {inputs}")
                except Exception:  # diagnostics are best-effort only
                    pass
                # Fail open: report speech so downstream never drops audio
                # because of a transient inference error.
                return "Speech", 1.0

        return self.last_label, self.last_probability

    def reset(self) -> None:
        """Reset VAD internal state (model state, buffer, last result)."""
        self._reset_state()
        self.buffer = np.array([], dtype=np.float32)
        self.last_label = "Silence"
        self.last_probability = 0.0
|
|
|
|
|
|
class VADProcessor:
    """
    High-level VAD processor with state management.

    Wraps a SileroVAD instance, compares its probability against a
    threshold, and emits an event tuple each time the speech/silence
    state flips.
    """

    def __init__(self, vad_model: SileroVAD, threshold: float = 0.5):
        """
        Initialize VAD processor.

        Args:
            vad_model: Silero VAD model instance
            threshold: Speech detection threshold
        """
        self.vad = vad_model
        self.threshold = threshold
        self.is_speaking = False
        self.speech_start_time: Optional[float] = None
        self.silence_start_time: Optional[float] = None

    def process(self, pcm_bytes: bytes, chunk_size_ms: int = 20) -> Optional[Tuple[str, float]]:
        """
        Process audio chunk and detect state changes.

        Args:
            pcm_bytes: PCM audio data
            chunk_size_ms: Chunk duration in milliseconds

        Returns:
            Tuple of (event_type, probability) if state changed, None otherwise
        """
        _, probability = self.vad.process_audio(pcm_bytes, chunk_size_ms)

        # Threshold the raw probability; the model's own label is ignored
        # so that the processor's threshold is authoritative.
        speech_now = probability >= self.threshold

        # No transition -> nothing to report.
        if speech_now == self.is_speaking:
            return None

        # Record the transition instant once, on the flip itself.
        now = asyncio.get_event_loop().time()
        self.is_speaking = speech_now

        if speech_now:
            # Silence -> Speech
            self.speech_start_time, self.silence_start_time = now, None
            return ("speaking", probability)

        # Speech -> Silence
        self.silence_start_time, self.speech_start_time = now, None
        return ("silence", probability)

    def reset(self) -> None:
        """Reset VAD state, including the wrapped model and timestamps."""
        self.vad.reset()
        self.is_speaking = False
        self.speech_start_time = None
        self.silence_start_time = None