Files
pipecat/tests/test_user_turn_controller.py
Aleix Conchillo Flaqué 480eca42f5 Split user-turn-stop into inference-triggered and finalized events
Fixes a real bug: with `filter_incomplete_user_turns` enabled, the
smart-turn detector's tentative stop was firing `on_user_turn_stopped`
before the LLM had a chance to veto it. Observers, transcript
appenders and UI indicators received an early — and sometimes
duplicated — signal.

Decomposes the single stop concern into two events:
- `on_user_turn_inference_triggered` fires when a stop strategy has
  enough signal to start LLM inference. The aggregator pushes the
  context here, kicking off the LLM call.
- `on_user_turn_stopped` fires only when the user turn is semantically
  final. Built-in strategies fire both events at the same call site,
  preserving today's behavior for the common case.

Adds `LLMTurnCompletionUserTurnStopStrategy`, which gates
finalization on a `UserTurnCompletedFrame` (a fieldless system frame
emitted by any component judging turn completeness — currently the
`UserTurnCompletionLLMServiceMixin` used by LLM services).

Adds `deferred(strategy)` / `DeferredUserTurnStopStrategy`, a thin
wrapper that forwards an inner strategy's events except
`on_user_turn_stopped`. Use this to install a stop strategy as an
inference trigger only, leaving finalization to a peer (e.g. the LLM
completion strategy).

Adds `llm_completion_user_turn_stop_strategies()` for the common
case:

    UserTurnStrategies(
        stop=llm_completion_user_turn_stop_strategies(),
    )

Deprecates `LLMUserAggregatorParams.filter_incomplete_user_turns`.
The aggregator emits a `DeprecationWarning`, wraps existing stop
strategies with `deferred(...)`, and appends
`LLMTurnCompletionUserTurnStopStrategy` automatically.
2026-05-07 17:46:09 -07:00

333 lines
12 KiB
Python

#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import unittest
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_start.min_words_user_turn_start_strategy import (
MinWordsUserTurnStartStrategy,
)
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy, deferred
from pipecat.turns.user_turn_controller import UserTurnController
from pipecat.turns.user_turn_strategies import ExternalUserTurnStrategies, UserTurnStrategies
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
# Seconds; handed to `UserTurnController(user_turn_stop_timeout=...)` by the
# tests that rely on the controller's stop timeout to end a turn.
USER_TURN_STOP_TIMEOUT: float = 0.2

