From 3077395ffcb4505a629eafbcc55d02acfa1c508e Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 20 Apr 2026 10:01:42 -0400 Subject: [PATCH] Add VADUserTurnStopStrategy for VAD-only realtime pipelines Provides a dedicated stop strategy that triggers on VADUserStoppedSpeakingFrame without requiring a turn analyzer or transcriptions. Complements the existing VADTurnAnalyzerUserTurnStopStrategy and matches the one-per-modality convention already used in user_start/. --- changelog/4199.added.2.md | 1 + changelog/4199.fixed.md | 1 - ...d_turn_analyzer_user_turn_stop_strategy.py | 4 +- .../user_stop/vad_user_turn_stop_strategy.py | 43 ++++++++++ ...d_turn_analyzer_user_turn_stop_strategy.py | 5 +- tests/test_vad_user_turn_stop_strategy.py | 79 +++++++++++++++++++ 6 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 changelog/4199.added.2.md delete mode 100644 changelog/4199.fixed.md create mode 100644 src/pipecat/turns/user_stop/vad_user_turn_stop_strategy.py create mode 100644 tests/test_vad_user_turn_stop_strategy.py diff --git a/changelog/4199.added.2.md b/changelog/4199.added.2.md new file mode 100644 index 000000000..8f7f8e72c --- /dev/null +++ b/changelog/4199.added.2.md @@ -0,0 +1 @@ +- Added `VADUserTurnStopStrategy`, a VAD-only turn stop strategy that triggers the end of a user turn as soon as VAD reports the user stopped speaking. Intended for realtime speech-to-speech pipelines that rely solely on VAD and don't use a turn analyzer or STT transcriptions to decide end of turn. diff --git a/changelog/4199.fixed.md b/changelog/4199.fixed.md deleted file mode 100644 index c20c6f4ad..000000000 --- a/changelog/4199.fixed.md +++ /dev/null @@ -1 +0,0 @@ -- Fixed `GeminiLiveLLMService` not responding to user speaking events when using local VAD turn management. The service was listening for `VADUserStartedSpeakingFrame`/`VADUserStoppedSpeakingFrame` instead of `UserStartedSpeakingFrame`/`UserStoppedSpeakingFrame`, which are the frames emitted by the user turn controller. diff --git a/src/pipecat/turns/user_stop/vad_turn_analyzer_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/vad_turn_analyzer_user_turn_stop_strategy.py index 03f868aae..3167ebda0 100644 --- a/src/pipecat/turns/user_stop/vad_turn_analyzer_user_turn_stop_strategy.py +++ b/src/pipecat/turns/user_stop/vad_turn_analyzer_user_turn_stop_strategy.py @@ -12,8 +12,6 @@ indicates COMPLETE, making it suitable for speech-to-speech pipelines where transcriptions arrive too late to be useful for turn decisions. """ -from typing import Optional - from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState from pipecat.frames.frames import ( Frame, @@ -126,7 +124,7 @@ class VADTurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy): if state == EndOfTurnState.COMPLETE: await self.trigger_user_turn_stopped() - async def _handle_prediction_result(self, result: Optional[MetricsData]): + async def _handle_prediction_result(self, result: MetricsData | None): """Handle a prediction result event from the turn analyzer.""" if result: await self.push_frame(MetricsFrame(data=[result])) diff --git a/src/pipecat/turns/user_stop/vad_user_turn_stop_strategy.py b/src/pipecat/turns/user_stop/vad_user_turn_stop_strategy.py new file mode 100644 index 000000000..39079cc66 --- /dev/null +++ b/src/pipecat/turns/user_stop/vad_user_turn_stop_strategy.py @@ -0,0 +1,43 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User turn stop strategy based on VAD events only. + +This strategy triggers the end of a user turn immediately when VAD indicates +the user has stopped speaking. It does not use a turn analyzer or +transcriptions, making it suitable for realtime speech-to-speech pipelines +that rely solely on VAD for turn detection. +""" + +from pipecat.frames.frames import Frame, VADUserStoppedSpeakingFrame +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy + + +class VADUserTurnStopStrategy(BaseUserTurnStopStrategy): + """User turn stop strategy based on VAD (Voice Activity Detection). + + This strategy triggers the end of a user turn as soon as a VAD frame + indicates the user has stopped speaking. It is intended for realtime + speech-to-speech pipelines where neither a turn analyzer nor STT + transcriptions are used to decide end of turn. + """ + + async def process_frame(self, frame: Frame) -> ProcessFrameResult: + """Process an incoming frame to detect user turn stop. + + Args: + frame: The frame to be analyzed. + + Returns: + Always returns CONTINUE so subsequent stop strategies are evaluated. + """ + await super().process_frame(frame) + + if isinstance(frame, VADUserStoppedSpeakingFrame): + await self.trigger_user_turn_stopped() + + return ProcessFrameResult.CONTINUE diff --git a/tests/test_vad_turn_analyzer_user_turn_stop_strategy.py b/tests/test_vad_turn_analyzer_user_turn_stop_strategy.py index add762b0d..e7e1646a3 100644 --- a/tests/test_vad_turn_analyzer_user_turn_stop_strategy.py +++ b/tests/test_vad_turn_analyzer_user_turn_stop_strategy.py @@ -7,7 +7,6 @@ """Tests for VADTurnAnalyzerUserTurnStopStrategy.""" import unittest -from typing import Optional, Tuple from unittest.mock import AsyncMock import pytest @@ -40,7 +39,7 @@ class MockTurnAnalyzer(BaseTurnAnalyzer): super().__init__() self._append_audio_state = EndOfTurnState.INCOMPLETE self._analyze_state = EndOfTurnState.INCOMPLETE - self._prediction: Optional[MetricsData] = None + self._prediction: MetricsData | None = None self._speech_triggered = False @property @@ -54,7 +53,7 @@ class MockTurnAnalyzer(BaseTurnAnalyzer): def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState: return self._append_audio_state - async def analyze_end_of_turn(self) -> Tuple[EndOfTurnState, Optional[MetricsData]]: + async def analyze_end_of_turn(self) -> tuple[EndOfTurnState, MetricsData | None]: return self._analyze_state, self._prediction def update_vad_start_secs(self, vad_start_secs: float): diff --git a/tests/test_vad_user_turn_stop_strategy.py b/tests/test_vad_user_turn_stop_strategy.py new file mode 100644 index 000000000..7b840943b --- /dev/null +++ b/tests/test_vad_user_turn_stop_strategy.py @@ -0,0 +1,79 @@ +# +# Copyright (c) 2024-2026, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Tests for VADUserTurnStopStrategy.""" + +import unittest + +from pipecat.frames.frames import ( + InputAudioRawFrame, + TranscriptionFrame, + VADUserStartedSpeakingFrame, + VADUserStoppedSpeakingFrame, +) +from pipecat.turns.types import ProcessFrameResult +from pipecat.turns.user_stop.vad_user_turn_stop_strategy import VADUserTurnStopStrategy +from pipecat.utils.asyncio.task_manager import TaskManager + + +class TestVADUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.strategy = VADUserTurnStopStrategy() + self.task_manager = TaskManager() + await self.strategy.setup(self.task_manager) + + self.turn_stopped_called = False + + @self.strategy.event_handler("on_user_turn_stopped") + async def on_user_turn_stopped(strategy, params): + self.turn_stopped_called = True + + async def asyncTearDown(self): + await self.strategy.cleanup() + + async def test_vad_stop_triggers_user_turn_stopped(self): + """VADUserStoppedSpeakingFrame should trigger user turn stopped.""" + await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3)) + + assert self.turn_stopped_called + + async def test_vad_start_does_not_trigger(self): + """VADUserStartedSpeakingFrame should not trigger user turn stopped.""" + await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2)) + + assert not self.turn_stopped_called + + async def test_transcription_frames_ignored(self): + """TranscriptionFrame should not trigger user turn stopped.""" + await self.strategy.process_frame( + TranscriptionFrame(text="hello", user_id="user1", timestamp="now") + ) + + assert not self.turn_stopped_called + + async def test_input_audio_frames_ignored(self): + """InputAudioRawFrame should not trigger user turn stopped.""" + await self.strategy.process_frame( + InputAudioRawFrame(audio=b"\x00" * 320, sample_rate=16000, num_channels=1) + ) + + assert not self.turn_stopped_called + + async def test_process_frame_returns_continue(self): + """process_frame should always return CONTINUE.""" + result = await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3)) + assert result == ProcessFrameResult.CONTINUE + + result = await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2)) + assert result == ProcessFrameResult.CONTINUE + + async def test_reset_does_not_crash(self): + """reset() should complete without error.""" + await self.strategy.reset() + + +if __name__ == "__main__": + unittest.main()