Compare commits

...

5 Commits

Author SHA1 Message Date
Mark Backman
a68641afea Update VoicemailDetector to use ExternalTurnStartStrategies 2025-12-29 11:05:10 -05:00
Aleix Conchillo Flaqué
b48d164af8 turns: add external user and bot turn start strategies
External strategies are strategies where the logic for user turn start and turn
end come from a different processors (e.g. an STT).
2025-12-29 11:05:03 -05:00
Aleix Conchillo Flaqué
7c4802659c LLMUserAggregator: fix backwards compatibility with ExternalTurnStartStrategies 2025-12-28 19:09:46 -08:00
Aleix Conchillo Flaqué
3cb8665401 tests: added external user and bot turn start strategies 2025-12-28 19:09:46 -08:00
Aleix Conchillo Flaqué
ef73e498cb turns: add external user and bot turn start strategies
External strategies are strategies where the logic for user turn start and turn
end come from a different processors (e.g. an STT).
2025-12-28 19:09:46 -08:00
16 changed files with 359 additions and 56 deletions

View File

@@ -10,10 +10,12 @@
- VADUserTurnStartStrategy
- TranscriptionUserTurnStartStrategy
- MinWordsUserTurnStartStrategy
- ExternalUserTurnStartStrategy
Available bot turn start strategies:
- TranscriptionBotTurnStartStrategy
- TurnAnalyzerBotTurnStartStrategy
- ExternalBotTurnStartStrategy
The default strategies are:

View File

