# # Copyright (c) 2024-2026 Daily # # SPDX-License-Identifier: BSD 2-Clause License # import asyncio import unittest from pipecat.frames.frames import ( TranscriptionFrame, VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) from pipecat.turns.user_turn_controller import UserTurnController from pipecat.turns.user_turn_strategies import UserTurnStrategies from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams USER_TURN_STOP_TIMEOUT = 0.2 class TestUserTurnController(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): self.task_manager = TaskManager() self.task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop())) async def test_default_user_turn_strategies(self): controller = UserTurnController(user_turn_strategies=UserTurnStrategies()) await controller.setup(self.task_manager) should_start = None should_stop = None @controller.event_handler("on_user_turn_started") async def on_user_turn_started(controller, strategy, params): nonlocal should_start should_start = True @controller.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(controller, strategy, params): nonlocal should_stop should_stop = True await controller.process_frame(VADUserStartedSpeakingFrame()) self.assertTrue(should_start) self.assertFalse(should_stop) await controller.process_frame( TranscriptionFrame(text="Hello!", user_id="", timestamp="now") ) self.assertTrue(should_start) self.assertFalse(should_stop) await controller.process_frame(VADUserStoppedSpeakingFrame()) self.assertTrue(should_start) self.assertTrue(should_stop) async def test_user_turn_stop_timeout_no_transcription(self): controller = UserTurnController( user_turn_strategies=UserTurnStrategies(), user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT, ) await controller.setup(self.task_manager) should_start = None should_stop = None timeout = None @controller.event_handler("on_user_turn_started") async def on_user_turn_started(controller, strategy, params): nonlocal should_start should_start = True @controller.event_handler("on_user_turn_stopped") async def on_user_turn_stopped(controller, strategy, params): nonlocal should_stop should_stop = True @controller.event_handler("on_user_turn_stop_timeout") async def on_user_turn_stop_timeout(controller): nonlocal timeout timeout = True await controller.process_frame(VADUserStartedSpeakingFrame()) self.assertTrue(should_start) self.assertFalse(should_stop) self.assertFalse(timeout) await controller.process_frame(VADUserStoppedSpeakingFrame()) self.assertTrue(should_start) self.assertFalse(should_stop) await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1) self.assertTrue(should_start) self.assertTrue(should_stop) self.assertTrue(timeout)