195 lines
6.7 KiB
Python
195 lines
6.7 KiB
Python
#
|
|
# Copyright (c) 2024-2026 Daily
|
|
#
|
|
# SPDX-License-Identifier: BSD 2-Clause License
|
|
#
|
|
|
|
import asyncio
|
|
import unittest
|
|
|
|
from pipecat.frames.frames import (
|
|
BotStartedSpeakingFrame,
|
|
TranscriptionFrame,
|
|
UserStartedSpeakingFrame,
|
|
UserStoppedSpeakingFrame,
|
|
VADUserStartedSpeakingFrame,
|
|
VADUserStoppedSpeakingFrame,
|
|
)
|
|
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
|
|
MinWordsUserTurnStartStrategy,
|
|
)
|
|
from pipecat.turns.user_turn_controller import UserTurnController
|
|
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies
|
|
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
|
|
|
|
USER_TURN_STOP_TIMEOUT = 0.2
|
|
|
|
|
|
class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
|
|
async def asyncSetUp(self):
|
|
self.task_manager = TaskManager()
|
|
self.task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop()))
|
|
|
|
async def test_default_user_turn_strategies(self):
|
|
controller = UserTurnController(user_turn_strategies=UserTurnStrategies())
|
|
|
|
await controller.setup(self.task_manager)
|
|
|
|
should_start = None
|
|
should_stop = None
|
|
|
|
@controller.event_handler("on_user_turn_started")
|
|
async def on_user_turn_started(controller, strategy, params):
|
|
nonlocal should_start
|
|
should_start = True
|
|
|
|
@controller.event_handler("on_user_turn_stopped")
|
|
async def on_user_turn_stopped(controller, strategy, params):
|
|
nonlocal should_stop
|
|
should_stop = True
|
|
|
|
await controller.process_frame(VADUserStartedSpeakingFrame())
|
|
self.assertTrue(should_start)
|
|
self.assertFalse(should_stop)
|
|
|
|
await controller.process_frame(
|
|
TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
|
|
)
|
|
self.assertTrue(should_start)
|
|
self.assertFalse(should_stop)
|
|
|
|
await controller.process_frame(VADUserStoppedSpeakingFrame())
|
|
self.assertTrue(should_start)
|
|
self.assertTrue(should_stop)
|
|
|
|
async def test_user_turn_start_reset(self):
|
|
controller = UserTurnController(
|
|
user_turn_strategies=UserTurnStrategies(
|
|
start=[MinWordsUserTurnStartStrategy(min_words=3)]
|
|
),
|
|
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
|
|
)
|
|
|
|
await controller.setup(self.task_manager)
|
|
|
|
should_start = 0
|
|
|
|
@controller.event_handler("on_user_turn_started")
|
|
async def on_user_turn_started(controller, strategy, params):
|
|
nonlocal should_start
|
|
should_start += 1
|
|
|
|
await controller.process_frame(BotStartedSpeakingFrame())
|
|
await controller.process_frame(TranscriptionFrame(text="One", user_id="cat", timestamp=""))
|
|
self.assertEqual(should_start, 0)
|
|
|
|
await controller.process_frame(
|
|
TranscriptionFrame(text=" two three!", user_id="cat", timestamp="")
|
|
)
|
|
self.assertEqual(should_start, 1)
|
|
|
|
# Trigger user stop turn so we can trigger user start turn again.
|
|
await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
|
|
|
|
await controller.process_frame(BotStartedSpeakingFrame())
|
|
await controller.process_frame(
|
|
TranscriptionFrame(text="Hello", user_id="cat", timestamp="")
|
|
)
|
|
self.assertEqual(should_start, 1)
|
|
|
|
await controller.process_frame(
|
|
TranscriptionFrame(text=" there friend!", user_id="cat", timestamp="")
|
|
)
|
|
self.assertEqual(should_start, 2)
|
|
|
|
async def test_user_turn_stop_timeout_no_transcription(self):
|
|
controller = UserTurnController(
|
|
user_turn_strategies=UserTurnStrategies(),
|
|
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
|
|
)
|
|
|
|
await controller.setup(self.task_manager)
|
|
|
|
should_start = None
|
|
should_stop = None
|
|
timeout = None
|
|
|
|
@controller.event_handler("on_user_turn_started")
|
|
async def on_user_turn_started(controller, strategy, params):
|
|
nonlocal should_start
|
|
should_start = True
|
|
|
|
@controller.event_handler("on_user_turn_stopped")
|
|
async def on_user_turn_stopped(controller, strategy, params):
|
|
nonlocal should_stop
|
|
should_stop = True
|
|
|
|
@controller.event_handler("on_user_turn_stop_timeout")
|
|
async def on_user_turn_stop_timeout(controller):
|
|
nonlocal timeout
|
|
timeout = True
|
|
|
|
await controller.process_frame(VADUserStartedSpeakingFrame())
|
|
self.assertTrue(should_start)
|
|
self.assertFalse(should_stop)
|
|
self.assertFalse(timeout)
|
|
|
|
await controller.process_frame(VADUserStoppedSpeakingFrame())
|
|
self.assertTrue(should_start)
|
|
self.assertFalse(should_stop)
|
|
|
|
await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
|
|
self.assertTrue(should_start)
|
|
self.assertTrue(should_stop)
|
|
self.assertTrue(timeout)
|
|
|
|
async def test_external_user_turn_strategies_no_timeout_while_speaking(self):
|
|
"""Test that timeout does not trigger when user is still speaking with external strategies."""
|
|
controller = UserTurnController(
|
|
user_turn_strategies=ExternalUserTurnStrategies(),
|
|
user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
|
|
)
|
|
|
|
await controller.setup(self.task_manager)
|
|
|
|
should_start = None
|
|
should_stop = None
|
|
timeout = None
|
|
|
|
@controller.event_handler("on_user_turn_started")
|
|
async def on_user_turn_started(controller, strategy, params):
|
|
nonlocal should_start
|
|
should_start = True
|
|
|
|
@controller.event_handler("on_user_turn_stopped")
|
|
async def on_user_turn_stopped(controller, strategy, params):
|
|
nonlocal should_stop
|
|
should_stop = True
|
|
|
|
@controller.event_handler("on_user_turn_stop_timeout")
|
|
async def on_user_turn_stop_timeout(controller):
|
|
nonlocal timeout
|
|
timeout = True
|
|
|
|
# Simulate external service (like Deepgram Flux) broadcasting UserStartedSpeakingFrame
|
|
await controller.process_frame(UserStartedSpeakingFrame())
|
|
self.assertTrue(should_start)
|
|
self.assertFalse(should_stop)
|
|
self.assertFalse(timeout)
|
|
|
|
# User is still speaking, timeout should not trigger
|
|
await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
|
|
self.assertTrue(should_start)
|
|
self.assertFalse(should_stop)
|
|
self.assertFalse(timeout)
|
|
|
|
# Now external service broadcasts UserStoppedSpeakingFrame
|
|
await controller.process_frame(UserStoppedSpeakingFrame())
|
|
|
|
# But no transcription, so timeout should trigger
|
|
await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
|
|
|
|
self.assertTrue(should_start)
|
|
self.assertTrue(should_stop)
|
|
self.assertTrue(timeout)
|