@@ -29,6 +29,7 @@ from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies
load_dotenv(override=True)
@@ -132,7 +133,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False),
user_params=LLMUserAggregatorParams(
turn_start_strategies=ExternalTurnStartStrategies()
),
)
pipeline = Pipeline(

View File

@@ -27,6 +27,7 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies
load_dotenv(override=True)
@@ -71,7 +72,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context, user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False)
context,
user_params=LLMUserAggregatorParams(turn_start_strategies=ExternalTurnStartStrategies()),
)
pipeline = Pipeline(

View File

@@ -33,6 +33,7 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies
load_dotenv(override=True)
@@ -78,7 +79,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False),
user_params=LLMUserAggregatorParams(turn_start_strategies=ExternalTurnStartStrategies()),
)
pipeline = Pipeline(

View File

@@ -37,9 +37,13 @@ from pipecat.frames.frames import (
)
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
from pipecat.services.llm_service import LLMService
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies
from pipecat.utils.sync.base_notifier import BaseNotifier
from pipecat.utils.sync.event_notifier import EventNotifier
@@ -318,11 +322,13 @@ class ClassificationProcessor(FrameProcessor):
# User started speaking - set the voicemail event
if self._voicemail_detected:
self._voicemail_event.set()
await self.push_frame(frame, direction)
elif isinstance(frame, UserStoppedSpeakingFrame):
# User stopped speaking - clear the voicemail event
if self._voicemail_detected:
self._voicemail_event.clear()
await self.push_frame(frame, direction)
else:
# Pass all non-LLM frames through
@@ -621,7 +627,12 @@ VOICEMAIL SYSTEM (respond "VOICEMAIL"):
# Create the LLM context and aggregators for conversation management
self._context = LLMContext(self._messages)
self._context_aggregator = LLMContextAggregatorPair(self._context)
self._context_aggregator = LLMContextAggregatorPair(
self._context,
user_params=LLMUserAggregatorParams(
turn_start_strategies=ExternalTurnStartStrategies()
),
)
# Create notification system for coordinating between components
self._gate_notifier = EventNotifier() # Signals classification completion

View File

@@ -81,16 +81,11 @@ class LLMUserAggregatorParams:
enable_emulated_vad_interruptions: When True, allows emulated VAD events
to interrupt the bot when it's speaking. When False, emulated speech
is ignored while the bot is speaking.
enable_user_speaking_frames: [DO NOT USE] added for temporary backwards
compatibility.
"""
aggregation_timeout: float = 0.5
turn_emulated_vad_timeout: float = 0.8
enable_emulated_vad_interruptions: bool = False
# Added for backwards compatibility.
enable_user_speaking_frames: bool = True
@dataclass

View File

@@ -65,7 +65,7 @@ from pipecat.processors.aggregators.llm_context import (
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.turns.bot import BaseBotTurnStartStrategy
from pipecat.turns.mute import BaseUserMuteStrategy
from pipecat.turns.turn_start_strategies import TurnStartStrategies
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies, TurnStartStrategies
from pipecat.turns.user import BaseUserTurnStartStrategy
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
from pipecat.utils.time import time_now_iso8601
@@ -76,18 +76,12 @@ class LLMUserAggregatorParams:
"""Parameters for configuring LLM user aggregation behavior.
Parameters:
enable_user_speaking_frames: If True, the aggregator will emit frames
indicating when the user starts and stops speaking, as well as
interruption frames. This is enabled by default, but you may want
to disable it if another component (e.g., an STT service) is already
generating these frames.
turn_start_strategies: User and bot turn start strategies.
user_mute_strategies: List of user mute strategies.
user_turn_end_timeout: Time in seconds to wait before considering the
user's turn finished and starting the bot turn.
"""
enable_user_speaking_frames: bool = True
turn_start_strategies: Optional[TurnStartStrategies] = None
user_mute_strategies: List[BaseUserMuteStrategy] = field(default_factory=list)
user_turn_end_timeout: float = 5.0
@@ -365,9 +359,14 @@ class LLMUserAggregator(LLMContextAggregator):
self._user_turn_end_timeout_task_handler()
)
await self._setup_turn_start_strategies()
await self._setup_user_mute_strategies()
async def _setup_user_mute_strategies(self):
for s in self._params.user_mute_strategies:
await s.setup(self.task_manager)
async def _setup_turn_start_strategies(self):
if self._turn_start_strategies.user:
for s in self._turn_start_strategies.user:
await s.setup(self.task_manager)
@@ -393,9 +392,10 @@ class LLMUserAggregator(LLMContextAggregator):
await self.cancel_task(self._user_turn_end_timeout_task)
self._user_turn_end_timeout_task = None
for s in self._params.user_mute_strategies:
await s.cleanup()
await self._cleanup_turn_start_strategies()
await self._cleanup_user_mute_strategies()
async def _cleanup_turn_start_strategies(self):
if self._turn_start_strategies.user:
for s in self._turn_start_strategies.user:
await s.cleanup()
@@ -404,6 +404,10 @@ class LLMUserAggregator(LLMContextAggregator):
for s in self._turn_start_strategies.bot:
await s.cleanup()
async def _cleanup_user_mute_strategies(self):
for s in self._params.user_mute_strategies:
await s.cleanup()
async def _maybe_mute_frame(self, frame: Frame):
should_mute_frame = self._user_is_muted and isinstance(
frame,
@@ -478,6 +482,10 @@ class LLMUserAggregator(LLMContextAggregator):
" )"
)
await self._cleanup_turn_start_strategies()
self._turn_start_strategies = ExternalTurnStartStrategies()
await self._setup_turn_start_strategies()
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
self._vad_user_speaking = True
@@ -507,11 +515,25 @@ class LLMUserAggregator(LLMContextAggregator):
)
)
async def _on_user_turn_started(self, strategy: BaseUserTurnStartStrategy):
await self._trigger_user_turn_start(strategy)
async def _on_user_turn_started(
self,
strategy: BaseUserTurnStartStrategy,
enable_user_speaking_frames: bool,
):
await self._trigger_user_turn_start(
strategy,
enable_user_speaking_frames=enable_user_speaking_frames,
)
async def _on_bot_turn_started(self, strategy: BaseBotTurnStartStrategy):
await self._trigger_bot_turn_start(strategy)
async def _on_bot_turn_started(
self,
strategy: BaseBotTurnStartStrategy,
enable_user_speaking_frames: bool,
):
await self._trigger_bot_turn_start(
strategy,
enable_user_speaking_frames=enable_user_speaking_frames,
)
async def _on_push_frame(
self,
@@ -529,11 +551,18 @@ class LLMUserAggregator(LLMContextAggregator):
):
await self.broadcast_frame(frame_cls, **kwargs)
async def _trigger_user_turn_start(self, strategy: Optional[BaseUserTurnStartStrategy]):
async def _trigger_user_turn_start(
self,
strategy: Optional[BaseUserTurnStartStrategy],
*,
enable_user_speaking_frames: bool,
):
# Prevent two consecutive user turn starts.
if self._user_turn:
return
logger.debug(f"User started speaking (user turn start strategy: {strategy})")
self._user_turn = True
self._user_turn_end_timeout_event.set()
@@ -542,19 +571,25 @@ class LLMUserAggregator(LLMContextAggregator):
for s in self._turn_start_strategies.user:
await s.reset()
if self._params.enable_user_speaking_frames:
logger.debug(f"User started speaking (user turn start strategy: {strategy})")
if enable_user_speaking_frames:
# TODO(aleix): These frames should really come from the top of the pipeline.
await self.broadcast_frame(UserStartedSpeakingFrame)
await self.broadcast_frame(InterruptionFrame)
await self._call_event_handler("on_user_turn_started", strategy)
async def _trigger_bot_turn_start(self, strategy: Optional[BaseBotTurnStartStrategy]):
async def _trigger_bot_turn_start(
self,
strategy: Optional[BaseBotTurnStartStrategy],
*,
enable_user_speaking_frames: bool,
):
# Prevent two consecutive bot turn starts.
if not self._user_turn:
return
logger.debug(f"User stopped speaking (bot turn start strategy: {strategy})")
self._user_turn = False
self._user_turn_end_timeout_event.set()
@@ -563,8 +598,7 @@ class LLMUserAggregator(LLMContextAggregator):
for s in self._turn_start_strategies.bot:
await s.reset()
if self._params.enable_user_speaking_frames:
logger.debug(f"User stopped speaking (bot turn start strategy: {strategy})")
if enable_user_speaking_frames:
# TODO(aleix): This frame should really come from the top of the pipeline.
await self.broadcast_frame(UserStoppedSpeakingFrame)
@@ -584,7 +618,7 @@ class LLMUserAggregator(LLMContextAggregator):
except asyncio.TimeoutError:
if self._user_turn and not self._vad_user_speaking:
await self._call_event_handler("on_user_turn_end_timeout")
await self._trigger_bot_turn_start(None)
await self._trigger_bot_turn_start(None, enable_user_speaking_frames=True)
class LLMAssistantAggregator(LLMContextAggregator):

View File

@@ -5,6 +5,7 @@
#
from pipecat.turns.bot.base_bot_turn_start_strategy import BaseBotTurnStartStrategy
from pipecat.turns.bot.external_bot_turn_start_strategy import ExternalBotTurnStartStrategy
from pipecat.turns.bot.transcription_bot_turn_start_strategy import (
TranscriptionBotTurnStartStrategy,
)

View File

@@ -88,6 +88,13 @@ class BaseBotTurnStartStrategy(BaseObject):
"""
await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs)
async def trigger_bot_turn_started(self):
"""Trigger the `on_bot_turn_started` event."""
await self._call_event_handler("on_bot_turn_started")
async def trigger_bot_turn_started(self, *, enable_user_speaking_frames: bool = True):
"""Trigger the `on_bot_turn_started` event.
Args:
enable_user_speaking_frames: If True, the aggregator will emit frames
indicating when the user stops speaking. This is enabled by default,
but you may want to disable it if another component (e.g., an STT
service) is already generating these frames.
"""
await self._call_event_handler("on_bot_turn_started", enable_user_speaking_frames)

