From c821e9f8fd562db4c999c50ae78e03cb9e4ec679 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Sun, 28 Dec 2025 12:45:58 -0800 Subject: [PATCH 1/8] turns: add external user and bot turn start strategies External strategies are strategies where the logic for user turn start and turn end comes from a different processor (e.g. an STT). --- changelog/3045.added.md | 2 + .../07a-interruptible-speechmatics-vad.py | 5 +- .../07c-interruptible-deepgram-flux.py | 4 +- .../07c-interruptible-deepgram-vad.py | 3 +- .../processors/aggregators/llm_response.py | 5 - .../aggregators/llm_response_universal.py | 54 +++++--- src/pipecat/turns/bot/__init__.py | 1 + .../turns/bot/base_bot_turn_start_strategy.py | 13 +- .../bot/external_bot_turn_start_strategy.py | 127 ++++++++++++++++++ src/pipecat/turns/turn_start_strategies.py | 24 ++++ src/pipecat/turns/user/__init__.py | 1 + .../user/base_user_turn_start_strategy.py | 15 ++- .../user/external_user_turn_start_strategy.py | 31 +++++ tests/test_bot_turn_start_strategy.py | 26 ++-- tests/test_user_turn_start_strategy.py | 14 +- 15 files changed, 274 insertions(+), 51 deletions(-) create mode 100644 src/pipecat/turns/bot/external_bot_turn_start_strategy.py create mode 100644 src/pipecat/turns/user/external_user_turn_start_strategy.py diff --git a/changelog/3045.added.md b/changelog/3045.added.md index f1063af1f..beadd0e51 100644 --- a/changelog/3045.added.md +++ b/changelog/3045.added.md @@ -10,10 +10,12 @@ - VADUserTurnStartStrategy - TranscriptionUserTurnStartStrategy - MinWordsUserTurnStartStrategy + - ExternalUserTurnStartStrategy Available bot turn start strategies: - TranscriptionBotTurnStartStrategy - TurnAnalyzerBotTurnStartStrategy + - ExternalBotTurnStartStrategy The default strategies are: diff --git a/examples/foundational/07a-interruptible-speechmatics-vad.py b/examples/foundational/07a-interruptible-speechmatics-vad.py index 48fd93aec..ce88ffe92 100644 --- 
a/examples/foundational/07a-interruptible-speechmatics-vad.py +++ b/examples/foundational/07a-interruptible-speechmatics-vad.py @@ -29,6 +29,7 @@ from pipecat.transcriptions.language import Language from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies load_dotenv(override=True) @@ -132,7 +133,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages) context_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False), + user_params=LLMUserAggregatorParams( + turn_start_strategies=ExternalTurnStartStrategies() + ), ) pipeline = Pipeline( diff --git a/examples/foundational/07c-interruptible-deepgram-flux.py b/examples/foundational/07c-interruptible-deepgram-flux.py index a47140ea2..e0175b335 100644 --- a/examples/foundational/07c-interruptible-deepgram-flux.py +++ b/examples/foundational/07c-interruptible-deepgram-flux.py @@ -27,6 +27,7 @@ from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies load_dotenv(override=True) @@ -71,7 +72,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages) context_aggregator = LLMContextAggregatorPair( - context, user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False) + context, + user_params=LLMUserAggregatorParams(turn_start_strategies=ExternalTurnStartStrategies()), ) pipeline = Pipeline( diff --git 
a/examples/foundational/07c-interruptible-deepgram-vad.py b/examples/foundational/07c-interruptible-deepgram-vad.py index 1e1133a75..cbde4008d 100644 --- a/examples/foundational/07c-interruptible-deepgram-vad.py +++ b/examples/foundational/07c-interruptible-deepgram-vad.py @@ -33,6 +33,7 @@ from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams +from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies load_dotenv(override=True) @@ -78,7 +79,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages) context_aggregator = LLMContextAggregatorPair( context, - user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False), + user_params=LLMUserAggregatorParams(turn_start_strategies=ExternalTurnStartStrategies()), ) pipeline = Pipeline( diff --git a/src/pipecat/processors/aggregators/llm_response.py b/src/pipecat/processors/aggregators/llm_response.py index dcb6f76f9..5dc374567 100644 --- a/src/pipecat/processors/aggregators/llm_response.py +++ b/src/pipecat/processors/aggregators/llm_response.py @@ -81,16 +81,11 @@ class LLMUserAggregatorParams: enable_emulated_vad_interruptions: When True, allows emulated VAD events to interrupt the bot when it's speaking. When False, emulated speech is ignored while the bot is speaking. - enable_user_speaking_frames: [DO NOT USE] added for temporary backwards - compatibility. - """ aggregation_timeout: float = 0.5 turn_emulated_vad_timeout: float = 0.8 enable_emulated_vad_interruptions: bool = False - # Added for backwards compatibility. 
- enable_user_speaking_frames: bool = True @dataclass diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 3083611b2..7b88ec0b7 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -76,18 +76,12 @@ class LLMUserAggregatorParams: """Parameters for configuring LLM user aggregation behavior. Parameters: - enable_user_speaking_frames: If True, the aggregator will emit frames - indicating when the user starts and stops speaking, as well as - interruption frames. This is enabled by default, but you may want - to disable it if another component (e.g., an STT service) is already - generating these frames. turn_start_strategies: User and bot turn start strategies. user_mute_strategies: List of user mute strategies. user_turn_end_timeout: Time in seconds to wait before considering the user's turn finished and starting the bot turn. 
""" - enable_user_speaking_frames: bool = True turn_start_strategies: Optional[TurnStartStrategies] = None user_mute_strategies: List[BaseUserMuteStrategy] = field(default_factory=list) user_turn_end_timeout: float = 5.0 @@ -507,11 +501,25 @@ class LLMUserAggregator(LLMContextAggregator): ) ) - async def _on_user_turn_started(self, strategy: BaseUserTurnStartStrategy): - await self._trigger_user_turn_start(strategy) + async def _on_user_turn_started( + self, + strategy: BaseUserTurnStartStrategy, + enable_user_speaking_frames: bool, + ): + await self._trigger_user_turn_start( + strategy, + enable_user_speaking_frames=enable_user_speaking_frames, + ) - async def _on_bot_turn_started(self, strategy: BaseBotTurnStartStrategy): - await self._trigger_bot_turn_start(strategy) + async def _on_bot_turn_started( + self, + strategy: BaseBotTurnStartStrategy, + enable_user_speaking_frames: bool, + ): + await self._trigger_bot_turn_start( + strategy, + enable_user_speaking_frames=enable_user_speaking_frames, + ) async def _on_push_frame( self, @@ -529,11 +537,18 @@ class LLMUserAggregator(LLMContextAggregator): ): await self.broadcast_frame(frame_cls, **kwargs) - async def _trigger_user_turn_start(self, strategy: Optional[BaseUserTurnStartStrategy]): + async def _trigger_user_turn_start( + self, + strategy: Optional[BaseUserTurnStartStrategy], + *, + enable_user_speaking_frames: bool, + ): # Prevent two consecutive user turn starts. if self._user_turn: return + logger.debug(f"User started speaking (user turn start strategy: {strategy})") + self._user_turn = True self._user_turn_end_timeout_event.set() @@ -542,19 +557,25 @@ class LLMUserAggregator(LLMContextAggregator): for s in self._turn_start_strategies.user: await s.reset() - if self._params.enable_user_speaking_frames: - logger.debug(f"User started speaking (user turn start strategy: {strategy})") + if enable_user_speaking_frames: # TODO(aleix): These frames should really come from the top of the pipeline. 
await self.broadcast_frame(UserStartedSpeakingFrame) await self.broadcast_frame(InterruptionFrame) await self._call_event_handler("on_user_turn_started", strategy) - async def _trigger_bot_turn_start(self, strategy: Optional[BaseBotTurnStartStrategy]): + async def _trigger_bot_turn_start( + self, + strategy: Optional[BaseBotTurnStartStrategy], + *, + enable_user_speaking_frames: bool, + ): # Prevent two consecutive bot turn starts. if not self._user_turn: return + logger.debug(f"User stopped speaking (bot turn start strategy: {strategy})") + self._user_turn = False self._user_turn_end_timeout_event.set() @@ -563,8 +584,7 @@ class LLMUserAggregator(LLMContextAggregator): for s in self._turn_start_strategies.bot: await s.reset() - if self._params.enable_user_speaking_frames: - logger.debug(f"User stopped speaking (bot turn start strategy: {strategy})") + if enable_user_speaking_frames: # TODO(aleix): This frame should really come from the top of the pipeline. await self.broadcast_frame(UserStoppedSpeakingFrame) @@ -584,7 +604,7 @@ class LLMUserAggregator(LLMContextAggregator): except asyncio.TimeoutError: if self._user_turn and not self._vad_user_speaking: await self._call_event_handler("on_user_turn_end_timeout") - await self._trigger_bot_turn_start(None) + await self._trigger_bot_turn_start(None, enable_user_speaking_frames=True) class LLMAssistantAggregator(LLMContextAggregator): diff --git a/src/pipecat/turns/bot/__init__.py b/src/pipecat/turns/bot/__init__.py index 258386256..4f4acf24e 100644 --- a/src/pipecat/turns/bot/__init__.py +++ b/src/pipecat/turns/bot/__init__.py @@ -5,6 +5,7 @@ # from pipecat.turns.bot.base_bot_turn_start_strategy import BaseBotTurnStartStrategy +from pipecat.turns.bot.external_bot_turn_start_strategy import ExternalBotTurnStartStrategy from pipecat.turns.bot.transcription_bot_turn_start_strategy import ( TranscriptionBotTurnStartStrategy, ) diff --git a/src/pipecat/turns/bot/base_bot_turn_start_strategy.py 
b/src/pipecat/turns/bot/base_bot_turn_start_strategy.py index 10bd2de9b..5b997524e 100644 --- a/src/pipecat/turns/bot/base_bot_turn_start_strategy.py +++ b/src/pipecat/turns/bot/base_bot_turn_start_strategy.py @@ -88,6 +88,13 @@ class BaseBotTurnStartStrategy(BaseObject): """ await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs) - async def trigger_bot_turn_started(self): - """Trigger the `on_bot_turn_started` event.""" - await self._call_event_handler("on_bot_turn_started") + async def trigger_bot_turn_started(self, *, enable_user_speaking_frames: bool = True): + """Trigger the `on_bot_turn_started` event. + + Args: + enable_user_speaking_frames: If True, the aggregator will emit frames + indicating when the user stops speaking. This is enabled by default, + but you may want to disable it if another component (e.g., an STT + service) is already generating these frames. + """ + await self._call_event_handler("on_bot_turn_started", enable_user_speaking_frames) diff --git a/src/pipecat/turns/bot/external_bot_turn_start_strategy.py b/src/pipecat/turns/bot/external_bot_turn_start_strategy.py new file mode 100644 index 000000000..330863cef --- /dev/null +++ b/src/pipecat/turns/bot/external_bot_turn_start_strategy.py @@ -0,0 +1,127 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""Bot turn start strategy triggered by externally emitted frames.""" + +import asyncio +from typing import Optional + +from pipecat.frames.frames import ( + Frame, + InterimTranscriptionFrame, + TranscriptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, +) +from pipecat.turns.bot.base_bot_turn_start_strategy import BaseBotTurnStartStrategy +from pipecat.utils.asyncio.task_manager import BaseTaskManager + + +class ExternalBotTurnStartStrategy(BaseBotTurnStartStrategy): + """Bot turn start strategy controlled by an external processor. 
+ + This strategy does not determine when a user turn ends on its own, it relies + on a different processor in the pipeline which is responsible for emitting + `UserStoppedSpeakingFrame` frames. + + """ + + def __init__(self, *, timeout: float = 0.5): + """Initialize the external bot turn start strategy. + + Args: + timeout: A short delay used internally to handle consecutive or + slightly delayed transcriptions. + """ + super().__init__() + self._timeout = timeout + self._text = "" + self._user_speaking = False + self._seen_interim_results = False + self._event = asyncio.Event() + self._task: Optional[asyncio.Task] = None + + async def reset(self): + """Reset the strategy to its initial state.""" + await super().reset() + self._text = "" + self._user_speaking = False + self._seen_interim_results = False + self._event.clear() + + async def setup(self, task_manager: BaseTaskManager): + """Initialize the strategy with the given task manager. + + Args: + task_manager: The task manager to be associated with this instance. + """ + await super().setup(task_manager) + self._task = task_manager.create_task(self._task_handler(), f"{self}::_task_handler") + + async def cleanup(self): + """Cleanup the strategy.""" + await super().cleanup() + if self._task: + await self.task_manager.cancel_task(self._task) + self._task = None + + async def process_frame(self, frame: Frame): + """Process an incoming frame to update strategy state. + + Updates internal transcription text and VAD state. The bot turn will be + triggered when appropriate based on the collected frames. + + Args: + frame: The frame to be analyzed. 
+ + """ + if isinstance(frame, UserStartedSpeakingFrame): + await self._handle_user_started_speaking(frame) + elif isinstance(frame, UserStoppedSpeakingFrame): + await self._handle_user_stopped_speaking(frame) + elif isinstance(frame, InterimTranscriptionFrame): + await self._handle_interim_transcription(frame) + elif isinstance(frame, TranscriptionFrame): + await self._handle_transcription(frame) + + async def _handle_user_started_speaking(self, _: UserStartedSpeakingFrame): + """Handle when the external service indicates the user is speaking.""" + self._user_speaking = True + + async def _handle_user_stopped_speaking(self, _: UserStoppedSpeakingFrame): + """Handle when the external service indicates the user has stopped speaking.""" + self._user_speaking = False + await self._maybe_trigger_bot_turn_started() + + async def _handle_interim_transcription(self, frame: InterimTranscriptionFrame): + self._seen_interim_results = True + + async def _handle_transcription(self, frame: TranscriptionFrame): + """Handle user transcription.""" + self._text += frame.text + # We just got a final result, so let's reset interim results. + self._seen_interim_results = False + # Reset aggregation timer. + self._event.set() + + async def _task_handler(self): + """Asynchronously monitor transcriptions and trigger bot turn when ready. + + If transcription text exists and the user is not currently speaking, + triggers the bot turn. Handles multiple or delayed transcriptions + gracefully. 
+ + """ + while True: + try: + await asyncio.wait_for(self._event.wait(), timeout=self._timeout) + self._event.clear() + except asyncio.TimeoutError: + await self._maybe_trigger_bot_turn_started() + + async def _maybe_trigger_bot_turn_started(self): + if not self._user_speaking and not self._seen_interim_results and self._text: + await self.trigger_bot_turn_started(enable_user_speaking_frames=False) diff --git a/src/pipecat/turns/turn_start_strategies.py b/src/pipecat/turns/turn_start_strategies.py index 972ea1575..7f4650ad0 100644 --- a/src/pipecat/turns/turn_start_strategies.py +++ b/src/pipecat/turns/turn_start_strategies.py @@ -11,10 +11,12 @@ from typing import List, Optional from pipecat.turns.bot import ( BaseBotTurnStartStrategy, + ExternalBotTurnStartStrategy, TranscriptionBotTurnStartStrategy, ) from pipecat.turns.user import ( BaseUserTurnStartStrategy, + ExternalUserTurnStartStrategy, TranscriptionUserTurnStartStrategy, VADUserTurnStartStrategy, ) @@ -49,3 +51,25 @@ class TurnStartStrategies: self.user = [VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()] if not self.bot: self.bot = [TranscriptionBotTurnStartStrategy()] + + +@dataclass +class ExternalTurnStartStrategies(TurnStartStrategies): + """Default container for external user and bot turn start strategies. + + This class provides a convenience default for configuring external turn + control. It preconfigures `TurnStartStrategies` with + `ExternalUserTurnStartStrategy` and `ExternalBotTurnStartStrategy`, allowing + external processors (such as services) to control when user and bot turns + start. + + When using this container, the user aggregator does not push + `UserStartedSpeakingFrame` or `UserStoppedSpeakingFrame` frames, and does + not generate interruptions. These signals are expected to be provided by an + external processor. 
+ + """ + + def __post_init__(self): + self.user = [ExternalUserTurnStartStrategy()] + self.bot = [ExternalBotTurnStartStrategy()] diff --git a/src/pipecat/turns/user/__init__.py b/src/pipecat/turns/user/__init__.py index 0b3af50c5..1baf45e63 100644 --- a/src/pipecat/turns/user/__init__.py +++ b/src/pipecat/turns/user/__init__.py @@ -5,6 +5,7 @@ # from pipecat.turns.user.base_user_turn_start_strategy import BaseUserTurnStartStrategy +from pipecat.turns.user.external_user_turn_start_strategy import ExternalUserTurnStartStrategy from pipecat.turns.user.min_words_user_turn_start_strategy import MinWordsUserTurnStartStrategy from pipecat.turns.user.transcription_user_turn_start_strategy import ( TranscriptionUserTurnStartStrategy, diff --git a/src/pipecat/turns/user/base_user_turn_start_strategy.py b/src/pipecat/turns/user/base_user_turn_start_strategy.py index c1d49f6e8..9f7c8811b 100644 --- a/src/pipecat/turns/user/base_user_turn_start_strategy.py +++ b/src/pipecat/turns/user/base_user_turn_start_strategy.py @@ -88,6 +88,15 @@ class BaseUserTurnStartStrategy(BaseObject): """ await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs) - async def trigger_user_turn_started(self): - """Trigger the `on_user_turn_started` event.""" - await self._call_event_handler("on_user_turn_started") + async def trigger_user_turn_started(self, *, enable_user_speaking_frames: bool = True): + """Trigger the `on_user_turn_started` event. + + Args: + enable_user_speaking_frames: If True, the aggregator will emit frames + indicating when the user starts speaking, as well as interruption + frames. This is enabled by default, but you may want to disable it + if another component (e.g., an STT service) is already generating + these frames. + **kwargs: Keyword arguments to be passed to the frame's constructor. 
+ """ + await self._call_event_handler("on_user_turn_started", enable_user_speaking_frames) diff --git a/src/pipecat/turns/user/external_user_turn_start_strategy.py b/src/pipecat/turns/user/external_user_turn_start_strategy.py new file mode 100644 index 000000000..13b69346d --- /dev/null +++ b/src/pipecat/turns/user/external_user_turn_start_strategy.py @@ -0,0 +1,31 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + +"""User turn start strategy triggered by externally emitted frames.""" + +from pipecat.frames.frames import Frame, UserStartedSpeakingFrame +from pipecat.turns.user.base_user_turn_start_strategy import BaseUserTurnStartStrategy + + +class ExternalUserTurnStartStrategy(BaseUserTurnStartStrategy): + """User turn start strategy controlled by an external processor. + + This strategy does not determine when a user turn starts on its own, instead + it relies on a different processor in the pipeline which is responsible for + emitting `UserStartedSpeakingFrame` frames. + + """ + + async def process_frame(self, frame: Frame): + """Process an incoming frame to detect user turn start. + + Args: + frame: The frame to be analyzed. 
+ """ + await super().process_frame(frame) + + if isinstance(frame, UserStartedSpeakingFrame): + await self.trigger_user_turn_started(enable_user_speaking_frames=False) diff --git a/tests/test_bot_turn_start_strategy.py b/tests/test_bot_turn_start_strategy.py index f719764aa..3908cb974 100644 --- a/tests/test_bot_turn_start_strategy.py +++ b/tests/test_bot_turn_start_strategy.py @@ -30,7 +30,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -55,7 +55,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -86,7 +86,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -133,7 +133,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -167,7 +167,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ 
-209,7 +209,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -239,7 +239,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -275,7 +275,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -313,7 +313,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -347,7 +347,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -392,7 +392,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -412,7 +412,7 @@ class 
TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -437,7 +437,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_bot_turn_started") - async def on_bot_turn_started(strategy): + async def on_bot_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True diff --git a/tests/test_user_turn_start_strategy.py b/tests/test_user_turn_start_strategy.py index b87f63d7b..da1c46e5a 100644 --- a/tests/test_user_turn_start_strategy.py +++ b/tests/test_user_turn_start_strategy.py @@ -27,7 +27,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_user_turn_started") - async def on_user_turn_started(strategy): + async def on_user_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -59,7 +59,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_user_turn_started") - async def on_user_turn_started(strategy): + async def on_user_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -81,7 +81,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_user_turn_started") - async def on_user_turn_started(strategy): + async def on_user_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -102,7 +102,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_user_turn_started") - async def on_user_turn_started(strategy): 
+ async def on_user_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -115,7 +115,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_user_turn_started") - async def on_user_turn_started(strategy): + async def on_user_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -132,7 +132,7 @@ class TestVADUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_user_turn_started") - async def on_user_turn_started(strategy): + async def on_user_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True @@ -150,7 +150,7 @@ class TestTranscriptionUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase): should_start = None @strategy.event_handler("on_user_turn_started") - async def on_user_turn_started(strategy): + async def on_user_turn_started(strategy, enable_user_speaking_frames): nonlocal should_start should_start = True From e757b4bf6fc1e8d0008aedd9d7cee68e6bf39bd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Sun, 28 Dec 2025 15:18:25 -0800 Subject: [PATCH 2/8] tests: added external user and bot turn start strategies --- tests/test_bot_turn_start_strategy.py | 35 ++++++++++++++++++++++++++ tests/test_user_turn_start_strategy.py | 20 +++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/tests/test_bot_turn_start_strategy.py b/tests/test_bot_turn_start_strategy.py index 3908cb974..8e4396fc2 100644 --- a/tests/test_bot_turn_start_strategy.py +++ b/tests/test_bot_turn_start_strategy.py @@ -10,10 +10,13 @@ import unittest from pipecat.frames.frames import ( InterimTranscriptionFrame, TranscriptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) from pipecat.turns.bot import TranscriptionBotTurnStartStrategy 
+from pipecat.turns.bot.external_bot_turn_start_strategy import ExternalBotTurnStartStrategy from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams AGGREGATION_TIMEOUT = 0.1 @@ -472,3 +475,35 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): # at least the aggregation timeout. await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1) self.assertTrue(should_start) + + +class TestExternalBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase): + async def test_external_strategy(self): + strategy = ExternalBotTurnStartStrategy() + + should_start = None + + @strategy.event_handler("on_bot_turn_started") + async def on_bot_turn_started(strategy, enable_user_speaking_frames): + nonlocal should_start + should_start = True + + await strategy.process_frame(VADUserStartedSpeakingFrame()) + self.assertFalse(should_start) + + await strategy.process_frame(UserStartedSpeakingFrame()) + self.assertFalse(should_start) + + await strategy.process_frame(UserStoppedSpeakingFrame()) + self.assertFalse(should_start) + + await strategy.process_frame(UserStartedSpeakingFrame()) + self.assertFalse(should_start) + + await strategy.process_frame( + TranscriptionFrame(text="How are you?", user_id="cat", timestamp="") + ) + self.assertFalse(should_start) + + await strategy.process_frame(UserStoppedSpeakingFrame()) + self.assertTrue(should_start) diff --git a/tests/test_user_turn_start_strategy.py b/tests/test_user_turn_start_strategy.py index da1c46e5a..1212e075a 100644 --- a/tests/test_user_turn_start_strategy.py +++ b/tests/test_user_turn_start_strategy.py @@ -10,10 +10,12 @@ from pipecat.frames.frames import ( BotStartedSpeakingFrame, InterimTranscriptionFrame, TranscriptionFrame, + UserStartedSpeakingFrame, VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) from pipecat.turns.user import ( + ExternalUserTurnStartStrategy, MinWordsUserTurnStartStrategy, TranscriptionUserTurnStartStrategy, VADUserTurnStartStrategy, @@ -162,3 +164,21 @@ 
class TestTranscriptionUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase): await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="", timestamp="now")) self.assertTrue(should_start) + + +class TestExternalUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase): + async def test_external_strategy(self): + strategy = ExternalUserTurnStartStrategy() + + should_start = None + + @strategy.event_handler("on_user_turn_started") + async def on_user_turn_started(strategy, enable_user_speaking_frames): + nonlocal should_start + should_start = True + + await strategy.process_frame(VADUserStartedSpeakingFrame()) + self.assertFalse(should_start) + + await strategy.process_frame(UserStartedSpeakingFrame()) + self.assertTrue(should_start) From 46db8e58d6b80c3ae62a6e6ccbc48a2c0cec38f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Sun, 28 Dec 2025 15:03:43 -0800 Subject: [PATCH 3/8] LLMUserAggregator: fix backwards compatibility with ExternalTurnStartStrategies --- .../aggregators/llm_response_universal.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index 7b88ec0b7..c1ad2a629 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -65,7 +65,7 @@ from pipecat.processors.aggregators.llm_context import ( from pipecat.processors.frame_processor import FrameDirection, FrameProcessor from pipecat.turns.bot import BaseBotTurnStartStrategy from pipecat.turns.mute import BaseUserMuteStrategy -from pipecat.turns.turn_start_strategies import TurnStartStrategies +from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies, TurnStartStrategies from pipecat.turns.user import BaseUserTurnStartStrategy from pipecat.utils.string import TextPartForConcatenation, 
concatenate_aggregated_text from pipecat.utils.time import time_now_iso8601 @@ -359,9 +359,14 @@ class LLMUserAggregator(LLMContextAggregator): self._user_turn_end_timeout_task_handler() ) + await self._setup_turn_start_strategies() + await self._setup_user_mute_strategies() + + async def _setup_user_mute_strategies(self): for s in self._params.user_mute_strategies: await s.setup(self.task_manager) + async def _setup_turn_start_strategies(self): if self._turn_start_strategies.user: for s in self._turn_start_strategies.user: await s.setup(self.task_manager) @@ -387,9 +392,10 @@ class LLMUserAggregator(LLMContextAggregator): await self.cancel_task(self._user_turn_end_timeout_task) self._user_turn_end_timeout_task = None - for s in self._params.user_mute_strategies: - await s.cleanup() + await self._cleanup_turn_start_strategies() + await self._cleanup_user_mute_strategies() + async def _cleanup_turn_start_strategies(self): if self._turn_start_strategies.user: for s in self._turn_start_strategies.user: await s.cleanup() @@ -398,6 +404,10 @@ class LLMUserAggregator(LLMContextAggregator): for s in self._turn_start_strategies.bot: await s.cleanup() + async def _cleanup_user_mute_strategies(self): + for s in self._params.user_mute_strategies: + await s.cleanup() + async def _maybe_mute_frame(self, frame: Frame): should_mute_frame = self._user_is_muted and isinstance( frame, @@ -472,6 +482,10 @@ class LLMUserAggregator(LLMContextAggregator): " )" ) + await self._cleanup_turn_start_strategies() + self._turn_start_strategies = ExternalTurnStartStrategies() + await self._setup_turn_start_strategies() + async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame): self._vad_user_speaking = True From 30e6a3393058471aedc5d5a048fde2adb5c6ff5c Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 29 Dec 2025 11:11:26 -0500 Subject: [PATCH 4/8] Update VoicemailDetector to use ExternalTurnStartStrategies --- .../extensions/voicemail/voicemail_detector.py | 
15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/pipecat/extensions/voicemail/voicemail_detector.py b/src/pipecat/extensions/voicemail/voicemail_detector.py index 3849bc983..e87c6339d 100644 --- a/src/pipecat/extensions/voicemail/voicemail_detector.py +++ b/src/pipecat/extensions/voicemail/voicemail_detector.py @@ -37,9 +37,13 @@ from pipecat.frames.frames import ( ) from pipecat.pipeline.parallel_pipeline import ParallelPipeline from pipecat.processors.aggregators.llm_context import LLMContext -from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup from pipecat.services.llm_service import LLMService +from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies from pipecat.utils.sync.base_notifier import BaseNotifier from pipecat.utils.sync.event_notifier import EventNotifier @@ -318,11 +322,13 @@ class ClassificationProcessor(FrameProcessor): # User started speaking - set the voicemail event if self._voicemail_detected: self._voicemail_event.set() + await self.push_frame(frame, direction) elif isinstance(frame, UserStoppedSpeakingFrame): # User stopped speaking - clear the voicemail event if self._voicemail_detected: self._voicemail_event.clear() + await self.push_frame(frame, direction) else: # Pass all non-LLM frames through @@ -621,7 +627,12 @@ VOICEMAIL SYSTEM (respond "VOICEMAIL"): # Create the LLM context and aggregators for conversation management self._context = LLMContext(self._messages) - self._context_aggregator = LLMContextAggregatorPair(self._context) + self._context_aggregator = LLMContextAggregatorPair( + self._context, + user_params=LLMUserAggregatorParams( + turn_start_strategies=ExternalTurnStartStrategies() + ), + ) # Create notification 
system for coordinating between components self._gate_notifier = EventNotifier() # Signals classification completion From c28ed2206ce426d16f5c7a17a91268d999c95dab Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Mon, 29 Dec 2025 12:44:42 -0500 Subject: [PATCH 5/8] DeepgramSTTService pushes user started/stopped speaking and interruption frames --- changelog/xxx.changed.md | 1 + .../07c-interruptible-deepgram-vad.py | 15 +-------------- src/pipecat/services/deepgram/stt.py | 5 +++++ 3 files changed, 7 insertions(+), 14 deletions(-) create mode 100644 changelog/xxx.changed.md diff --git a/changelog/xxx.changed.md b/changelog/xxx.changed.md new file mode 100644 index 000000000..3b9b074a8 --- /dev/null +++ b/changelog/xxx.changed.md @@ -0,0 +1 @@ +- Updated `DeepgramSTTService` to push user started/stopped speaking and interruption frames when `vad_enabled` is set to true. This centralizes the frames into the service, removing the need to have your application code handle Deepgram's events and push these frames. 
diff --git a/examples/foundational/07c-interruptible-deepgram-vad.py b/examples/foundational/07c-interruptible-deepgram-vad.py index cbde4008d..9814fc01d 100644 --- a/examples/foundational/07c-interruptible-deepgram-vad.py +++ b/examples/foundational/07c-interruptible-deepgram-vad.py @@ -11,12 +11,7 @@ from deepgram import LiveOptions from dotenv import load_dotenv from loguru import logger -from pipecat.frames.frames import ( - InterruptionFrame, - LLMRunFrame, - UserStartedSpeakingFrame, - UserStoppedSpeakingFrame, -) +from pipecat.frames.frames import LLMRunFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -103,14 +98,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): idle_timeout_secs=runner_args.pipeline_idle_timeout_secs, ) - @stt.event_handler("on_speech_started") - async def on_speech_started(stt, *args, **kwargs): - await task.queue_frames([UserStartedSpeakingFrame(), InterruptionFrame()]) - - @stt.event_handler("on_utterance_end") - async def on_utterance_end(stt, *args, **kwargs): - await task.queue_frames([UserStoppedSpeakingFrame()]) - @transport.event_handler("on_client_connected") async def on_client_connected(transport, client): logger.info(f"Client connected") diff --git a/src/pipecat/services/deepgram/stt.py b/src/pipecat/services/deepgram/stt.py index 63c89f880..97888b7ec 100644 --- a/src/pipecat/services/deepgram/stt.py +++ b/src/pipecat/services/deepgram/stt.py @@ -17,6 +17,8 @@ from pipecat.frames.frames import ( InterimTranscriptionFrame, StartFrame, TranscriptionFrame, + UserStartedSpeakingFrame, + UserStoppedSpeakingFrame, VADUserStartedSpeakingFrame, VADUserStoppedSpeakingFrame, ) @@ -271,9 +273,12 @@ class DeepgramSTTService(STTService): async def _on_speech_started(self, *args, **kwargs): await self.start_metrics() await self._call_event_handler("on_speech_started", *args, **kwargs) + await 
self.broadcast_frame(UserStartedSpeakingFrame) + await self.push_interruption_task_frame_and_wait() async def _on_utterance_end(self, *args, **kwargs): await self._call_event_handler("on_utterance_end", *args, **kwargs) + await self.broadcast_frame(UserStoppedSpeakingFrame) @traced_stt async def _handle_transcription( From cf46431d92c57106c46b4f4c109bee6d92e9d242 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 29 Dec 2025 10:30:41 -0800 Subject: [PATCH 6/8] update changelog file --- changelog/{xxx.changed.md => 3314.changed.md} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog/{xxx.changed.md => 3314.changed.md} (100%) diff --git a/changelog/xxx.changed.md b/changelog/3314.changed.md similarity index 100% rename from changelog/xxx.changed.md rename to changelog/3314.changed.md From c8f47b4b2273c94f010be99a6f8b821fbb6f9dd3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 29 Dec 2025 14:32:08 -0800 Subject: [PATCH 7/8] turns: add UserTurnStartedParams and BotTurnStartedParams --- .../aggregators/llm_response_universal.py | 38 +++++-------- src/pipecat/turns/bot/__init__.py | 5 +- .../turns/bot/base_bot_turn_start_strategy.py | 50 ++++++++++++----- .../bot/external_bot_turn_start_strategy.py | 6 +-- src/pipecat/turns/user/__init__.py | 5 +- .../user/base_user_turn_start_strategy.py | 53 ++++++++++++++----- .../user/external_user_turn_start_strategy.py | 6 ++- 7 files changed, 106 insertions(+), 57 deletions(-) diff --git a/src/pipecat/processors/aggregators/llm_response_universal.py b/src/pipecat/processors/aggregators/llm_response_universal.py index c1ad2a629..9feb23199 100644 --- a/src/pipecat/processors/aggregators/llm_response_universal.py +++ b/src/pipecat/processors/aggregators/llm_response_universal.py @@ -63,10 +63,10 @@ from pipecat.processors.aggregators.llm_context import ( NotGiven, ) from pipecat.processors.frame_processor import FrameDirection, FrameProcessor 
-from pipecat.turns.bot import BaseBotTurnStartStrategy +from pipecat.turns.bot import BaseBotTurnStartStrategy, BotTurnStartedParams from pipecat.turns.mute import BaseUserMuteStrategy from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies, TurnStartStrategies -from pipecat.turns.user import BaseUserTurnStartStrategy +from pipecat.turns.user import BaseUserTurnStartStrategy, UserTurnStartedParams from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text from pipecat.utils.time import time_now_iso8601 @@ -518,22 +518,14 @@ class LLMUserAggregator(LLMContextAggregator): async def _on_user_turn_started( self, strategy: BaseUserTurnStartStrategy, - enable_user_speaking_frames: bool, + params: UserTurnStartedParams, ): - await self._trigger_user_turn_start( - strategy, - enable_user_speaking_frames=enable_user_speaking_frames, - ) + await self._trigger_user_turn_start(strategy, params) async def _on_bot_turn_started( - self, - strategy: BaseBotTurnStartStrategy, - enable_user_speaking_frames: bool, + self, strategy: BaseBotTurnStartStrategy, params: BotTurnStartedParams ): - await self._trigger_bot_turn_start( - strategy, - enable_user_speaking_frames=enable_user_speaking_frames, - ) + await self._trigger_bot_turn_start(strategy, params) async def _on_push_frame( self, @@ -552,10 +544,7 @@ class LLMUserAggregator(LLMContextAggregator): await self.broadcast_frame(frame_cls, **kwargs) async def _trigger_user_turn_start( - self, - strategy: Optional[BaseUserTurnStartStrategy], - *, - enable_user_speaking_frames: bool, + self, strategy: Optional[BaseUserTurnStartStrategy], params: UserTurnStartedParams ): # Prevent two consecutive user turn starts. 
if self._user_turn: @@ -571,7 +560,7 @@ class LLMUserAggregator(LLMContextAggregator): for s in self._turn_start_strategies.user: await s.reset() - if enable_user_speaking_frames: + if params.enable_user_speaking_frames: # TODO(aleix): These frames should really come from the top of the pipeline. await self.broadcast_frame(UserStartedSpeakingFrame) await self.broadcast_frame(InterruptionFrame) @@ -579,10 +568,7 @@ class LLMUserAggregator(LLMContextAggregator): await self._call_event_handler("on_user_turn_started", strategy) async def _trigger_bot_turn_start( - self, - strategy: Optional[BaseBotTurnStartStrategy], - *, - enable_user_speaking_frames: bool, + self, strategy: Optional[BaseBotTurnStartStrategy], params: BotTurnStartedParams ): # Prevent two consecutive bot turn starts. if not self._user_turn: @@ -598,7 +584,7 @@ class LLMUserAggregator(LLMContextAggregator): for s in self._turn_start_strategies.bot: await s.reset() - if enable_user_speaking_frames: + if params.enable_user_speaking_frames: # TODO(aleix): This frame should really come from the top of the pipeline. 
await self.broadcast_frame(UserStoppedSpeakingFrame) @@ -618,7 +604,9 @@ class LLMUserAggregator(LLMContextAggregator): except asyncio.TimeoutError: if self._user_turn and not self._vad_user_speaking: await self._call_event_handler("on_user_turn_end_timeout") - await self._trigger_bot_turn_start(None, enable_user_speaking_frames=True) + await self._trigger_bot_turn_start( + None, BotTurnStartedParams(enable_user_speaking_frames=True) + ) class LLMAssistantAggregator(LLMContextAggregator): diff --git a/src/pipecat/turns/bot/__init__.py b/src/pipecat/turns/bot/__init__.py index 4f4acf24e..3118c9f60 100644 --- a/src/pipecat/turns/bot/__init__.py +++ b/src/pipecat/turns/bot/__init__.py @@ -4,7 +4,10 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from pipecat.turns.bot.base_bot_turn_start_strategy import BaseBotTurnStartStrategy +from pipecat.turns.bot.base_bot_turn_start_strategy import ( + BaseBotTurnStartStrategy, + BotTurnStartedParams, +) from pipecat.turns.bot.external_bot_turn_start_strategy import ExternalBotTurnStartStrategy from pipecat.turns.bot.transcription_bot_turn_start_strategy import ( TranscriptionBotTurnStartStrategy, diff --git a/src/pipecat/turns/bot/base_bot_turn_start_strategy.py b/src/pipecat/turns/bot/base_bot_turn_start_strategy.py index 5b997524e..d4faad799 100644 --- a/src/pipecat/turns/bot/base_bot_turn_start_strategy.py +++ b/src/pipecat/turns/bot/base_bot_turn_start_strategy.py @@ -6,6 +6,7 @@ """Base turn start strategy for determining when the bot should start speaking.""" +from dataclasses import dataclass from typing import Optional, Type from pipecat.frames.frames import Frame @@ -14,6 +15,26 @@ from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.base_object import BaseObject +@dataclass +class BotTurnStartedParams: + """Parameters emitted when a bot turn starts. 
+ + These parameters are passed to the `on_bot_turn_started` event and provide + contextual information about how the bot turn should be handled by the user + aggregator. + + Attributes: + enable_user_speaking_frames: Whether the user aggregator should emit + frames indicating user speaking state (e.g., user stopped speaking) + during the bot's turn. This is typically enabled by default, but may + be disabled when another component (such as an STT service) is already + responsible for generating user speaking frames. + + """ + + enable_user_speaking_frames: bool + + class BaseBotTurnStartStrategy(BaseObject): """Base class for strategies that determine when the bot should start speaking. @@ -28,9 +49,18 @@ class BaseBotTurnStartStrategy(BaseObject): """ - def __init__(self, **kwargs): - """Initialize the base bot turn start strategy.""" + def __init__(self, *, enable_user_speaking_frames: bool = True, **kwargs): + """Initialize the base bot turn start strategy. + + Args: + enable_user_speaking_frames: If True, the aggregator will emit frames + indicating when the user stops speaking. This is enabled by default, + but you may want to disable it if another component (e.g., an STT + service) is already generating these frames. + **kwargs: Additional keyword arguments. + """ super().__init__(**kwargs) + self._enable_user_speaking_frames = enable_user_speaking_frames self._task_manager: Optional[BaseTaskManager] = None self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) @@ -88,13 +118,9 @@ class BaseBotTurnStartStrategy(BaseObject): """ await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs) - async def trigger_bot_turn_started(self, *, enable_user_speaking_frames: bool = True): - """Trigger the `on_bot_turn_started` event. - - Args: - enable_user_speaking_frames: If True, the aggregator will emit frames - indicating when the user stops speaking. 
This is enabled by default, - but you may want to disable it if another component (e.g., an STT - service) is already generating these frames. - """ - await self._call_event_handler("on_bot_turn_started", enable_user_speaking_frames) + async def trigger_bot_turn_started(self): + """Trigger the `on_bot_turn_started` event.""" + await self._call_event_handler( + "on_bot_turn_started", + BotTurnStartedParams(enable_user_speaking_frames=self._enable_user_speaking_frames), + ) diff --git a/src/pipecat/turns/bot/external_bot_turn_start_strategy.py b/src/pipecat/turns/bot/external_bot_turn_start_strategy.py index 330863cef..ddc6fb888 100644 --- a/src/pipecat/turns/bot/external_bot_turn_start_strategy.py +++ b/src/pipecat/turns/bot/external_bot_turn_start_strategy.py @@ -30,13 +30,13 @@ class ExternalBotTurnStartStrategy(BaseBotTurnStartStrategy): """ def __init__(self, *, timeout: float = 0.5): - """Initialize the transcription-based bot turn start strategy. + """Initialize the external bot turn start strategy. Args: timeout: A short delay used internally to handle consecutive or slightly delayed transcriptions. 
""" - super().__init__() + super().__init__(enable_user_speaking_frames=False) self._timeout = timeout self._text = "" self._user_speaking = False @@ -124,4 +124,4 @@ class ExternalBotTurnStartStrategy(BaseBotTurnStartStrategy): async def _maybe_trigger_bot_turn_started(self): if not self._user_speaking and not self._seen_interim_results and self._text: - await self.trigger_bot_turn_started(enable_user_speaking_frames=False) + await self.trigger_bot_turn_started() diff --git a/src/pipecat/turns/user/__init__.py b/src/pipecat/turns/user/__init__.py index 1baf45e63..a3ec3c744 100644 --- a/src/pipecat/turns/user/__init__.py +++ b/src/pipecat/turns/user/__init__.py @@ -4,7 +4,10 @@ # SPDX-License-Identifier: BSD 2-Clause License # -from pipecat.turns.user.base_user_turn_start_strategy import BaseUserTurnStartStrategy +from pipecat.turns.user.base_user_turn_start_strategy import ( + BaseUserTurnStartStrategy, + UserTurnStartedParams, +) from pipecat.turns.user.external_user_turn_start_strategy import ExternalUserTurnStartStrategy from pipecat.turns.user.min_words_user_turn_start_strategy import MinWordsUserTurnStartStrategy from pipecat.turns.user.transcription_user_turn_start_strategy import ( diff --git a/src/pipecat/turns/user/base_user_turn_start_strategy.py b/src/pipecat/turns/user/base_user_turn_start_strategy.py index 9f7c8811b..bfe4197bb 100644 --- a/src/pipecat/turns/user/base_user_turn_start_strategy.py +++ b/src/pipecat/turns/user/base_user_turn_start_strategy.py @@ -6,6 +6,7 @@ """Base turn start strategy for determining when the user starts speaking.""" +from dataclasses import dataclass from typing import Optional, Type from pipecat.frames.frames import Frame @@ -14,6 +15,26 @@ from pipecat.utils.asyncio.task_manager import BaseTaskManager from pipecat.utils.base_object import BaseObject +@dataclass +class UserTurnStartedParams: + """Parameters emitted when a user turn starts. 
+ + These parameters are passed to the `on_user_turn_started` event and provide + contextual information about how the user turn should be handled by the user + aggregator. + + Attributes: + enable_user_speaking_frames: Whether the user aggregator should emit + frames indicating user speaking state (e.g., user started speaking) + when the user's turn starts. This is typically enabled by default, but may + be disabled when another component (such as an STT service) is already + responsible for generating user speaking frames. + + """ + + enable_user_speaking_frames: bool + + class BaseUserTurnStartStrategy(BaseObject): """Base class for strategies that determine when a user starts speaking. @@ -28,9 +49,19 @@ class BaseUserTurnStartStrategy(BaseObject): - `on_user_turn_started`: Signals that a user turn has started. """ - def __init__(self, **kwargs): - """Initialize the base user turn start strategy.""" + def __init__(self, *, enable_user_speaking_frames: bool = True, **kwargs): + """Initialize the base user turn start strategy. + + Args: + enable_user_speaking_frames: If True, the aggregator will emit frames + indicating when the user starts speaking, as well as interruption + frames. This is enabled by default, but you may want to disable it + if another component (e.g., an STT service) is already generating + these frames. + **kwargs: Additional keyword arguments. + """ super().__init__(**kwargs) + self._enable_user_speaking_frames = enable_user_speaking_frames self._task_manager: Optional[BaseTaskManager] = None self._register_event_handler("on_push_frame", sync=True) self._register_event_handler("on_broadcast_frame", sync=True) @@ -88,15 +119,9 @@ class BaseUserTurnStartStrategy(BaseObject): """ await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs) - async def trigger_user_turn_started(self, *, enable_user_speaking_frames: bool = True): - """Trigger the `on_user_turn_started` event. 
- - Args: - enable_user_speaking_frames: If True, the aggregator will emit frames - indicating when the user starts speaking, as well as interruption - frames. This is enabled by default, but you may want to disable it - if another component (e.g., an STT service) is already generating - these frames. - **kwargs: Keyword arguments to be passed to the frame's constructor. - """ - await self._call_event_handler("on_user_turn_started", enable_user_speaking_frames) + async def trigger_user_turn_started(self): + """Trigger the `on_user_turn_started` event.""" + await self._call_event_handler( + "on_user_turn_started", + UserTurnStartedParams(enable_user_speaking_frames=self._enable_user_speaking_frames), + ) diff --git a/src/pipecat/turns/user/external_user_turn_start_strategy.py b/src/pipecat/turns/user/external_user_turn_start_strategy.py index 13b69346d..60f07d023 100644 --- a/src/pipecat/turns/user/external_user_turn_start_strategy.py +++ b/src/pipecat/turns/user/external_user_turn_start_strategy.py @@ -19,6 +19,10 @@ class ExternalUserTurnStartStrategy(BaseUserTurnStartStrategy): """ + def __init__(self): + """Initialize the external user turn start strategy.""" + super().__init__(enable_user_speaking_frames=False) + async def process_frame(self, frame: Frame): """Process an incoming frame to detect user turn start. 
@@ -28,4 +32,4 @@ class ExternalUserTurnStartStrategy(BaseUserTurnStartStrategy): await super().process_frame(frame) if isinstance(frame, UserStartedSpeakingFrame): - await self.trigger_user_turn_started(enable_user_speaking_frames=False) + await self.trigger_user_turn_started() From bd9a316d7a853dd2bfd4e312eb26e260c70e4dda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Mon, 29 Dec 2025 14:34:23 -0800 Subject: [PATCH 8/8] transports: don't use interruptions_allowed to avoid deprecation warning --- src/pipecat/transports/base_input.py | 2 +- src/pipecat/transports/base_output.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 4b5052af6..14a6b547f 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -477,7 +477,7 @@ class BaseInputTransport(FrameProcessor): ) # Make sure we notify about interruptions quickly out-of-band. - if should_push_immediate_interruption and self.interruptions_allowed: + if should_push_immediate_interruption and self._allow_interruptions: await self.push_interruption_task_frame_and_wait() elif self.interruption_strategies and self._bot_speaking: logger.debug( diff --git a/src/pipecat/transports/base_output.py b/src/pipecat/transports/base_output.py index bc0eb9d6b..332c9d98a 100644 --- a/src/pipecat/transports/base_output.py +++ b/src/pipecat/transports/base_output.py @@ -498,7 +498,7 @@ class BaseOutputTransport(FrameProcessor): Args: _: The start interruption frame (unused). """ - if not self._transport.interruptions_allowed: + if not self._transport._allow_interruptions: return # Cancel tasks.