diff --git a/examples/foundational/07a-non-interruptible.py b/examples/foundational/07a-non-interruptible.py index e645f0cbb..731a0eb77 100644 --- a/examples/foundational/07a-non-interruptible.py +++ b/examples/foundational/07a-non-interruptible.py @@ -13,9 +13,10 @@ transcribing user speech. Every 5 seconds, the bot toggles between: This is useful when you want to capture what the user says during bot speech without interrupting the bot's response, and then re-enable interruptions later. -The key mechanism is `user_turn_controller.update_strategies()` which allows -runtime changes to the user turn strategies. The `enable_interruptions` parameter -on start strategies controls whether InterruptionFrame is emitted. +The key mechanism is a custom frame (`EnableInterruptionsFrame`) that is queued +into the pipeline. A custom `DynamicVADUserTurnStartStrategy` subclasses +`VADUserTurnStartStrategy` and listens for this frame to update its +`enable_interruptions` setting at runtime. In both modes: - Voice Activity Detection (VAD) continues working @@ -28,6 +29,7 @@ while the bot talks to observe the different behaviors. import asyncio import os +from dataclasses import dataclass from dotenv import load_dotenv from loguru import logger @@ -35,7 +37,7 @@ from loguru import logger from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 from pipecat.audio.vad.silero import SileroVADAnalyzer from pipecat.audio.vad.vad_analyzer import VADParams -from pipecat.frames.frames import LLMRunFrame +from pipecat.frames.frames import Frame, LLMRunFrame, SystemFrame from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineParams, PipelineTask @@ -52,25 +54,44 @@ from pipecat.services.openai.llm import OpenAILLMService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams -from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy, VADUserTurnStartStrategy +from pipecat.turns.user_start import VADUserTurnStartStrategy from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy from pipecat.turns.user_turn_strategies import UserTurnStrategies -def create_user_turn_strategies(enable_interruptions: bool) -> UserTurnStrategies: - """Create user turn strategies with the specified interruption setting. +@dataclass +class EnableInterruptionsFrame(SystemFrame): + """A custom frame to dynamically enable or disable interruptions. - Args: - enable_interruptions: If True, user speech will interrupt the bot. - If False, user speech is transcribed but won't interrupt. + Queue this frame into the pipeline to change the interruption behavior + of DynamicVADUserTurnStartStrategy at runtime. """ - return UserTurnStrategies( - start=[ - VADUserTurnStartStrategy(enable_interruptions=enable_interruptions), - TranscriptionUserTurnStartStrategy(enable_interruptions=enable_interruptions), - ], - stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())], - ) + + enable_interruptions: bool + + +class DynamicVADUserTurnStartStrategy(VADUserTurnStartStrategy): + """A VAD-based user turn start strategy with dynamic interruption control. + + This strategy extends VADUserTurnStartStrategy to listen for + EnableInterruptionsFrame, allowing the enable_interruptions setting + to be changed at runtime via the pipeline. + + Example: + # Create strategy with interruptions initially disabled + strategy = DynamicVADUserTurnStartStrategy(enable_interruptions=False) + + # Later, toggle interruptions by queuing a frame + await task.queue_frame(EnableInterruptionsFrame(enable_interruptions=True)) + """ + + async def process_frame(self, frame: Frame): + """Process frames, updating enable_interruptions when our custom frame arrives.""" + if isinstance(frame, EnableInterruptionsFrame): + self._enable_interruptions = frame.enable_interruptions + logger.info(f"Interruptions {'ENABLED' if frame.enable_interruptions else 'DISABLED'}") + + await super().process_frame(frame) load_dotenv(override=True) @@ -118,13 +139,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): context = LLMContext(messages) - # Start with interruptions DISABLED. - # The on_client_connected handler below will toggle between enabled/disabled - # every 5 seconds to demonstrate dynamic strategy updates. + # Create a dynamic VAD strategy that can be toggled at runtime via frames. + # This strategy extends VADUserTurnStartStrategy but listens for + # EnableInterruptionsFrame to change enable_interruptions dynamically. + dynamic_strategy = DynamicVADUserTurnStartStrategy(enable_interruptions=False) + user_aggregator, assistant_aggregator = LLMContextAggregatorPair( context, user_params=LLMUserAggregatorParams( - user_turn_strategies=create_user_turn_strategies(enable_interruptions=False), + user_turn_strategies=UserTurnStrategies( + start=[dynamic_strategy], + stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())], + ), ), ) @@ -157,19 +183,17 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): messages.append({"role": "system", "content": "Please introduce yourself to the user."}) await task.queue_frames([LLMRunFrame()]) - # Toggle interruptions every 5 seconds to demonstrate dynamic behavior. - # This runs inline in the event handler (similar to 23-bot-background-sound.py). + # Toggle interruptions every 5 seconds by queuing EnableInterruptionsFrame. + # This is the idiomatic Pipecat way to control behavior via frames. interruptions_enabled = False for _ in range(10): # Toggle 10 times (50 seconds total) await asyncio.sleep(5) interruptions_enabled = not interruptions_enabled - logger.info( - f"Toggling interruptions: {'ENABLED' if interruptions_enabled else 'DISABLED'}" - ) - # @aconchillo I think we need a new frame to handle this case right? - new_strategies = create_user_turn_strategies(enable_interruptions=interruptions_enabled) - await user_aggregator._user_turn_controller.update_strategies(new_strategies) + # Queue a frame to toggle interruptions - the strategy will pick it up + await task.queue_frame( + EnableInterruptionsFrame(enable_interruptions=interruptions_enabled) + ) @transport.event_handler("on_client_disconnected") async def on_client_disconnected(transport, client):