View File

@@ -0,0 +1,127 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Bot turn start strategy triggered by externally emitted frames."""
import asyncio
from typing import Optional
from pipecat.frames.frames import (
Frame,
InterimTranscriptionFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.turns.bot.base_bot_turn_start_strategy import BaseBotTurnStartStrategy
from pipecat.utils.asyncio.task_manager import BaseTaskManager
class ExternalBotTurnStartStrategy(BaseBotTurnStartStrategy):
"""Bot turn start strategy controlled by an external processor.
This strategy does not determine when a user turn ends on its own, it relies
on a different processor in the pipeline which is responsible for emitting
`UserStoppedSpeakingFrame` frames.
"""
def __init__(self, *, timeout: float = 0.2):
"""Initialize the transcription-based bot turn start strategy.
Args:
timeout: A short delay used internally to handle consecutive or
slightly delayed transcriptions.
"""
super().__init__()
self._timeout = timeout
self._text = ""
self._user_speaking = False
self._seen_interim_results = False
self._event = asyncio.Event()
self._task: Optional[asyncio.Task] = None
async def reset(self):
"""Reset the strategy to its initial state."""
await super().reset()
self._text = ""
self._user_speaking = False
self._seen_interim_results = False
self._event.clear()
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the strategy with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
await super().setup(task_manager)
self._task = task_manager.create_task(self._task_handler(), f"{self}::_task_handler")
async def cleanup(self):
"""Cleanup the strategy."""
await super().cleanup()
if self._task:
await self.task_manager.cancel_task(self._task)
self._task = None
async def process_frame(self, frame: Frame):
"""Process an incoming frame to update strategy state.
Updates internal transcription text and VAD state. The bot turn will be
triggered when appropriate based on the collected frames.
Args:
frame: The frame to be analyzed.
"""
if isinstance(frame, UserStartedSpeakingFrame):
await self._handle_user_started_speaking(frame)
elif isinstance(frame, UserStoppedSpeakingFrame):
await self._handle_user_stopped_speaking(frame)
elif isinstance(frame, InterimTranscriptionFrame):
await self._handle_interim_transcription(frame)
elif isinstance(frame, TranscriptionFrame):
await self._handle_transcription(frame)
async def _handle_user_started_speaking(self, _: UserStartedSpeakingFrame):
"""Handle when the external service indicates the user is speaking."""
self._user_speaking = True
async def _handle_user_stopped_speaking(self, _: UserStoppedSpeakingFrame):
"""Handle when the external service indicates the user has stopped speaking."""
self._user_speaking = False
await self._maybe_trigger_bot_turn_started()
async def _handle_interim_transcription(self, frame: InterimTranscriptionFrame):
self._seen_interim_results = True
async def _handle_transcription(self, frame: TranscriptionFrame):
"""Handle user transcription."""
self._text += frame.text
# We just got a final result, so let's reset interim results.
self._seen_interim_results = False
# Reset aggregation timer.
self._event.set()
async def _task_handler(self):
"""Asynchronously monitor transcriptions and trigger bot turn when ready.
If transcription text exists and the user is not currently speaking,
triggers the bot turn. Handles multiple or delayed transcriptions
gracefully.
"""
while True:
try:
await asyncio.wait_for(self._event.wait(), timeout=self._timeout)
self._event.clear()
except asyncio.TimeoutError:
await self._maybe_trigger_bot_turn_started()
async def _maybe_trigger_bot_turn_started(self):
if not self._user_speaking and not self._seen_interim_results and self._text:
await self.trigger_bot_turn_started(enable_user_speaking_frames=False)

View File

@@ -11,10 +11,12 @@ from typing import List, Optional
from pipecat.turns.bot import (
BaseBotTurnStartStrategy,
ExternalBotTurnStartStrategy,
TranscriptionBotTurnStartStrategy,
)
from pipecat.turns.user import (
BaseUserTurnStartStrategy,
ExternalUserTurnStartStrategy,
TranscriptionUserTurnStartStrategy,
VADUserTurnStartStrategy,
)
@@ -49,3 +51,25 @@ class TurnStartStrategies:
self.user = [VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()]
if not self.bot:
self.bot = [TranscriptionBotTurnStartStrategy()]
@dataclass
class ExternalTurnStartStrategies(TurnStartStrategies):
"""Default container for external user and bot turn start strategies.
This class provides a convenience default for configuring external turn
control. It preconfigures `TurnStartStrategies` with
`ExternalUserTurnStartStrategy` and `ExternalBotTurnStartStrategy`, allowing
external processors (such as services) to control when user and bot turns
start.
When using this container, the user aggregator does not push
`UserStartedSpeakingFrame` or `UserStoppedSpeakingFrame` frames, and does
not generate interruptions. These signals are expected to be provided by an
external processor.
"""
def __post_init__(self):
self.user = [ExternalUserTurnStartStrategy()]
self.bot = [ExternalBotTurnStartStrategy()]

View File

@@ -5,6 +5,7 @@
#
from pipecat.turns.user.base_user_turn_start_strategy import BaseUserTurnStartStrategy
from pipecat.turns.user.external_user_turn_start_strategy import ExternalUserTurnStartStrategy
from pipecat.turns.user.min_words_user_turn_start_strategy import MinWordsUserTurnStartStrategy
from pipecat.turns.user.transcription_user_turn_start_strategy import (
TranscriptionUserTurnStartStrategy,

View File

@@ -88,6 +88,15 @@ class BaseUserTurnStartStrategy(BaseObject):
"""
await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs)
async def trigger_user_turn_started(self):
"""Trigger the `on_user_turn_started` event."""
await self._call_event_handler("on_user_turn_started")
async def trigger_user_turn_started(self, *, enable_user_speaking_frames: bool = True):
"""Trigger the `on_user_turn_started` event.
Args:
enable_user_speaking_frames: If True, the aggregator will emit frames
indicating when the user starts speaking, as well as interruption
frames. This is enabled by default, but you may want to disable it
if another component (e.g., an STT service) is already generating
these frames.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self._call_event_handler("on_user_turn_started", enable_user_speaking_frames)

View File

@@ -0,0 +1,31 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User turn start strategy triggered by externally emitted frames."""
from pipecat.frames.frames import Frame, UserStartedSpeakingFrame
from pipecat.turns.user.base_user_turn_start_strategy import BaseUserTurnStartStrategy
class ExternalUserTurnStartStrategy(BaseUserTurnStartStrategy):
"""User turn start strategy controlled by an external processor.
This strategy does not determine when a user turn starts on its own, instead
it relies on a different processor in the pipeline which is responsible for
emitting `UserStartedSpeakingFrame` frames.
"""
async def process_frame(self, frame: Frame):
"""Process an incoming frame to detect user turn start.
Args:
frame: The frame to be analyzed.
"""
await super().process_frame(frame)
if isinstance(frame, UserStartedSpeakingFrame):
await self.trigger_user_turn_started(enable_user_speaking_frames=False)

