Files
pipecat/tests/test_user_turn_processor.py
2026-01-30 10:07:34 -08:00

159 lines
5.1 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import unittest
from pipecat.frames.frames import (
InterruptionFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.tests.utils import SleepFrame, run_test
from pipecat.turns.user_stop import TranscriptionUserTurnStopStrategy
from pipecat.turns.user_turn_processor import UserTurnProcessor
from pipecat.turns.user_turn_strategies import UserTurnStrategies
USER_TURN_STOP_TIMEOUT = 0.2
TRANSCRIPTION_TIMEOUT = 0.1
class TestUserTurnProcessor(unittest.IsolatedAsyncioTestCase):
async def test_default_user_turn_strategies(self):
user_turn_processor = UserTurnProcessor(user_turn_strategies=UserTurnStrategies())
should_start = None
should_stop = None
@user_turn_processor.event_handler("on_user_turn_started")
async def on_user_turn_started(processor, strategy):
nonlocal should_start
should_start = True
@user_turn_processor.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(processor, strategy):
nonlocal should_stop
should_stop = True
pipeline = Pipeline([user_turn_processor])
frames_to_send = [
VADUserStartedSpeakingFrame(),
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
SleepFrame(),
VADUserStoppedSpeakingFrame(),
]
expected_down_frames = [
VADUserStartedSpeakingFrame,
UserStartedSpeakingFrame,
InterruptionFrame,
TranscriptionFrame,
VADUserStoppedSpeakingFrame,
UserStoppedSpeakingFrame,
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
expected_down_frames=expected_down_frames,
)
self.assertTrue(should_start)
self.assertTrue(should_stop)
async def test_user_turn_stop_timeout_no_transcription(self):
user_turn_processor = UserTurnProcessor(
user_turn_strategies=UserTurnStrategies(),
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
)
should_start = None
should_stop = None
timeout = None
@user_turn_processor.event_handler("on_user_turn_started")
async def on_user_turn_started(processor, strategy):
nonlocal should_start
should_start = True
@user_turn_processor.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(processor, strategy):
nonlocal should_stop
should_stop = True
@user_turn_processor.event_handler("on_user_turn_stop_timeout")
async def on_user_turn_stop_timeout(processor):
nonlocal timeout
timeout = True
pipeline = Pipeline([user_turn_processor])
frames_to_send = [
VADUserStartedSpeakingFrame(),
VADUserStoppedSpeakingFrame(),
SleepFrame(sleep=USER_TURN_STOP_TIMEOUT + 0.1),
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
)
self.assertTrue(should_start)
self.assertTrue(should_stop)
self.assertTrue(timeout)
async def test_user_turn_stop_timeout_transcription(self):
user_turn_processor = UserTurnProcessor(
user_turn_strategies=UserTurnStrategies(
stop=[TranscriptionUserTurnStopStrategy(timeout=TRANSCRIPTION_TIMEOUT)],
),
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
)
should_start = None
should_stop = None
timeout = None
@user_turn_processor.event_handler("on_user_turn_started")
async def on_user_turn_started(processor, strategy):
nonlocal should_start
should_start = True
@user_turn_processor.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(processor, strategy):
nonlocal should_stop
should_stop = True
@user_turn_processor.event_handler("on_user_turn_stop_timeout")
async def on_user_turn_stop_timeout(processor):
nonlocal timeout
timeout = True
pipeline = Pipeline([user_turn_processor])
frames_to_send = [
VADUserStartedSpeakingFrame(),
VADUserStoppedSpeakingFrame(),
SleepFrame(sleep=USER_TURN_STOP_TIMEOUT - 0.1),
TranscriptionFrame(text="Hello!", user_id="", timestamp="now"),
SleepFrame(sleep=USER_TURN_STOP_TIMEOUT - 0.1),
SleepFrame(sleep=TRANSCRIPTION_TIMEOUT),
]
await run_test(
pipeline,
frames_to_send=frames_to_send,
)
# The transcription strategy should kick-in before the user turn end timeout.
self.assertTrue(should_start)
self.assertTrue(should_stop)
self.assertFalse(timeout)
if __name__ == "__main__":
unittest.main()