Compare commits

...

3 Commits

Author SHA1 Message Date
Mark Backman
3077395ffc Add VADUserTurnStopStrategy for VAD-only realtime pipelines
Provides a dedicated stop strategy that triggers on VADUserStoppedSpeakingFrame
without requiring a turn analyzer or transcriptions. Complements the existing
VADTurnAnalyzerUserTurnStopStrategy and matches the one-per-modality convention
already used in user_start/.
2026-04-20 10:10:58 -04:00
Mark Backman
2e5e109bb6 Add changelog for #4199 2026-04-20 09:17:33 -04:00
Mark Backman
677ca04a18 Add VADTurnAnalyzerUserTurnStopStrategy for speech-to-speech pipelines
For speech-to-speech models like Gemini Live, audio goes directly to the
LLM and transcriptions arrive too late to be useful for turn decisions.
The existing TurnAnalyzerUserTurnStopStrategy waits for STT transcripts
before triggering end-of-turn, adding unnecessary latency.

This adds VADTurnAnalyzerUserTurnStopStrategy which triggers immediately
on turn analyzer COMPLETE without waiting for any STT transcript. Also
fixes the Gemini Live local VAD example to use UserStartedSpeakingFrame/
UserStoppedSpeakingFrame instead of VAD variants, since with local turn
management these are the frames that flow through the pipeline.
2026-04-20 09:17:30 -04:00
8 changed files with 480 additions and 1 deletions

View File

@@ -0,0 +1 @@
- Added `VADUserTurnStopStrategy`, a VAD-only turn stop strategy that triggers the end of a user turn as soon as VAD reports the user stopped speaking. Intended for realtime speech-to-speech pipelines that rely solely on VAD and don't use a turn analyzer or STT transcriptions to decide end of turn.

1
changelog/4199.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `VADTurnAnalyzerUserTurnStopStrategy`, a turn stop strategy that triggers immediately when the turn analyzer reports COMPLETE without waiting for STT transcriptions. This reduces end-of-turn latency for speech-to-speech pipelines (e.g. Gemini Live) where audio goes directly to the LLM.

View File

@@ -10,6 +10,7 @@ import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
@@ -28,6 +29,9 @@ from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService, Gemini
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_stop import VADTurnAnalyzerUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
load_dotenv(override=True)
@@ -73,6 +77,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
start=[VADUserTurnStartStrategy()],
stop=[
VADTurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())
],
),
vad_analyzer=SileroVADAnalyzer(),
),
)

View File

@@ -8,11 +8,13 @@ from .base_user_turn_stop_strategy import BaseUserTurnStopStrategy, UserTurnStop
from .external_user_turn_stop_strategy import ExternalUserTurnStopStrategy
from .speech_timeout_user_turn_stop_strategy import SpeechTimeoutUserTurnStopStrategy
from .turn_analyzer_user_turn_stop_strategy import TurnAnalyzerUserTurnStopStrategy
from .vad_turn_analyzer_user_turn_stop_strategy import VADTurnAnalyzerUserTurnStopStrategy
__all__ = [
"BaseUserTurnStopStrategy",
"ExternalUserTurnStopStrategy",
"SpeechTimeoutUserTurnStopStrategy",
"UserTurnStoppedParams",
"TurnAnalyzerUserTurnStopStrategy",
"UserTurnStoppedParams",
"VADTurnAnalyzerUserTurnStopStrategy",
]

View File

