Refactor to use frame-based interruption control

- Add EnableInterruptionsFrame custom frame for pipeline-based control
- Subclass VADUserTurnStartStrategy as DynamicVADUserTurnStartStrategy
- Use task.queue_frame() to toggle interruptions idiomatically
- Follows same pattern as MixerEnableFrame in 23-bot-background-sound.py
This commit is contained in:
James Hush
2026-01-14 11:39:45 +08:00
parent c2fdbc9e65
commit dbeb6dd4ab

View File

@@ -13,9 +13,10 @@ transcribing user speech. Every 5 seconds, the bot toggles between:
This is useful when you want to capture what the user says during bot speech
without interrupting the bot's response, and then re-enable interruptions later.
The key mechanism is `user_turn_controller.update_strategies()` which allows
runtime changes to the user turn strategies. The `enable_interruptions` parameter
on start strategies controls whether InterruptionFrame is emitted.
The key mechanism is a custom frame (`EnableInterruptionsFrame`) that is queued
into the pipeline. A custom `DynamicVADUserTurnStartStrategy` subclasses
`VADUserTurnStartStrategy` and listens for this frame to update its
`enable_interruptions` setting at runtime.
In both modes:
- Voice Activity Detection (VAD) continues working
@@ -28,6 +29,7 @@ while the bot talks to observe the different behaviors.
import asyncio
import os
from dataclasses import dataclass
from dotenv import load_dotenv
from loguru import logger
@@ -35,7 +37,7 @@ from loguru import logger
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import Frame, LLMRunFrame, SystemFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -52,25 +54,44 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_start import TranscriptionUserTurnStartStrategy, VADUserTurnStartStrategy
from pipecat.turns.user_start import VADUserTurnStartStrategy
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
def create_user_turn_strategies(enable_interruptions: bool) -> UserTurnStrategies:
"""Create user turn strategies with the specified interruption setting.
@dataclass
class EnableInterruptionsFrame(SystemFrame):
"""A custom frame to dynamically enable or disable interruptions.
Args:
enable_interruptions: If True, user speech will interrupt the bot.
If False, user speech is transcribed but won't interrupt.
Queue this frame into the pipeline to change the interruption behavior
of DynamicVADUserTurnStartStrategy at runtime.
"""
return UserTurnStrategies(
start=[
VADUserTurnStartStrategy(enable_interruptions=enable_interruptions),
TranscriptionUserTurnStartStrategy(enable_interruptions=enable_interruptions),
],
stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())],
)
enable_interruptions: bool
class DynamicVADUserTurnStartStrategy(VADUserTurnStartStrategy):
"""A VAD-based user turn start strategy with dynamic interruption control.
This strategy extends VADUserTurnStartStrategy to listen for
EnableInterruptionsFrame, allowing the enable_interruptions setting
to be changed at runtime via the pipeline.
Example:
# Create strategy with interruptions initially disabled
strategy = DynamicVADUserTurnStartStrategy(enable_interruptions=False)
# Later, toggle interruptions by queuing a frame
await task.queue_frame(EnableInterruptionsFrame(enable_interruptions=True))
"""
async def process_frame(self, frame: Frame):
"""Process frames, updating enable_interruptions when our custom frame arrives."""
if isinstance(frame, EnableInterruptionsFrame):
self._enable_interruptions = frame.enable_interruptions
logger.info(f"Interruptions {'ENABLED' if frame.enable_interruptions else 'DISABLED'}")
await super().process_frame(frame)
load_dotenv(override=True)
@@ -118,13 +139,18 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = LLMContext(messages)
# Start with interruptions DISABLED.
# The on_client_connected handler below will toggle between enabled/disabled
# every 5 seconds to demonstrate dynamic strategy updates.
# Create a dynamic VAD strategy that can be toggled at runtime via frames.
# This strategy extends VADUserTurnStartStrategy but listens for
# EnableInterruptionsFrame to change enable_interruptions dynamically.
dynamic_strategy = DynamicVADUserTurnStartStrategy(enable_interruptions=False)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=create_user_turn_strategies(enable_interruptions=False),
user_turn_strategies=UserTurnStrategies(
start=[dynamic_strategy],
stop=[TurnAnalyzerUserTurnStopStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())],
),
),
)
@@ -157,19 +183,17 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
# Toggle interruptions every 5 seconds to demonstrate dynamic behavior.
# This runs inline in the event handler (similar to 23-bot-background-sound.py).
# Toggle interruptions every 5 seconds by queuing EnableInterruptionsFrame.
# This is the idiomatic Pipecat way to control behavior via frames.
interruptions_enabled = False
for _ in range(10): # Toggle 10 times (50 seconds total)
await asyncio.sleep(5)
interruptions_enabled = not interruptions_enabled
logger.info(
f"Toggling interruptions: {'ENABLED' if interruptions_enabled else 'DISABLED'}"
)
# @aconchillo I think we need a new frame to handle this case right?
new_strategies = create_user_turn_strategies(enable_interruptions=interruptions_enabled)
await user_aggregator._user_turn_controller.update_strategies(new_strategies)
# Queue a frame to toggle interruptions - the strategy will pick it up
await task.queue_frame(
EnableInterruptionsFrame(enable_interruptions=interruptions_enabled)
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):