View File

@@ -10,10 +10,13 @@ import unittest
from pipecat.frames.frames import (
InterimTranscriptionFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.bot import TranscriptionBotTurnStartStrategy
from pipecat.turns.bot.external_bot_turn_start_strategy import ExternalBotTurnStartStrategy
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
AGGREGATION_TIMEOUT = 0.1
@@ -30,7 +33,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -55,7 +58,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -86,7 +89,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -133,7 +136,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -167,7 +170,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -209,7 +212,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -239,7 +242,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -275,7 +278,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -313,7 +316,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -347,7 +350,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -392,7 +395,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -412,7 +415,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -437,7 +440,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy):
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -472,3 +475,35 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
# at least the aggregation timeout.
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
self.assertTrue(should_start)
class TestExternalBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
async def test_external_strategy(self):
strategy = ExternalBotTurnStartStrategy()
should_start = None
@strategy.event_handler("on_bot_turn_started")
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
await strategy.process_frame(VADUserStartedSpeakingFrame())
self.assertFalse(should_start)
await strategy.process_frame(UserStartedSpeakingFrame())
self.assertFalse(should_start)
await strategy.process_frame(UserStoppedSpeakingFrame())
self.assertFalse(should_start)
await strategy.process_frame(UserStartedSpeakingFrame())
self.assertFalse(should_start)
await strategy.process_frame(
TranscriptionFrame(text="How are you?", user_id="cat", timestamp="")
)
self.assertFalse(should_start)
await strategy.process_frame(UserStoppedSpeakingFrame())
self.assertTrue(should_start)