@@ -0,0 +1,130 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User turn stop strategy based on VAD and turn detection analyzers.
This strategy uses a turn analyzer to detect end-of-turn but does not
require STT transcriptions. It triggers immediately when the turn analyzer
indicates COMPLETE, making it suitable for speech-to-speech pipelines
where transcriptions arrive too late to be useful for turn decisions.
"""
from pipecat.audio.turn.base_turn_analyzer import BaseTurnAnalyzer, EndOfTurnState
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
MetricsFrame,
SpeechControlParamsFrame,
StartFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import MetricsData
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class VADTurnAnalyzerUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""User turn stop strategy that uses a turn analyzer without waiting for transcriptions.
This strategy feeds audio and VAD frames to a turn detection model
(``BaseTurnAnalyzer``) and triggers immediately when the model indicates
the turn is complete. Unlike ``TurnAnalyzerUserTurnStopStrategy``, it does
not wait for STT transcriptions, making it ideal for speech-to-speech
pipelines (e.g. Gemini Live) where audio goes directly to the LLM.
The ``UserTurnController`` provides a safety-net timeout
(``user_turn_stop_timeout``, default 5s) if the turn analyzer never
returns COMPLETE.
"""
def __init__(self, *, turn_analyzer: BaseTurnAnalyzer, **kwargs):
"""Initialize the user turn stop strategy.
Args:
turn_analyzer: The turn detection analyzer instance to detect end of user turn.
**kwargs: Additional keyword arguments.
"""
super().__init__(**kwargs)
self._turn_analyzer = turn_analyzer
self._vad_user_speaking = False
async def reset(self):
"""Reset the strategy to its initial state."""
await super().reset()
self._vad_user_speaking = False
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
await super().setup(task_manager)
async def cleanup(self):
"""Cleanup the strategy."""
await super().cleanup()
await self._turn_analyzer.cleanup()
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to update the turn analyzer and strategy state.
Args:
frame: The frame to be analyzed.
Returns:
Always returns CONTINUE so subsequent stop strategies are evaluated.
"""
await super().process_frame(frame)
if isinstance(frame, StartFrame):
await self._start(frame)
elif isinstance(frame, VADUserStartedSpeakingFrame):
await self._handle_vad_user_started_speaking(frame)
elif isinstance(frame, VADUserStoppedSpeakingFrame):
await self._handle_vad_user_stopped_speaking(frame)
elif isinstance(frame, InputAudioRawFrame):
await self._handle_input_audio(frame)
return ProcessFrameResult.CONTINUE
async def _start(self, frame: StartFrame):
"""Process the start frame to configure the turn analyzer."""
self._turn_analyzer.set_sample_rate(frame.audio_in_sample_rate)
await self.broadcast_frame(SpeechControlParamsFrame, turn_params=self._turn_analyzer.params)
async def _handle_input_audio(self, frame: InputAudioRawFrame):
"""Handle input audio to check if the turn is completed."""
state = self._turn_analyzer.append_audio(frame.audio, self._vad_user_speaking)
# Streaming analyzers (e.g. KrispVivaTurn) detect turn completion
# frame-by-frame inside append_audio, so COMPLETE is returned here.
if state == EndOfTurnState.COMPLETE:
_, prediction = await self._turn_analyzer.analyze_end_of_turn()
await self._handle_prediction_result(prediction)
await self.trigger_user_turn_stopped()
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
"""Handle when the VAD indicates the user is speaking."""
self._turn_analyzer.update_vad_start_secs(frame.start_secs)
self._vad_user_speaking = True
async def _handle_vad_user_stopped_speaking(self, frame: VADUserStoppedSpeakingFrame):
"""Handle when the VAD indicates the user has stopped speaking."""
self._vad_user_speaking = False
state, prediction = await self._turn_analyzer.analyze_end_of_turn()
await self._handle_prediction_result(prediction)
if state == EndOfTurnState.COMPLETE:
await self.trigger_user_turn_stopped()
async def _handle_prediction_result(self, result: MetricsData | None):
"""Handle a prediction result event from the turn analyzer."""
if result:
await self.push_frame(MetricsFrame(data=[result]))

View File

@@ -0,0 +1,43 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User turn stop strategy based on VAD events only.
This strategy triggers the end of a user turn immediately when VAD indicates
the user has stopped speaking. It does not use a turn analyzer or
transcriptions, making it suitable for realtime speech-to-speech pipelines
that rely solely on VAD for turn detection.
"""
from pipecat.frames.frames import Frame, VADUserStoppedSpeakingFrame
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.base_user_turn_stop_strategy import BaseUserTurnStopStrategy
class VADUserTurnStopStrategy(BaseUserTurnStopStrategy):
"""User turn stop strategy based on VAD (Voice Activity Detection).
This strategy triggers the end of a user turn as soon as a VAD frame
indicates the user has stopped speaking. It is intended for realtime
speech-to-speech pipelines where neither a turn analyzer nor STT
transcriptions are used to decide end of turn.
"""
async def process_frame(self, frame: Frame) -> ProcessFrameResult:
"""Process an incoming frame to detect user turn stop.
Args:
frame: The frame to be analyzed.
Returns:
Always returns CONTINUE so subsequent stop strategies are evaluated.
"""
await super().process_frame(frame)
if isinstance(frame, VADUserStoppedSpeakingFrame):
await self.trigger_user_turn_stopped()
return ProcessFrameResult.CONTINUE

