Automated via ruff UP006, UP007, UP035, UP045 rules (target: py311): - Replace `typing.List`, `Dict`, `Tuple`, `Set`, `FrozenSet`, `Type` with their built-in equivalents (`list`, `dict`, `tuple`, etc.) - Replace `typing.Optional[X]` with `X | None` - Replace `typing.Union[X, Y]` with `X | Y` - Move `Mapping`, `Sequence`, `Callable`, `Awaitable`, `MutableMapping`, `MutableSequence`, `Iterator`, `AsyncIterator`, `AsyncGenerator` imports from `typing` to `collections.abc` - Remove now-unused `typing` imports - Add `from __future__ import annotations` to 5 files that use forward-reference strings in `X | "Y"` annotations
150 lines
5.3 KiB
Python
150 lines
5.3 KiB
Python
#
|
|
# Copyright (c) 2024-2026, Daily
|
|
#
|
|
# SPDX-License-Identifier: BSD 2-Clause License
|
|
#
|
|
|
|
import unittest
|
|
|
|
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
|
|
from pipecat.frames.frames import (
|
|
InputAudioRawFrame,
|
|
SpeechControlParamsFrame,
|
|
UserSpeakingFrame,
|
|
VADUserStartedSpeakingFrame,
|
|
VADUserStoppedSpeakingFrame,
|
|
)
|
|
from pipecat.processors.audio.vad_processor import VADProcessor
|
|
from pipecat.tests.utils import run_test
|
|
|
|
|
|
class MockVADAnalyzer(VADAnalyzer):
    """Test double that replays a scripted list of VAD states.

    Each call to ``analyze_audio`` hands back the next state from the list
    given at construction time; once the list is used up, every further call
    reports ``VADState.QUIET``.
    """

    def __init__(self, states: list[VADState]):
        """Store a private copy of ``states`` and reset the replay position.

        Args:
            states: The VAD states to return, in order, one per
                ``analyze_audio`` call.
        """
        super().__init__(sample_rate=16000)
        # Copy so later changes to the caller's list cannot alter the script.
        self._states = list(states)
        self._call_index = 0

    def num_frames_required(self) -> int:
        """Return a fixed frame count of 512."""
        return 512

    def voice_confidence(self, buffer: bytes) -> float:
        """Return a fixed confidence of 0.9; ``buffer`` is not inspected."""
        return 0.9

    async def analyze_audio(self, buffer: bytes) -> VADState:
        """Return the next scripted state, or ``VADState.QUIET`` when exhausted.

        ``buffer`` is ignored; only the number of calls made so far matters.
        """
        position = self._call_index
        if position >= len(self._states):
            return VADState.QUIET

        self._call_index = position + 1
        return self._states[position]
|
|
|
|
|
|
class TestVADProcessor(unittest.IsolatedAsyncioTestCase):
    """Checks which frames ``VADProcessor`` pushes downstream for scripted VAD states."""

    def _make_audio_frame(self):
        """Build a single-channel, 16000 Hz input audio frame of 1024 zero bytes."""
        zero_bytes = b"\x00" * 1024
        return InputAudioRawFrame(audio=zero_bytes, sample_rate=16000, num_channels=1)

    async def _expect_down_frames(
        self,
        vad_states: list[VADState],
        expected: list[type],
        *,
        audio_frame_count: int = 1,
    ) -> None:
        """Drive a ``VADProcessor`` with scripted VAD states via ``run_test``.

        Args:
            vad_states: States the ``MockVADAnalyzer`` returns, one per
                ``analyze_audio`` call.
            expected: Frame types handed to ``run_test`` as
                ``expected_down_frames``.
            audio_frame_count: Number of audio frames sent to the processor.
        """
        vad_processor = VADProcessor(vad_analyzer=MockVADAnalyzer(vad_states))
        outgoing = [self._make_audio_frame() for _ in range(audio_frame_count)]
        await run_test(
            vad_processor,
            frames_to_send=outgoing,
            expected_down_frames=expected,
        )

    async def test_forwards_audio_frames(self):
        """Audio frames are forwarded downstream."""
        await self._expect_down_frames(
            [VADState.QUIET],
            [SpeechControlParamsFrame, InputAudioRawFrame],
        )

    async def test_pushes_started_speaking_frame(self):
        """A VADUserStartedSpeakingFrame is pushed when speech starts."""
        # Each audio frame is forwarded first; the VAD frames it triggers follow it.
        await self._expect_down_frames(
            [VADState.QUIET, VADState.SPEAKING],
            [
                SpeechControlParamsFrame,
                InputAudioRawFrame,
                InputAudioRawFrame,
                VADUserStartedSpeakingFrame,
                UserSpeakingFrame,
            ],
            audio_frame_count=2,
        )

    async def test_pushes_stopped_speaking_frame(self):
        """A VADUserStoppedSpeakingFrame is pushed when speech stops."""
        # Each audio frame is forwarded first; the VAD frames it triggers follow it.
        await self._expect_down_frames(
            [VADState.SPEAKING, VADState.QUIET],
            [
                SpeechControlParamsFrame,
                InputAudioRawFrame,
                VADUserStartedSpeakingFrame,
                UserSpeakingFrame,
                InputAudioRawFrame,
                VADUserStoppedSpeakingFrame,
            ],
            audio_frame_count=2,
        )

    async def test_pushes_user_speaking_frame(self):
        """A UserSpeakingFrame is pushed for each frame analyzed as speaking."""
        # Each audio frame is forwarded first; the VAD frames it triggers follow it.
        await self._expect_down_frames(
            [VADState.SPEAKING, VADState.SPEAKING],
            [
                SpeechControlParamsFrame,
                InputAudioRawFrame,
                VADUserStartedSpeakingFrame,
                UserSpeakingFrame,
                InputAudioRawFrame,
                UserSpeakingFrame,
            ],
            audio_frame_count=2,
        )

    async def test_no_vad_frames_on_starting_state(self):
        """The STARTING state does not push any VAD frames."""
        await self._expect_down_frames(
            [VADState.STARTING],
            [SpeechControlParamsFrame, InputAudioRawFrame],
        )

    async def test_no_vad_frames_on_stopping_state(self):
        """The STOPPING state does not push any VAD frames."""
        await self._expect_down_frames(
            [VADState.STOPPING],
            [SpeechControlParamsFrame, InputAudioRawFrame],
        )

    async def test_no_vad_frames_when_quiet(self):
        """No VAD frames are pushed while the analyzer stays quiet."""
        await self._expect_down_frames(
            [VADState.QUIET, VADState.QUIET],
            [SpeechControlParamsFrame, InputAudioRawFrame, InputAudioRawFrame],
            audio_frame_count=2,
        )
|
|
|
|
|
|
if __name__ == "__main__":
    # Let the module be executed directly as a script to run its tests.
    unittest.main()
|