View File

@@ -10,10 +10,12 @@ from pipecat.frames.frames import (
BotStartedSpeakingFrame,
InterimTranscriptionFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.turns.user import (
ExternalUserTurnStartStrategy,
MinWordsUserTurnStartStrategy,
TranscriptionUserTurnStartStrategy,
VADUserTurnStartStrategy,
@@ -27,7 +29,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy):
async def on_user_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -59,7 +61,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy):
async def on_user_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -81,7 +83,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy):
async def on_user_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -102,7 +104,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy):
async def on_user_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -115,7 +117,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy):
async def on_user_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -132,7 +134,7 @@ class TestVADUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy):
async def on_user_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -150,7 +152,7 @@ class TestTranscriptionUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy):
async def on_user_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
@@ -162,3 +164,21 @@ class TestTranscriptionUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="", timestamp="now"))
self.assertTrue(should_start)
class TestExternalUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
async def test_external_strategy(self):
strategy = ExternalUserTurnStartStrategy()
should_start = None
@strategy.event_handler("on_user_turn_started")
async def on_user_turn_started(strategy, enable_user_speaking_frames):
nonlocal should_start
should_start = True
await strategy.process_frame(VADUserStartedSpeakingFrame())
self.assertFalse(should_start)
await strategy.process_frame(UserStartedSpeakingFrame())
self.assertTrue(should_start)