View File

@@ -0,0 +1,213 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Tests for VADTurnAnalyzerUserTurnStopStrategy."""
import unittest
from unittest.mock import AsyncMock
import pytest
from pipecat.audio.turn.base_turn_analyzer import (
BaseTurnAnalyzer,
BaseTurnParams,
EndOfTurnState,
)
from pipecat.frames.frames import (
InputAudioRawFrame,
MetricsFrame,
SpeechControlParamsFrame,
StartFrame,
TranscriptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.metrics.metrics import MetricsData
from pipecat.turns.user_stop.vad_turn_analyzer_user_turn_stop_strategy import (
VADTurnAnalyzerUserTurnStopStrategy,
)
from pipecat.utils.asyncio.task_manager import TaskManager
class MockTurnAnalyzer(BaseTurnAnalyzer):
"""Mock turn analyzer for testing."""
def __init__(self):
super().__init__()
self._append_audio_state = EndOfTurnState.INCOMPLETE
self._analyze_state = EndOfTurnState.INCOMPLETE
self._prediction: MetricsData | None = None
self._speech_triggered = False
@property
def speech_triggered(self) -> bool:
return self._speech_triggered
@property
def params(self) -> BaseTurnParams:
return BaseTurnParams()
def append_audio(self, buffer: bytes, is_speech: bool) -> EndOfTurnState:
return self._append_audio_state
async def analyze_end_of_turn(self) -> tuple[EndOfTurnState, MetricsData | None]:
return self._analyze_state, self._prediction
def update_vad_start_secs(self, vad_start_secs: float):
pass
def clear(self):
pass
class TestVADTurnAnalyzerUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.analyzer = MockTurnAnalyzer()
self.strategy = VADTurnAnalyzerUserTurnStopStrategy(turn_analyzer=self.analyzer)
self.task_manager = TaskManager()
await self.strategy.setup(self.task_manager)
self.turn_stopped_called = False
@self.strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
self.turn_stopped_called = True
self.pushed_frames = []
@self.strategy.event_handler("on_push_frame")
async def on_push_frame(strategy, frame, direction):
self.pushed_frames.append(frame)
self.broadcast_frames = []
@self.strategy.event_handler("on_broadcast_frame")
async def on_broadcast_frame(strategy, frame_cls, **kwargs):
self.broadcast_frames.append((frame_cls, kwargs))
async def asyncTearDown(self):
await self.strategy.cleanup()
async def test_vad_stop_complete_triggers_immediately(self):
"""VAD stop with COMPLETE should trigger user turn stopped immediately."""
self.analyzer._analyze_state = EndOfTurnState.COMPLETE
await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3))
assert self.turn_stopped_called
async def test_vad_stop_incomplete_does_not_trigger(self):
"""VAD stop with INCOMPLETE should not trigger user turn stopped."""
self.analyzer._analyze_state = EndOfTurnState.INCOMPLETE
await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3))
assert not self.turn_stopped_called
async def test_streaming_complete_via_append_audio_triggers(self):
"""Streaming COMPLETE from append_audio should trigger immediately."""
self.analyzer._append_audio_state = EndOfTurnState.COMPLETE
# analyze_end_of_turn is called after append_audio returns COMPLETE
self.analyzer._analyze_state = EndOfTurnState.COMPLETE
await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
await self.strategy.process_frame(
InputAudioRawFrame(audio=b"\x00" * 320, sample_rate=16000, num_channels=1)
)
assert self.turn_stopped_called
async def test_streaming_incomplete_does_not_trigger(self):
"""Streaming INCOMPLETE from append_audio should not trigger."""
self.analyzer._append_audio_state = EndOfTurnState.INCOMPLETE
await self.strategy.process_frame(
InputAudioRawFrame(audio=b"\x00" * 320, sample_rate=16000, num_channels=1)
)
assert not self.turn_stopped_called
async def test_vad_start_resets_speaking_state(self):
"""VAD start should set speaking state to True."""
assert not self.strategy._vad_user_speaking
await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
assert self.strategy._vad_user_speaking
async def test_vad_stop_resets_speaking_state(self):
"""VAD stop should set speaking state to False."""
self.analyzer._analyze_state = EndOfTurnState.INCOMPLETE
await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
assert self.strategy._vad_user_speaking
await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3))
assert not self.strategy._vad_user_speaking
async def test_reset_clears_state(self):
"""Reset should clear speaking state."""
self.strategy._vad_user_speaking = True
await self.strategy.reset()
assert not self.strategy._vad_user_speaking
async def test_metrics_frame_pushed_on_prediction(self):
"""MetricsFrame should be pushed when prediction result is available."""
prediction = MetricsData(processor="test_analyzer")
self.analyzer._analyze_state = EndOfTurnState.COMPLETE
self.analyzer._prediction = prediction
await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3))
assert len(self.pushed_frames) == 1
assert isinstance(self.pushed_frames[0], MetricsFrame)
assert self.pushed_frames[0].data == [prediction]
async def test_no_metrics_frame_when_no_prediction(self):
"""No MetricsFrame should be pushed when prediction is None."""
self.analyzer._analyze_state = EndOfTurnState.COMPLETE
self.analyzer._prediction = None
await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3))
assert len(self.pushed_frames) == 0
async def test_speech_control_params_broadcast_on_start(self):
"""SpeechControlParamsFrame should be broadcast on StartFrame."""
await self.strategy.process_frame(
StartFrame(audio_in_sample_rate=16000, audio_out_sample_rate=16000)
)
assert len(self.broadcast_frames) == 1
frame_cls, kwargs = self.broadcast_frames[0]
assert frame_cls is SpeechControlParamsFrame
async def test_transcription_frames_ignored(self):
"""TranscriptionFrame should not affect state or trigger turn stop."""
self.analyzer._analyze_state = EndOfTurnState.INCOMPLETE
await self.strategy.process_frame(
TranscriptionFrame(text="hello", user_id="user1", timestamp="now")
)
assert not self.turn_stopped_called
async def test_process_frame_returns_continue(self):
"""process_frame should always return CONTINUE."""
from pipecat.turns.types import ProcessFrameResult
result = await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
assert result == ProcessFrameResult.CONTINUE
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,79 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Tests for VADUserTurnStopStrategy."""
import unittest
from pipecat.frames.frames import (
InputAudioRawFrame,
TranscriptionFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.types import ProcessFrameResult
from pipecat.turns.user_stop.vad_user_turn_stop_strategy import VADUserTurnStopStrategy
from pipecat.utils.asyncio.task_manager import TaskManager
class TestVADUserTurnStopStrategy(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.strategy = VADUserTurnStopStrategy()
self.task_manager = TaskManager()
await self.strategy.setup(self.task_manager)
self.turn_stopped_called = False
@self.strategy.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(strategy, params):
self.turn_stopped_called = True
async def asyncTearDown(self):
await self.strategy.cleanup()
async def test_vad_stop_triggers_user_turn_stopped(self):
"""VADUserStoppedSpeakingFrame should trigger user turn stopped."""
await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3))
assert self.turn_stopped_called
async def test_vad_start_does_not_trigger(self):
"""VADUserStartedSpeakingFrame should not trigger user turn stopped."""
await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
assert not self.turn_stopped_called
async def test_transcription_frames_ignored(self):
"""TranscriptionFrame should not trigger user turn stopped."""
await self.strategy.process_frame(
TranscriptionFrame(text="hello", user_id="user1", timestamp="now")
)
assert not self.turn_stopped_called
async def test_input_audio_frames_ignored(self):
"""InputAudioRawFrame should not trigger user turn stopped."""
await self.strategy.process_frame(
InputAudioRawFrame(audio=b"\x00" * 320, sample_rate=16000, num_channels=1)
)
assert not self.turn_stopped_called
async def test_process_frame_returns_continue(self):
"""process_frame should always return CONTINUE."""
result = await self.strategy.process_frame(VADUserStoppedSpeakingFrame(stop_secs=0.3))
assert result == ProcessFrameResult.CONTINUE
result = await self.strategy.process_frame(VADUserStartedSpeakingFrame(start_secs=0.2))
assert result == ProcessFrameResult.CONTINUE
async def test_reset_does_not_crash(self):
"""reset() should complete without error."""
await self.strategy.reset()
if __name__ == "__main__":
unittest.main()