# Seconds; handed to `SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=...)`.
TRANSCRIPTION_TIMEOUT: float = 0.1
class TestUserTurnController(unittest.IsolatedAsyncioTestCase):
    """Event-sequencing tests for `UserTurnController` start/stop strategies."""

    async def asyncSetUp(self):
        self.task_manager = TaskManager()
        self.task_manager.setup(TaskManagerParams(loop=asyncio.get_running_loop()))

    async def test_default_user_turn_strategies(self):
        """A speech-timeout stop strategy ends the turn once its timeout elapses."""
        controller = UserTurnController(
            user_turn_strategies=UserTurnStrategies(
                stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
            )
        )
        await controller.setup(self.task_manager)

        should_start = False
        should_stop = False

        @controller.event_handler("on_user_turn_started")
        async def on_user_turn_started(controller, strategy, params):
            nonlocal should_start
            should_start = True

        @controller.event_handler("on_user_turn_stopped")
        async def on_user_turn_stopped(controller, strategy, params):
            nonlocal should_stop
            should_stop = True

        await controller.process_frame(VADUserStartedSpeakingFrame())
        self.assertTrue(should_start)
        self.assertFalse(should_stop)

        await controller.process_frame(
            TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
        )
        self.assertTrue(should_start)
        self.assertFalse(should_stop)

        await controller.process_frame(VADUserStoppedSpeakingFrame())
        self.assertTrue(should_start)

        # Wait for user_speech_timeout to elapse
        await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
        self.assertTrue(should_stop)

    async def test_inference_triggered_fires_alongside_stopped(self):
        """Default strategies fire both inference-triggered and stopped, in order."""
        controller = UserTurnController(
            user_turn_strategies=UserTurnStrategies(
                stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
            )
        )
        await controller.setup(self.task_manager)

        events: list[str] = []

        @controller.event_handler("on_user_turn_inference_triggered")
        async def on_user_turn_inference_triggered(controller, strategy):
            events.append("inference_triggered")

        @controller.event_handler("on_user_turn_stopped")
        async def on_user_turn_stopped(controller, strategy, params):
            events.append("stopped")

        await controller.process_frame(VADUserStartedSpeakingFrame())
        await controller.process_frame(
            TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
        )
        await controller.process_frame(VADUserStoppedSpeakingFrame())
        await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)

        self.assertEqual(events, ["inference_triggered", "stopped"])

    async def test_deferred_wrapper_skips_stopped(self):
        """A deferred() wrapper drops the inner strategy's on_user_turn_stopped event."""
        wrapped = deferred(
            SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)
        )
        # Pin the controller's stop timeout far beyond the observation window
        # below. That way, a later timeout-driven finalization cannot land inside
        # the window and be mistaken for (or mask) a `stopped` leaked by the
        # wrapper. It also stops the test from depending on the default value.
        stop_timeout = 1.0
        controller = UserTurnController(
            user_turn_strategies=UserTurnStrategies(stop=[wrapped]),
            user_turn_stop_timeout=stop_timeout,
        )
        await controller.setup(self.task_manager)

        events: list[str] = []

        @controller.event_handler("on_user_turn_inference_triggered")
        async def on_user_turn_inference_triggered(controller, strategy):
            events.append("inference_triggered")

        @controller.event_handler("on_user_turn_stopped")
        async def on_user_turn_stopped(controller, strategy, params):
            events.append("stopped")

        await controller.process_frame(VADUserStartedSpeakingFrame())
        await controller.process_frame(
            TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
        )
        await controller.process_frame(VADUserStoppedSpeakingFrame())
        await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)

        # The wrapper forwards the inner strategy's inference trigger, but
        # suppresses its finalization. Checking the whole list, rather than just
        # the first element, is what actually verifies the suppression. Any
        # later finalization by the controller's stop timeout is outside this
        # window.
        self.assertEqual(events, ["inference_triggered"])

    async def test_user_turn_start_reset(self):
        """The min-words start strategy's word count does not carry over between turns.

        With the bot speaking, `MinWordsUserTurnStartStrategy(min_words=3)` starts
        a turn only once three words have arrived. After the first turn ends, a
        fresh turn again needs three new words.
        """
        controller = UserTurnController(
            user_turn_strategies=UserTurnStrategies(
                start=[MinWordsUserTurnStartStrategy(min_words=3)]
            ),
            user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
        )
        await controller.setup(self.task_manager)

        should_start = 0

        @controller.event_handler("on_user_turn_started")
        async def on_user_turn_started(controller, strategy, params):
            nonlocal should_start
            should_start += 1

        await controller.process_frame(BotStartedSpeakingFrame())
        await controller.process_frame(TranscriptionFrame(text="One", user_id="cat", timestamp=""))
        self.assertEqual(should_start, 0)

        await controller.process_frame(
            TranscriptionFrame(text="One two three!", user_id="cat", timestamp="")
        )
        self.assertEqual(should_start, 1)

        # Trigger user stop turn so we can trigger user start turn again.
        await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)

        await controller.process_frame(BotStartedSpeakingFrame())
        await controller.process_frame(TranscriptionFrame(text="Hi!", user_id="cat", timestamp=""))
        self.assertEqual(should_start, 1)

        await controller.process_frame(
            TranscriptionFrame(text="How are you?", user_id="cat", timestamp="")
        )
        self.assertEqual(should_start, 2)

    async def test_user_turn_stop_timeout_no_transcription(self):
        """Without any transcription, the stop timeout ends the turn and is reported."""
        controller = UserTurnController(
            user_turn_strategies=UserTurnStrategies(),
            user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
        )
        await controller.setup(self.task_manager)

        should_start = False
        should_stop = False
        timeout = False

        @controller.event_handler("on_user_turn_started")
        async def on_user_turn_started(controller, strategy, params):
            nonlocal should_start
            should_start = True

        @controller.event_handler("on_user_turn_stopped")
        async def on_user_turn_stopped(controller, strategy, params):
            nonlocal should_stop
            should_stop = True

        @controller.event_handler("on_user_turn_stop_timeout")
        async def on_user_turn_stop_timeout(controller):
            nonlocal timeout
            timeout = True

        await controller.process_frame(VADUserStartedSpeakingFrame())
        self.assertTrue(should_start)
        self.assertFalse(should_stop)
        self.assertFalse(timeout)

        await controller.process_frame(VADUserStoppedSpeakingFrame())
        self.assertTrue(should_start)
        self.assertFalse(should_stop)

        await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
        self.assertTrue(should_start)
        self.assertTrue(should_stop)
        self.assertTrue(timeout)

    async def test_external_user_turn_strategies_no_timeout_while_speaking(self):
        """Test that timeout does not trigger when user is still speaking with external strategies."""
        controller = UserTurnController(
            user_turn_strategies=ExternalUserTurnStrategies(),
            user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
        )
        await controller.setup(self.task_manager)

        should_start = False
        should_stop = False
        timeout = False

        @controller.event_handler("on_user_turn_started")
        async def on_user_turn_started(controller, strategy, params):
            nonlocal should_start
            should_start = True

        @controller.event_handler("on_user_turn_stopped")
        async def on_user_turn_stopped(controller, strategy, params):
            nonlocal should_stop
            should_stop = True

        @controller.event_handler("on_user_turn_stop_timeout")
        async def on_user_turn_stop_timeout(controller):
            nonlocal timeout
            timeout = True

        # Simulate external service (like Deepgram Flux) broadcasting UserStartedSpeakingFrame
        await controller.process_frame(UserStartedSpeakingFrame())
        self.assertTrue(should_start)
        self.assertFalse(should_stop)
        self.assertFalse(timeout)

        # User is still speaking, timeout should not trigger
        await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
        self.assertTrue(should_start)
        self.assertFalse(should_stop)
        self.assertFalse(timeout)

        # Now external service broadcasts UserStoppedSpeakingFrame
        await controller.process_frame(UserStoppedSpeakingFrame())

        # But no transcription, so timeout should trigger
        await asyncio.sleep(USER_TURN_STOP_TIMEOUT + 0.1)
        self.assertTrue(should_start)
        self.assertTrue(should_stop)
        self.assertTrue(timeout)

    async def test_late_transcription_between_turns_no_premature_stop(self):
        """Test that a late transcription arriving between turns does not cause a premature stop.

        Reproduces the bug from issue #4053. After turn 1 completes and reset()
        clears state, a late TranscriptionFrame sets _text to stale content. On
        the next turn, that stale _text gates a premature turn stop via
        timeout(0) before the current turn's transcript arrives.

        Uses only VADUserTurnStartStrategy (no TranscriptionUserTurnStartStrategy)
        so the late transcription doesn't trigger a spurious turn start.
        """
        controller = UserTurnController(
            user_turn_strategies=UserTurnStrategies(
                start=[VADUserTurnStartStrategy()],
                stop=[SpeechTimeoutUserTurnStopStrategy(user_speech_timeout=TRANSCRIPTION_TIMEOUT)],
            ),
            user_turn_stop_timeout=USER_TURN_STOP_TIMEOUT,
        )
        await controller.setup(self.task_manager)

        start_count = 0
        stop_count = 0

        @controller.event_handler("on_user_turn_started")
        async def on_user_turn_started(controller, strategy, params):
            nonlocal start_count
            start_count += 1

        @controller.event_handler("on_user_turn_stopped")
        async def on_user_turn_stopped(controller, strategy, params):
            nonlocal stop_count
            stop_count += 1

        # === Turn 1: S-T-E ===
        await controller.process_frame(VADUserStartedSpeakingFrame())
        self.assertEqual(start_count, 1)
        await controller.process_frame(
            TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
        )
        await controller.process_frame(VADUserStoppedSpeakingFrame())
        await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
        self.assertEqual(stop_count, 1)

        # === Between turns: late transcription arrives ===
        # This sets _text on the stop strategy while _user_turn is False.
        await controller.process_frame(
            TranscriptionFrame(text="Hello!", user_id="", timestamp="now")
        )
        # Only VAD starts turns here, so the late transcription must not open a
        # turn, and with no active turn there is nothing to stop. Check this now:
        # a spurious start would otherwise be hidden by the VAD start below,
        # which would leave start_count at 2 either way.
        self.assertEqual(start_count, 1)
        self.assertEqual(stop_count, 1)

        # === Turn 2: S-T-E (transcription arrives during turn) ===
        # The fix resets stop strategies at turn start, clearing stale _text.
        await controller.process_frame(VADUserStartedSpeakingFrame())
        self.assertEqual(start_count, 2)
        # Stale text from the late transcription must not end the new turn.
        self.assertEqual(stop_count, 1)

        await controller.process_frame(
            TranscriptionFrame(text="How are you?", user_id="", timestamp="now")
        )
        # The user is still speaking, so the turn must still be open.
        self.assertEqual(stop_count, 1)

        await controller.process_frame(VADUserStoppedSpeakingFrame())

        # Wait for user_speech_timeout to elapse — should get turn 2 stop
        await asyncio.sleep(TRANSCRIPTION_TIMEOUT + 0.1)
        self.assertEqual(stop_count, 2)
if __name__ == "__main__":
    # Allow running this module directly with the standard unittest runner.
    unittest.main()