Files
pipecat/tests/test_user_turn_processor.py
Mark Backman 34b068d657 Improve user turn stop timing by triggering timeout from VAD stop
Refactor TranscriptionUserTurnStopStrategy and TurnAnalyzerUserTurnStopStrategy
to use VADUserStoppedSpeakingFrame as the ground truth for when speech ended,
rather than triggering timeouts from transcription frames.
2026-02-09 14:12:33 -05:00

165 lines
5.4 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.frames.frames import (
InterruptionFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.tests.utils import SleepFrame, run_test
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
USER_TURN_STOP_TIMEOUT = 0.2
TRANSCRIPTION_TIMEOUT = 0.1
class TestUserTurnProcessor(unittest.IsolatedAsyncioTestCase):
async def test_default_user_turn_strategies(self):
user_turn_processor = UserTurnProcessor(
user_turn_strategies=UserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
)
)
should_start = None
should_stop = None
@user_turn_processor.event_handler("on_user_turn_started")
async def on_user_turn_started(processor, strategy):
nonlocal should_start
should_start = True
@user_turn_processor.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(processor, strategy):
nonlocal should_stop
should_stop = True
pipeline = Pipeline([user_turn_processor])
frames_to_send = [
VADUserStartedSpeakingFrame(),
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
# Wait for user_speech_timeout to elapse
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.1),
]
expected_down_frames = [
VADUserStartedSpeakingFrame,
UserStartedSpeakingFrame,
InterruptionFrame,
TranscriptionFrame,
VADUserStoppedSpeakingFrame,
UserStoppedSpeakingFrame,
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
self.assertTrue(should_start)
self.assertTrue(should_stop)
async def test_user_turn_stop_timeout_no_transcription(self):
user_turn_processor = UserTurnProcessor(
user_turn_strategies=UserTurnStrategies(),
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
)
should_start = None
should_stop = None
timeout = None
@user_turn_processor.event_handler("on_user_turn_started")
async def on_user_turn_started(processor, strategy):
nonlocal should_start
should_start = True
@user_turn_processor.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(processor, strategy):
nonlocal should_stop
should_stop = True
@user_turn_processor.event_handler("on_user_turn_stop_timeout")
async def on_user_turn_stop_timeout(processor):
nonlocal timeout
timeout = True
pipeline = Pipeline([user_turn_processor])
frames_to_send = [
VADUserStartedSpeakingFrame(),
VADUserStoppedSpeakingFrame(),
SleepFrame(sleep=USER_TURN_STOP_TIMEOUT + 0.1),
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
)
self.assertTrue(should_start)
self.assertTrue(should_stop)
self.assertTrue(timeout)
async def test_user_turn_stop_timeout_transcription(self):
user_turn_processor = UserTurnProcessor(
user_turn_strategies=UserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
),
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
)
should_start = None
should_stop = None
timeout = None
@user_turn_processor.event_handler("on_user_turn_started")
async def on_user_turn_started(processor, strategy):
nonlocal should_start
should_start = True
@user_turn_processor.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(processor, strategy):
nonlocal should_stop
should_stop = True
@user_turn_processor.event_handler("on_user_turn_stop_timeout")
async def on_user_turn_stop_timeout(processor):
nonlocal timeout
timeout = True
pipeline = Pipeline([user_turn_processor])
# Transcript arrives before VAD stop, then we wait for user_speech_timeout
frames_to_send = [
VADUserStartedSpeakingFrame(),
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
VADUserStoppedSpeakingFrame(),
# Wait for user_speech_timeout (TRANSCRIPTION_TIMEOUT=0.1s) to elapse
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT + 0.05),
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
)
# The transcription strategy should kick-in before the user turn end timeout.
self.assertTrue(should_start)
self.assertTrue(should_stop)
self.assertFalse(timeout)
if __name__ == "__main__":
unittest.main()