Add VADUserTurnStopStrategy for VAD-only realtime pipelines

Provides a dedicated stop strategy that triggers on VADUserStoppedSpeakingFrame
without requiring a turn analyzer or transcriptions. Complements the existing
VADTurnAnalyzerUserTurnStopStrategy and matches the one-per-modality convention
already used in user_start/.
This commit is contained in:
Mark Backman
2026-04-20 10:01:42 -04:00
parent 2e5e109bb6
commit 3077395ffc
6 changed files with 126 additions and 7 deletions

View File

@@ -0,0 +1 @@
- Added `VADUserTurnStopStrategy`, a VAD-only turn stop strategy that triggers the end of a user turn as soon as VAD reports the user stopped speaking. Intended for realtime speech-to-speech pipelines that rely solely on VAD and don't use a turn analyzer or STT transcriptions to decide end of turn.

View File

@@ -1 +0,0 @@
- Fixed `GeminiLiveLLMService` not responding to user speaking events when using local VAD turn management. The service was listening for `VADUserStartedSpeakingFrame`/`VADUserStoppedSpeakingFrame` instead of `UserStartedSpeakingFrame`/`UserStoppedSpeakingFrame`, which are the frames emitted by the user turn controller.

View File

@@ -12,8 +12,6 @@ indicates COMPLETE, making it suitable for speech-to-speech pipelines
where transcriptions arrive too late to be useful for turn decisions.
"""
from typing import Optional
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.frames.frames import (
Frame,
@@ -126,7 +124,7 @@ class VADTurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
if state == EndOfTurnState.COMPLETE:
await self.trigger_user_turn_stopped()
async def _handle_prediction_result(self, result: Optional[MetricsData]):
async def _handle_prediction_result(self, result: MetricsData | None):
"""Handle a prediction result event from the turn analyzer."""
if result:
await self.push_frame(MetricsFrame(data=[result]))

View File

@@ -0,0 +1,43 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User turn stop strategy based on VAD events only.
This strategy triggers the end of a user turn immediately when VAD indicates
the user has stopped speaking. It does not use a turn analyzer or
transcriptions, making it suitable for realtime speech-to-speech pipelines
that rely solely on VAD for turn detection.
"""
from pipecat.frames.frames import Frame, VADUserStoppedSpeakingFrame
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
class VADUserTurnStopStrategy(BaseUserTurnStopStrategy):
    """End the user turn purely from VAD (Voice Activity Detection) events.

    The turn is stopped the moment a ``VADUserStoppedSpeakingFrame`` is
    processed. Neither a turn analyzer nor STT transcriptions are consulted,
    which targets realtime speech-to-speech pipelines that decide end of turn
    from VAD alone.
    """

    async def process_frame(self, frame: Frame) -> ProcessFrameResult:
        """Inspect a frame and stop the user turn on a VAD stop event.

        Args:
            frame: The frame to be analyzed.

        Returns:
            ``ProcessFrameResult.CONTINUE`` for every frame, so subsequent
            stop strategies are still evaluated.
        """
        # Run the base-class processing first; its result is not used here.
        await super().process_frame(frame)

        vad_reported_stop = isinstance(frame, VADUserStoppedSpeakingFrame)
        if vad_reported_stop:
            await self.trigger_user_turn_stopped()

        return ProcessFrameResult.CONTINUE

View File

@@ -7,7 +7,6 @@
"""Tests for VADTurnAnalyzerUserTurnStopStrategy."""
import unittest
from typing import Optional, Tuple
from unittest.mock import AsyncMock
import pytest
@@ -40,7 +39,7 @@ class MockTurnAnalyzer(BaseTurnAnalyzer):
super().__init__()
self._append_audio_state = EndOfTurnState.INCOMPLETE
self._analyze_state = EndOfTurnState.INCOMPLETE
self._prediction: Optional[MetricsData] = None
self._prediction: MetricsData | None = None
self._speech_triggered = False
@property
@@ -54,7 +53,7 @@ class MockTurnAnalyzer(BaseTurnAnalyzer):
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
return self._append_audio_state
async def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]:
async def analyze_end_of_turn(self) -> tuple[EndOfTurnState, MetricsData | None]:
return self._analyze_state, self._prediction
def update_vad_start_secs(self, vad_start_secs: float):

View File

@@ -0,0 +1,79 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Tests for VADUserTurnStopStrategy."""
import unittest
from pipecat.frames.frames import (
InputAudioRawFrame,
TranscriptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.vad_user_turn_stop_strategy import VADUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import TaskManager
class TestVADUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
    """Exercise VADUserTurnStopStrategy against individual frames."""

    async def asyncSetUp(self):
        """Create a set-up strategy and record whether the stop event fires."""
        self.strategy = VADUserTurnStopStrategy()
        self.task_manager = TaskManager()
        await self.strategy.setup(self.task_manager)

        # Flipped to True by the handler below when the strategy reports a stop.
        self.turn_stopped_called = False

        @self.strategy.event_handler("on_user_turn_stopped")
        async def on_user_turn_stopped(strategy, params):
            self.turn_stopped_called = True

    async def asyncTearDown(self):
        """Release whatever the strategy acquired during setup."""
        await self.strategy.cleanup()

    async def test_vad_stop_triggers_user_turn_stopped(self):
        """A VAD stop frame must fire the user-turn-stopped event."""
        stop_frame = VADUserStoppedSpeakingFrame(stop_secs=0.3)
        await self.strategy.process_frame(stop_frame)
        assert self.turn_stopped_called

    async def test_vad_start_does_not_trigger(self):
        """A VAD start frame must not fire the user-turn-stopped event."""
        start_frame = VADUserStartedSpeakingFrame(start_secs=0.2)
        await self.strategy.process_frame(start_frame)
        assert not self.turn_stopped_called

    async def test_transcription_frames_ignored(self):
        """A transcription frame must not fire the user-turn-stopped event."""
        transcription = TranscriptionFrame(text="hello", user_id="user1", timestamp="now")
        await self.strategy.process_frame(transcription)
        assert not self.turn_stopped_called

    async def test_input_audio_frames_ignored(self):
        """A raw input audio frame must not fire the user-turn-stopped event."""
        audio_frame = InputAudioRawFrame(audio=b"\x00" * 320, sample_rate=16000, num_channels=1)
        await self.strategy.process_frame(audio_frame)
        assert not self.turn_stopped_called

    async def test_process_frame_returns_continue(self):
        """process_frame reports CONTINUE for both stop and start frames."""
        stop_frame = VADUserStoppedSpeakingFrame(stop_secs=0.3)
        stop_outcome = await self.strategy.process_frame(stop_frame)
        assert stop_outcome == ProcessFrameResult.CONTINUE

        start_frame = VADUserStartedSpeakingFrame(start_secs=0.2)
        start_outcome = await self.strategy.process_frame(start_frame)
        assert start_outcome == ProcessFrameResult.CONTINUE

    async def test_reset_does_not_crash(self):
        """reset() must complete without raising."""
        await self.strategy.reset()
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()