Files
pipecat/tests/test_vad_processor.py
Aleix Conchillo Flaqué b3bb6fdaa5 Modernize Python typing across the codebase
Automated via ruff UP006, UP007, UP035, UP045 rules (target: py311):

- Replace `typing.List`, `Dict`, `Tuple`, `Set`, `FrozenSet`, `Type`
  with their built-in equivalents (`list`, `dict`, `tuple`, etc.)
- Replace `typing.Optional[X]` with `X | None`
- Replace `typing.Union[X, Y]` with `X | Y`
- Move `Mapping`, `Sequence`, `Callable`, `Awaitable`,
  `MutableMapping`, `MutableSequence`, `Iterator`, `AsyncIterator`,
  `AsyncGenerator` imports from `typing` to `collections.abc`
- Remove now-unused `typing` imports
- Add `from __future__ import annotations` to 5 files that use
  forward-reference strings in `X | "Y"` annotations
2026-04-16 09:28:23 -07:00

150 lines
5.3 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.frames.frames import (
InputAudioRawFrame,
SpeechControlParamsFrame,
UserSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.audio.vad_processor import VADProcessor
from pipecat.tests.utils import run_test
class MockVADAnalyzer(VADAnalyzer):
    """Test double for ``VADAnalyzer`` that replays a scripted list of states.

    Each call to :meth:`analyze_audio` hands back the next scripted state.
    Once the script is used up, every further call reports
    ``VADState.QUIET``.
    """

    def __init__(self, states: list[VADState]) -> None:
        """Set up the analyzer with the states it should replay.

        Args:
            states: VAD states to return, one per ``analyze_audio`` call.
                The sequence is copied, so the caller's list is not shared.
        """
        super().__init__(sample_rate=16000)
        self._call_index = 0
        self._states = [*states]

    def num_frames_required(self) -> int:
        """Return the fixed frame count (512) this mock reports."""
        return 512

    def voice_confidence(self, buffer: bytes) -> float:
        """Return a constant confidence of 0.9; ``buffer`` is ignored."""
        return 0.9

    async def analyze_audio(self, buffer: bytes) -> VADState:
        """Return the next scripted state, or ``QUIET`` when none remain.

        Args:
            buffer: Audio bytes; not inspected by this mock.

        Returns:
            The scripted state at the current position, or
            ``VADState.QUIET`` after the script has been exhausted.
        """
        position = self._call_index
        if position >= len(self._states):
            return VADState.QUIET
        # Advance the cursor only when a scripted state is actually consumed.
        self._call_index = position + 1
        return self._states[position]
class TestVADProcessor(unittest.IsolatedAsyncioTestCase):
    """Downstream-frame expectations for ``VADProcessor`` with a scripted analyzer."""

    def _make_audio_frame(self):
        """Create a mono 16 kHz input audio frame carrying 1024 zero bytes."""
        payload = b"\x00" * 1024
        return InputAudioRawFrame(audio=payload, sample_rate=16000, num_channels=1)

    async def _expect_downstream(self, states, frame_count, expected):
        """Feed audio frames through a fresh processor and check its output.

        Args:
            states: VAD states the mock analyzer returns, one per call.
            frame_count: Number of audio frames to send into the processor.
            expected: Frame types handed to ``run_test`` as
                ``expected_down_frames``.
        """
        processor = VADProcessor(vad_analyzer=MockVADAnalyzer(states))
        audio_frames = [self._make_audio_frame() for _ in range(frame_count)]
        await run_test(
            processor,
            frames_to_send=audio_frames,
            expected_down_frames=expected,
        )

    async def test_forwards_audio_frames(self):
        """An audio frame is passed downstream when the analyzer is quiet."""
        await self._expect_downstream(
            states=[VADState.QUIET],
            frame_count=1,
            expected=[SpeechControlParamsFrame, InputAudioRawFrame],
        )

    async def test_pushes_started_speaking_frame(self):
        """A QUIET -> SPEAKING transition yields VADUserStartedSpeakingFrame."""
        # Both audio frames are expected ahead of the VAD frames here.
        await self._expect_downstream(
            states=[VADState.QUIET, VADState.SPEAKING],
            frame_count=2,
            expected=[
                SpeechControlParamsFrame,
                InputAudioRawFrame,
                InputAudioRawFrame,
                VADUserStartedSpeakingFrame,
                UserSpeakingFrame,
            ],
        )

    async def test_pushes_stopped_speaking_frame(self):
        """A SPEAKING -> QUIET transition yields VADUserStoppedSpeakingFrame."""
        # Each audio frame is expected before the VAD frames it triggers.
        await self._expect_downstream(
            states=[VADState.SPEAKING, VADState.QUIET],
            frame_count=2,
            expected=[
                SpeechControlParamsFrame,
                InputAudioRawFrame,
                VADUserStartedSpeakingFrame,
                UserSpeakingFrame,
                InputAudioRawFrame,
                VADUserStoppedSpeakingFrame,
            ],
        )

    async def test_pushes_user_speaking_frame(self):
        """Staying in SPEAKING yields a UserSpeakingFrame per audio frame."""
        # Each audio frame is expected before the VAD frames it triggers.
        await self._expect_downstream(
            states=[VADState.SPEAKING, VADState.SPEAKING],
            frame_count=2,
            expected=[
                SpeechControlParamsFrame,
                InputAudioRawFrame,
                VADUserStartedSpeakingFrame,
                UserSpeakingFrame,
                InputAudioRawFrame,
                UserSpeakingFrame,
            ],
        )

    async def test_no_vad_frames_on_starting_state(self):
        """The STARTING state alone produces no VAD frames."""
        await self._expect_downstream(
            states=[VADState.STARTING],
            frame_count=1,
            expected=[SpeechControlParamsFrame, InputAudioRawFrame],
        )

    async def test_no_vad_frames_on_stopping_state(self):
        """The STOPPING state alone produces no VAD frames."""
        await self._expect_downstream(
            states=[VADState.STOPPING],
            frame_count=1,
            expected=[SpeechControlParamsFrame, InputAudioRawFrame],
        )

    async def test_no_vad_frames_when_quiet(self):
        """Remaining QUIET across two frames produces no VAD frames."""
        await self._expect_downstream(
            states=[VADState.QUIET, VADState.QUIET],
            frame_count=2,
            expected=[SpeechControlParamsFrame, InputAudioRawFrame, InputAudioRawFrame],
        )
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()