Compare commits
5 Commits
main
...
mb/voicema
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a68641afea | ||
|
|
b48d164af8 | ||
|
|
7c4802659c | ||
|
|
3cb8665401 | ||
|
|
ef73e498cb |
@@ -10,10 +10,12 @@
|
||||
- VADUserTurnStartStrategy
|
||||
- TranscriptionUserTurnStartStrategy
|
||||
- MinWordsUserTurnStartStrategy
|
||||
- ExternalUserTurnStartStrategy
|
||||
|
||||
Available bot turn start strategies:
|
||||
- TranscriptionBotTurnStartStrategy
|
||||
- TurnAnalyzerBotTurnStartStrategy
|
||||
- ExternalBotTurnStartStrategy
|
||||
|
||||
The default strategies are:
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ from pipecat.transcriptions.language import Language
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -132,7 +133,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False),
|
||||
user_params=LLMUserAggregatorParams(
|
||||
turn_start_strategies=ExternalTurnStartStrategies()
|
||||
),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -27,6 +27,7 @@ from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -71,7 +72,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(
|
||||
context, user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False)
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(turn_start_strategies=ExternalTurnStartStrategies()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -33,6 +33,7 @@ from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
@@ -78,7 +79,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
context = LLMContext(messages)
|
||||
context_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(enable_user_speaking_frames=False),
|
||||
user_params=LLMUserAggregatorParams(turn_start_strategies=ExternalTurnStartStrategies()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
|
||||
@@ -37,9 +37,13 @@ from pipecat.frames.frames import (
|
||||
)
|
||||
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor, FrameProcessorSetup
|
||||
from pipecat.services.llm_service import LLMService
|
||||
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies
|
||||
from pipecat.utils.sync.base_notifier import BaseNotifier
|
||||
from pipecat.utils.sync.event_notifier import EventNotifier
|
||||
|
||||
@@ -318,11 +322,13 @@ class ClassificationProcessor(FrameProcessor):
|
||||
# User started speaking - set the voicemail event
|
||||
if self._voicemail_detected:
|
||||
self._voicemail_event.set()
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
# User stopped speaking - clear the voicemail event
|
||||
if self._voicemail_detected:
|
||||
self._voicemail_event.clear()
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
else:
|
||||
# Pass all non-LLM frames through
|
||||
@@ -621,7 +627,12 @@ VOICEMAIL SYSTEM (respond "VOICEMAIL"):
|
||||
|
||||
# Create the LLM context and aggregators for conversation management
|
||||
self._context = LLMContext(self._messages)
|
||||
self._context_aggregator = LLMContextAggregatorPair(self._context)
|
||||
self._context_aggregator = LLMContextAggregatorPair(
|
||||
self._context,
|
||||
user_params=LLMUserAggregatorParams(
|
||||
turn_start_strategies=ExternalTurnStartStrategies()
|
||||
),
|
||||
)
|
||||
|
||||
# Create notification system for coordinating between components
|
||||
self._gate_notifier = EventNotifier() # Signals classification completion
|
||||
|
||||
@@ -81,16 +81,11 @@ class LLMUserAggregatorParams:
|
||||
enable_emulated_vad_interruptions: When True, allows emulated VAD events
|
||||
to interrupt the bot when it's speaking. When False, emulated speech
|
||||
is ignored while the bot is speaking.
|
||||
enable_user_speaking_frames: [DO NOT USE] added for temporary backwards
|
||||
compatibility.
|
||||
|
||||
"""
|
||||
|
||||
aggregation_timeout: float = 0.5
|
||||
turn_emulated_vad_timeout: float = 0.8
|
||||
enable_emulated_vad_interruptions: bool = False
|
||||
# Added for backwards compatibility.
|
||||
enable_user_speaking_frames: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -65,7 +65,7 @@ from pipecat.processors.aggregators.llm_context import (
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.turns.bot import BaseBotTurnStartStrategy
|
||||
from pipecat.turns.mute import BaseUserMuteStrategy
|
||||
from pipecat.turns.turn_start_strategies import TurnStartStrategies
|
||||
from pipecat.turns.turn_start_strategies import ExternalTurnStartStrategies, TurnStartStrategies
|
||||
from pipecat.turns.user import BaseUserTurnStartStrategy
|
||||
from pipecat.utils.string import TextPartForConcatenation, concatenate_aggregated_text
|
||||
from pipecat.utils.time import time_now_iso8601
|
||||
@@ -76,18 +76,12 @@ class LLMUserAggregatorParams:
|
||||
"""Parameters for configuring LLM user aggregation behavior.
|
||||
|
||||
Parameters:
|
||||
enable_user_speaking_frames: If True, the aggregator will emit frames
|
||||
indicating when the user starts and stops speaking, as well as
|
||||
interruption frames. This is enabled by default, but you may want
|
||||
to disable it if another component (e.g., an STT service) is already
|
||||
generating these frames.
|
||||
turn_start_strategies: User and bot turn start strategies.
|
||||
user_mute_strategies: List of user mute strategies.
|
||||
user_turn_end_timeout: Time in seconds to wait before considering the
|
||||
user's turn finished and starting the bot turn.
|
||||
"""
|
||||
|
||||
enable_user_speaking_frames: bool = True
|
||||
turn_start_strategies: Optional[TurnStartStrategies] = None
|
||||
user_mute_strategies: List[BaseUserMuteStrategy] = field(default_factory=list)
|
||||
user_turn_end_timeout: float = 5.0
|
||||
@@ -365,9 +359,14 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
self._user_turn_end_timeout_task_handler()
|
||||
)
|
||||
|
||||
await self._setup_turn_start_strategies()
|
||||
await self._setup_user_mute_strategies()
|
||||
|
||||
async def _setup_user_mute_strategies(self):
|
||||
for s in self._params.user_mute_strategies:
|
||||
await s.setup(self.task_manager)
|
||||
|
||||
async def _setup_turn_start_strategies(self):
|
||||
if self._turn_start_strategies.user:
|
||||
for s in self._turn_start_strategies.user:
|
||||
await s.setup(self.task_manager)
|
||||
@@ -393,9 +392,10 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
await self.cancel_task(self._user_turn_end_timeout_task)
|
||||
self._user_turn_end_timeout_task = None
|
||||
|
||||
for s in self._params.user_mute_strategies:
|
||||
await s.cleanup()
|
||||
await self._cleanup_turn_start_strategies()
|
||||
await self._cleanup_user_mute_strategies()
|
||||
|
||||
async def _cleanup_turn_start_strategies(self):
|
||||
if self._turn_start_strategies.user:
|
||||
for s in self._turn_start_strategies.user:
|
||||
await s.cleanup()
|
||||
@@ -404,6 +404,10 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
for s in self._turn_start_strategies.bot:
|
||||
await s.cleanup()
|
||||
|
||||
async def _cleanup_user_mute_strategies(self):
|
||||
for s in self._params.user_mute_strategies:
|
||||
await s.cleanup()
|
||||
|
||||
async def _maybe_mute_frame(self, frame: Frame):
|
||||
should_mute_frame = self._user_is_muted and isinstance(
|
||||
frame,
|
||||
@@ -478,6 +482,10 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
" )"
|
||||
)
|
||||
|
||||
await self._cleanup_turn_start_strategies()
|
||||
self._turn_start_strategies = ExternalTurnStartStrategies()
|
||||
await self._setup_turn_start_strategies()
|
||||
|
||||
async def _handle_vad_user_started_speaking(self, frame: VADUserStartedSpeakingFrame):
|
||||
self._vad_user_speaking = True
|
||||
|
||||
@@ -507,11 +515,25 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
)
|
||||
)
|
||||
|
||||
async def _on_user_turn_started(self, strategy: BaseUserTurnStartStrategy):
|
||||
await self._trigger_user_turn_start(strategy)
|
||||
async def _on_user_turn_started(
|
||||
self,
|
||||
strategy: BaseUserTurnStartStrategy,
|
||||
enable_user_speaking_frames: bool,
|
||||
):
|
||||
await self._trigger_user_turn_start(
|
||||
strategy,
|
||||
enable_user_speaking_frames=enable_user_speaking_frames,
|
||||
)
|
||||
|
||||
async def _on_bot_turn_started(self, strategy: BaseBotTurnStartStrategy):
|
||||
await self._trigger_bot_turn_start(strategy)
|
||||
async def _on_bot_turn_started(
|
||||
self,
|
||||
strategy: BaseBotTurnStartStrategy,
|
||||
enable_user_speaking_frames: bool,
|
||||
):
|
||||
await self._trigger_bot_turn_start(
|
||||
strategy,
|
||||
enable_user_speaking_frames=enable_user_speaking_frames,
|
||||
)
|
||||
|
||||
async def _on_push_frame(
|
||||
self,
|
||||
@@ -529,11 +551,18 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
):
|
||||
await self.broadcast_frame(frame_cls, **kwargs)
|
||||
|
||||
async def _trigger_user_turn_start(self, strategy: Optional[BaseUserTurnStartStrategy]):
|
||||
async def _trigger_user_turn_start(
|
||||
self,
|
||||
strategy: Optional[BaseUserTurnStartStrategy],
|
||||
*,
|
||||
enable_user_speaking_frames: bool,
|
||||
):
|
||||
# Prevent two consecutive user turn starts.
|
||||
if self._user_turn:
|
||||
return
|
||||
|
||||
logger.debug(f"User started speaking (user turn start strategy: {strategy})")
|
||||
|
||||
self._user_turn = True
|
||||
self._user_turn_end_timeout_event.set()
|
||||
|
||||
@@ -542,19 +571,25 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
for s in self._turn_start_strategies.user:
|
||||
await s.reset()
|
||||
|
||||
if self._params.enable_user_speaking_frames:
|
||||
logger.debug(f"User started speaking (user turn start strategy: {strategy})")
|
||||
if enable_user_speaking_frames:
|
||||
# TODO(aleix): These frames should really come from the top of the pipeline.
|
||||
await self.broadcast_frame(UserStartedSpeakingFrame)
|
||||
await self.broadcast_frame(InterruptionFrame)
|
||||
|
||||
await self._call_event_handler("on_user_turn_started", strategy)
|
||||
|
||||
async def _trigger_bot_turn_start(self, strategy: Optional[BaseBotTurnStartStrategy]):
|
||||
async def _trigger_bot_turn_start(
|
||||
self,
|
||||
strategy: Optional[BaseBotTurnStartStrategy],
|
||||
*,
|
||||
enable_user_speaking_frames: bool,
|
||||
):
|
||||
# Prevent two consecutive bot turn starts.
|
||||
if not self._user_turn:
|
||||
return
|
||||
|
||||
logger.debug(f"User stopped speaking (bot turn start strategy: {strategy})")
|
||||
|
||||
self._user_turn = False
|
||||
self._user_turn_end_timeout_event.set()
|
||||
|
||||
@@ -563,8 +598,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
for s in self._turn_start_strategies.bot:
|
||||
await s.reset()
|
||||
|
||||
if self._params.enable_user_speaking_frames:
|
||||
logger.debug(f"User stopped speaking (bot turn start strategy: {strategy})")
|
||||
if enable_user_speaking_frames:
|
||||
# TODO(aleix): This frame should really come from the top of the pipeline.
|
||||
await self.broadcast_frame(UserStoppedSpeakingFrame)
|
||||
|
||||
@@ -584,7 +618,7 @@ class LLMUserAggregator(LLMContextAggregator):
|
||||
except asyncio.TimeoutError:
|
||||
if self._user_turn and not self._vad_user_speaking:
|
||||
await self._call_event_handler("on_user_turn_end_timeout")
|
||||
await self._trigger_bot_turn_start(None)
|
||||
await self._trigger_bot_turn_start(None, enable_user_speaking_frames=True)
|
||||
|
||||
|
||||
class LLMAssistantAggregator(LLMContextAggregator):
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#
|
||||
|
||||
from pipecat.turns.bot.base_bot_turn_start_strategy import BaseBotTurnStartStrategy
|
||||
from pipecat.turns.bot.external_bot_turn_start_strategy import ExternalBotTurnStartStrategy
|
||||
from pipecat.turns.bot.transcription_bot_turn_start_strategy import (
|
||||
TranscriptionBotTurnStartStrategy,
|
||||
)
|
||||
|
||||
@@ -88,6 +88,13 @@ class BaseBotTurnStartStrategy(BaseObject):
|
||||
"""
|
||||
await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs)
|
||||
|
||||
async def trigger_bot_turn_started(self):
|
||||
"""Trigger the `on_bot_turn_started` event."""
|
||||
await self._call_event_handler("on_bot_turn_started")
|
||||
async def trigger_bot_turn_started(self, *, enable_user_speaking_frames: bool = True):
|
||||
"""Trigger the `on_bot_turn_started` event.
|
||||
|
||||
Args:
|
||||
enable_user_speaking_frames: If True, the aggregator will emit frames
|
||||
indicating when the user stops speaking. This is enabled by default,
|
||||
but you may want to disable it if another component (e.g., an STT
|
||||
service) is already generating these frames.
|
||||
"""
|
||||
await self._call_event_handler("on_bot_turn_started", enable_user_speaking_frames)
|
||||
|
||||
127
src/pipecat/turns/bot/external_bot_turn_start_strategy.py
Normal file
127
src/pipecat/turns/bot/external_bot_turn_start_strategy.py
Normal file
@@ -0,0 +1,127 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Bot turn start strategy triggered by externally emitted frames."""
|
||||
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
InterimTranscriptionFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.turns.bot.base_bot_turn_start_strategy import BaseBotTurnStartStrategy
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
|
||||
|
||||
class ExternalBotTurnStartStrategy(BaseBotTurnStartStrategy):
|
||||
"""Bot turn start strategy controlled by an external processor.
|
||||
|
||||
This strategy does not determine when a user turn ends on its own, it relies
|
||||
on a different processor in the pipeline which is responsible for emitting
|
||||
`UserStoppedSpeakingFrame` frames.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *, timeout: float = 0.2):
|
||||
"""Initialize the transcription-based bot turn start strategy.
|
||||
|
||||
Args:
|
||||
timeout: A short delay used internally to handle consecutive or
|
||||
slightly delayed transcriptions.
|
||||
"""
|
||||
super().__init__()
|
||||
self._timeout = timeout
|
||||
self._text = ""
|
||||
self._user_speaking = False
|
||||
self._seen_interim_results = False
|
||||
self._event = asyncio.Event()
|
||||
self._task: Optional[asyncio.Task] = None
|
||||
|
||||
async def reset(self):
|
||||
"""Reset the strategy to its initial state."""
|
||||
await super().reset()
|
||||
self._text = ""
|
||||
self._user_speaking = False
|
||||
self._seen_interim_results = False
|
||||
self._event.clear()
|
||||
|
||||
async def setup(self, task_manager: BaseTaskManager):
|
||||
"""Initialize the strategy with the given task manager.
|
||||
|
||||
Args:
|
||||
task_manager: The task manager to be associated with this instance.
|
||||
"""
|
||||
await super().setup(task_manager)
|
||||
self._task = task_manager.create_task(self._task_handler(), f"{self}::_task_handler")
|
||||
|
||||
async def cleanup(self):
|
||||
"""Cleanup the strategy."""
|
||||
await super().cleanup()
|
||||
if self._task:
|
||||
await self.task_manager.cancel_task(self._task)
|
||||
self._task = None
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
"""Process an incoming frame to update strategy state.
|
||||
|
||||
Updates internal transcription text and VAD state. The bot turn will be
|
||||
triggered when appropriate based on the collected frames.
|
||||
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
|
||||
"""
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
await self._handle_user_started_speaking(frame)
|
||||
elif isinstance(frame, UserStoppedSpeakingFrame):
|
||||
await self._handle_user_stopped_speaking(frame)
|
||||
elif isinstance(frame, InterimTranscriptionFrame):
|
||||
await self._handle_interim_transcription(frame)
|
||||
elif isinstance(frame, TranscriptionFrame):
|
||||
await self._handle_transcription(frame)
|
||||
|
||||
async def _handle_user_started_speaking(self, _: UserStartedSpeakingFrame):
|
||||
"""Handle when the external service indicates the user is speaking."""
|
||||
self._user_speaking = True
|
||||
|
||||
async def _handle_user_stopped_speaking(self, _: UserStoppedSpeakingFrame):
|
||||
"""Handle when the external service indicates the user has stopped speaking."""
|
||||
self._user_speaking = False
|
||||
await self._maybe_trigger_bot_turn_started()
|
||||
|
||||
async def _handle_interim_transcription(self, frame: InterimTranscriptionFrame):
|
||||
self._seen_interim_results = True
|
||||
|
||||
async def _handle_transcription(self, frame: TranscriptionFrame):
|
||||
"""Handle user transcription."""
|
||||
self._text += frame.text
|
||||
# We just got a final result, so let's reset interim results.
|
||||
self._seen_interim_results = False
|
||||
# Reset aggregation timer.
|
||||
self._event.set()
|
||||
|
||||
async def _task_handler(self):
|
||||
"""Asynchronously monitor transcriptions and trigger bot turn when ready.
|
||||
|
||||
If transcription text exists and the user is not currently speaking,
|
||||
triggers the bot turn. Handles multiple or delayed transcriptions
|
||||
gracefully.
|
||||
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.wait_for(self._event.wait(), timeout=self._timeout)
|
||||
self._event.clear()
|
||||
except asyncio.TimeoutError:
|
||||
await self._maybe_trigger_bot_turn_started()
|
||||
|
||||
async def _maybe_trigger_bot_turn_started(self):
|
||||
if not self._user_speaking and not self._seen_interim_results and self._text:
|
||||
await self.trigger_bot_turn_started(enable_user_speaking_frames=False)
|
||||
@@ -11,10 +11,12 @@ from typing import List, Optional
|
||||
|
||||
from pipecat.turns.bot import (
|
||||
BaseBotTurnStartStrategy,
|
||||
ExternalBotTurnStartStrategy,
|
||||
TranscriptionBotTurnStartStrategy,
|
||||
)
|
||||
from pipecat.turns.user import (
|
||||
BaseUserTurnStartStrategy,
|
||||
ExternalUserTurnStartStrategy,
|
||||
TranscriptionUserTurnStartStrategy,
|
||||
VADUserTurnStartStrategy,
|
||||
)
|
||||
@@ -49,3 +51,25 @@ class TurnStartStrategies:
|
||||
self.user = [VADUserTurnStartStrategy(), TranscriptionUserTurnStartStrategy()]
|
||||
if not self.bot:
|
||||
self.bot = [TranscriptionBotTurnStartStrategy()]
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExternalTurnStartStrategies(TurnStartStrategies):
|
||||
"""Default container for external user and bot turn start strategies.
|
||||
|
||||
This class provides a convenience default for configuring external turn
|
||||
control. It preconfigures `TurnStartStrategies` with
|
||||
`ExternalUserTurnStartStrategy` and `ExternalBotTurnStartStrategy`, allowing
|
||||
external processors (such as services) to control when user and bot turns
|
||||
start.
|
||||
|
||||
When using this container, the user aggregator does not push
|
||||
`UserStartedSpeakingFrame` or `UserStoppedSpeakingFrame` frames, and does
|
||||
not generate interruptions. These signals are expected to be provided by an
|
||||
external processor.
|
||||
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
self.user = [ExternalUserTurnStartStrategy()]
|
||||
self.bot = [ExternalBotTurnStartStrategy()]
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#
|
||||
|
||||
from pipecat.turns.user.base_user_turn_start_strategy import BaseUserTurnStartStrategy
|
||||
from pipecat.turns.user.external_user_turn_start_strategy import ExternalUserTurnStartStrategy
|
||||
from pipecat.turns.user.min_words_user_turn_start_strategy import MinWordsUserTurnStartStrategy
|
||||
from pipecat.turns.user.transcription_user_turn_start_strategy import (
|
||||
TranscriptionUserTurnStartStrategy,
|
||||
|
||||
@@ -88,6 +88,15 @@ class BaseUserTurnStartStrategy(BaseObject):
|
||||
"""
|
||||
await self._call_event_handler("on_broadcast_frame", frame_cls, **kwargs)
|
||||
|
||||
async def trigger_user_turn_started(self):
|
||||
"""Trigger the `on_user_turn_started` event."""
|
||||
await self._call_event_handler("on_user_turn_started")
|
||||
async def trigger_user_turn_started(self, *, enable_user_speaking_frames: bool = True):
|
||||
"""Trigger the `on_user_turn_started` event.
|
||||
|
||||
Args:
|
||||
enable_user_speaking_frames: If True, the aggregator will emit frames
|
||||
indicating when the user starts speaking, as well as interruption
|
||||
frames. This is enabled by default, but you may want to disable it
|
||||
if another component (e.g., an STT service) is already generating
|
||||
these frames.
|
||||
**kwargs: Keyword arguments to be passed to the frame's constructor.
|
||||
"""
|
||||
await self._call_event_handler("on_user_turn_started", enable_user_speaking_frames)
|
||||
|
||||
31
src/pipecat/turns/user/external_user_turn_start_strategy.py
Normal file
31
src/pipecat/turns/user/external_user_turn_start_strategy.py
Normal file
@@ -0,0 +1,31 @@
|
||||
#
|
||||
# Copyright (c) 2024–2025, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""User turn start strategy triggered by externally emitted frames."""
|
||||
|
||||
from pipecat.frames.frames import Frame, UserStartedSpeakingFrame
|
||||
from pipecat.turns.user.base_user_turn_start_strategy import BaseUserTurnStartStrategy
|
||||
|
||||
|
||||
class ExternalUserTurnStartStrategy(BaseUserTurnStartStrategy):
|
||||
"""User turn start strategy controlled by an external processor.
|
||||
|
||||
This strategy does not determine when a user turn starts on its own, instead
|
||||
it relies on a different processor in the pipeline which is responsible for
|
||||
emitting `UserStartedSpeakingFrame` frames.
|
||||
|
||||
"""
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
"""Process an incoming frame to detect user turn start.
|
||||
|
||||
Args:
|
||||
frame: The frame to be analyzed.
|
||||
"""
|
||||
await super().process_frame(frame)
|
||||
|
||||
if isinstance(frame, UserStartedSpeakingFrame):
|
||||
await self.trigger_user_turn_started(enable_user_speaking_frames=False)
|
||||
@@ -10,10 +10,13 @@ import unittest
|
||||
from pipecat.frames.frames import (
|
||||
InterimTranscriptionFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
UserStoppedSpeakingFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.turns.bot import TranscriptionBotTurnStartStrategy
|
||||
from pipecat.turns.bot.external_bot_turn_start_strategy import ExternalBotTurnStartStrategy
|
||||
from pipecat.utils.asyncio.task_manager import TaskManager, TaskManagerParams
|
||||
|
||||
AGGREGATION_TIMEOUT = 0.1
|
||||
@@ -30,7 +33,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -55,7 +58,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -86,7 +89,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -133,7 +136,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -167,7 +170,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -209,7 +212,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -239,7 +242,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -275,7 +278,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -313,7 +316,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -347,7 +350,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -392,7 +395,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -412,7 +415,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -437,7 +440,7 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy):
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -472,3 +475,35 @@ class TestTranscriptionBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
# at least the aggregation timeout.
|
||||
await asyncio.sleep(AGGREGATION_TIMEOUT + 0.1)
|
||||
self.assertTrue(should_start)
|
||||
|
||||
|
||||
class TestExternalBotTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_external_strategy(self):
|
||||
strategy = ExternalBotTurnStartStrategy()
|
||||
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_bot_turn_started")
|
||||
async def on_bot_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
await strategy.process_frame(VADUserStartedSpeakingFrame())
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(UserStartedSpeakingFrame())
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(UserStoppedSpeakingFrame())
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(UserStartedSpeakingFrame())
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(
|
||||
TranscriptionFrame(text="How are you?", user_id="cat", timestamp="")
|
||||
)
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(UserStoppedSpeakingFrame())
|
||||
self.assertTrue(should_start)
|
||||
|
||||
@@ -10,10 +10,12 @@ from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
InterimTranscriptionFrame,
|
||||
TranscriptionFrame,
|
||||
UserStartedSpeakingFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.turns.user import (
|
||||
ExternalUserTurnStartStrategy,
|
||||
MinWordsUserTurnStartStrategy,
|
||||
TranscriptionUserTurnStartStrategy,
|
||||
VADUserTurnStartStrategy,
|
||||
@@ -27,7 +29,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy):
|
||||
async def on_user_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -59,7 +61,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy):
|
||||
async def on_user_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -81,7 +83,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy):
|
||||
async def on_user_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -102,7 +104,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy):
|
||||
async def on_user_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -115,7 +117,7 @@ class TestMinWordsInterruptionStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy):
|
||||
async def on_user_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -132,7 +134,7 @@ class TestVADUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy):
|
||||
async def on_user_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -150,7 +152,7 @@ class TestTranscriptionUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy):
|
||||
async def on_user_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
@@ -162,3 +164,21 @@ class TestTranscriptionUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
|
||||
await strategy.process_frame(TranscriptionFrame(text="Hello!", user_id="", timestamp="now"))
|
||||
self.assertTrue(should_start)
|
||||
|
||||
|
||||
class TestExternalUserTurnStartStrategy(unittest.IsolatedAsyncioTestCase):
|
||||
async def test_external_strategy(self):
|
||||
strategy = ExternalUserTurnStartStrategy()
|
||||
|
||||
should_start = None
|
||||
|
||||
@strategy.event_handler("on_user_turn_started")
|
||||
async def on_user_turn_started(strategy, enable_user_speaking_frames):
|
||||
nonlocal should_start
|
||||
should_start = True
|
||||
|
||||
await strategy.process_frame(VADUserStartedSpeakingFrame())
|
||||
self.assertFalse(should_start)
|
||||
|
||||
await strategy.process_frame(UserStartedSpeakingFrame())
|
||||
self.assertTrue(should_start)
|
||||
|
||||
Reference in New Issue
Block a user