# ---------------------------------------------------------------------------
# src/pipecat/turns/mute/__init__.py  (new file, intentionally empty)
# ---------------------------------------------------------------------------

# ---------------------------------------------------------------------------
# src/pipecat/turns/mute/always_user_mute_strategy.py  (new file)
# ---------------------------------------------------------------------------
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""User mute strategy that always mutes the user while the bot is speaking."""

from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy


class AlwaysUserMuteStrategy(BaseUserMuteStrategy):
    """User mute strategy that always mutes the user while the bot is speaking.

    The strategy tracks `BotStartedSpeakingFrame` / `BotStoppedSpeakingFrame`
    and reports "muted" for every frame seen between the two.
    """

    def __init__(self):
        """Initialize the always user mute strategy."""
        super().__init__()
        self._bot_speaking = False

    async def reset(self):
        """Reset the strategy to its initial state (bot not speaking)."""
        self._bot_speaking = False

    async def process_frame(self, frame: Frame) -> bool:
        """Process an incoming frame.

        Args:
            frame: The frame to be processed.

        Returns:
            Whether the strategy is muted, i.e. whether the bot is currently
            speaking according to the frames seen so far.
        """
        await super().process_frame(frame)

        if isinstance(frame, BotStartedSpeakingFrame):
            self._bot_speaking = True
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self._bot_speaking = False

        return self._bot_speaking


# ---------------------------------------------------------------------------
# src/pipecat/turns/mute/base_user_mute_strategy.py  (new file)
# ---------------------------------------------------------------------------
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""Base strategy for deciding whether user frames should be muted."""

from typing import Optional

from pipecat.frames.frames import Frame
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject


class BaseUserMuteStrategy(BaseObject):
    """Base class for strategies that decide whether user frames should be muted.

    A user mute strategy determines whether incoming user frames should be
    suppressed based on the *current system state*.

    Typical heuristics include:
    - The bot is currently speaking, so user should be muted
    - A function call or tool execution is in progress
    - The system is otherwise not ready to accept user input

    The strategy is evaluated per frame and returns a boolean indicating whether
    the user should be muted.

    """

    def __init__(self, **kwargs):
        """Initialize the base user mute strategy.

        Args:
            **kwargs: Additional arguments forwarded to the parent class.
        """
        super().__init__(**kwargs)
        # Populated by `setup()`; `None` means the strategy is not set up yet.
        self._task_manager: Optional[BaseTaskManager] = None

    @property
    def task_manager(self) -> BaseTaskManager:
        """Returns the configured task manager.

        Raises:
            RuntimeError: If `setup()` has not been called yet.
        """
        # Compare against None explicitly: "not set up" is the only failure
        # condition, independent of how the task manager evaluates as a bool.
        if self._task_manager is None:
            raise RuntimeError(f"{self} user mute strategy was not properly setup")
        return self._task_manager

    async def setup(self, task_manager: BaseTaskManager):
        """Initialize the strategy with the given task manager.

        Args:
            task_manager: The task manager to be associated with this instance.
        """
        self._task_manager = task_manager

    async def cleanup(self):
        """Cleanup the strategy. The base implementation does nothing."""
        pass

    async def reset(self):
        """Reset the strategy to its initial state.

        The base implementation holds no mute state, so it does nothing.
        """
        pass

    async def process_frame(self, frame: Frame) -> bool:
        """Process an incoming frame.

        Args:
            frame: The frame to be processed.

        Returns:
            Whether the strategy is muted. The base implementation never mutes.
        """
        return False


# ---------------------------------------------------------------------------
# src/pipecat/turns/mute/first_speech_user_mute_strategy.py  (new file)
# ---------------------------------------------------------------------------
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""User mute strategy that mutes the user only during the bot’s first speech."""

from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy


class FirstSpeechUserMuteStrategy(BaseUserMuteStrategy):
    """User mute strategy that mutes the user only during the bot’s first speech.

    This strategy allows user input before the bot starts speaking. Once the bot
    begins its first speaking turn, user frames are muted until the bot finishes
    that speech. After the bot completes its first speaking turn, user input is
    no longer muted by this strategy.

    Use this strategy when early user input is acceptable, but interruptions
    during the bot’s initial response should be prevented.

    """

    def __init__(self):
        """Initialize the first-bot-speech user mute strategy."""
        super().__init__()
        self._bot_speaking = False
        self._first_speech_handled = False

    async def reset(self):
        """Reset the strategy to its initial state."""
        self._bot_speaking = False
        self._first_speech_handled = False

    async def process_frame(self, frame: Frame) -> bool:
        """Process an incoming frame.

        Args:
            frame: The frame to be processed.

        Returns:
            Whether the strategy is muted: True only while the bot is speaking
            and no bot speaking turn has finished yet.
        """
        await super().process_frame(frame)

        if isinstance(frame, BotStartedSpeakingFrame):
            await self._handle_bot_started_speaking(frame)
        elif isinstance(frame, BotStoppedSpeakingFrame):
            await self._handle_bot_stopped_speaking(frame)

        return self._bot_speaking and not self._first_speech_handled

    async def _handle_bot_started_speaking(self, frame: BotStartedSpeakingFrame):
        """Record that the bot is now speaking."""
        self._bot_speaking = True

    async def _handle_bot_stopped_speaking(self, frame: BotStoppedSpeakingFrame):
        """Record that the bot stopped speaking and that a turn has completed."""
        self._bot_speaking = False
        self._first_speech_handled = True


# ---------------------------------------------------------------------------
# src/pipecat/turns/mute/function_call_user_mute_strategy.py  (new file)
# ---------------------------------------------------------------------------
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""User mute strategy that mutes the user while a function call is executing."""

from typing import Set

from pipecat.frames.frames import (
    Frame,
    FunctionCallCancelFrame,
    FunctionCallResultFrame,
    FunctionCallsStartedFrame,
)
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy


class FunctionCallUserMuteStrategy(BaseUserMuteStrategy):
    """User mute strategy that mutes the user while a function call is executing.

    This strategy ensures that user input does not interfere with ongoing
    function execution. While a function call is active, all user frames are
    muted. Once the function call completes or is canceled, user input is
    allowed again.

    """

    def __init__(self):
        """Initialize the function call user mute strategy."""
        super().__init__()
        # Tool call ids that have started and not yet finished or been canceled.
        self._function_call_in_progress: Set[str] = set()

    async def reset(self):
        """Reset the strategy to its initial state (no calls in progress)."""
        self._function_call_in_progress.clear()

    async def process_frame(self, frame: Frame) -> bool:
        """Process an incoming frame.

        Args:
            frame: The frame to be processed.

        Returns:
            Whether the strategy is muted, i.e. whether at least one function
            call is still in progress.
        """
        await super().process_frame(frame)

        if isinstance(frame, FunctionCallsStartedFrame):
            await self._handle_function_calls_started(frame)
        elif isinstance(frame, (FunctionCallCancelFrame, FunctionCallResultFrame)):
            # `discard`, not `remove`: the id may legitimately be untracked
            # (for example `reset()` emptied the set while the call was still
            # running, or a cancel and a result both arrive for the same call).
            # That must not raise KeyError out of frame processing.
            self._function_call_in_progress.discard(frame.tool_call_id)

        return bool(self._function_call_in_progress)

    async def _handle_function_calls_started(self, frame: FunctionCallsStartedFrame):
        """Start tracking every function call announced by the frame."""
        self._function_call_in_progress.update(
            call.tool_call_id for call in frame.function_calls
        )


# ---------------------------------------------------------------------------
# src/pipecat/turns/mute/mute_until_first_bot_complete_user_mute_strategy.py
# (new file)
# ---------------------------------------------------------------------------
#
# Copyright (c) 2024–2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

"""User mute strategy that mutes the user until the bot completes its first speech."""

from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy


class MuteUntilFirstBotCompleteUserMuteStrategy(BaseUserMuteStrategy):
    """User mute strategy that mutes the user until the bot completes its first speech.

    This strategy mutes user frames immediately from the start of the
    interaction, even if the bot has not started speaking yet. User input
    remains muted until the bot finishes its first speaking turn.

    After the bot completes its initial speech, all subsequent user frames are
    allowed to pass through without muting.

    Use this strategy when the bot must fully control the beginning of the
    interaction and deliver its first response without any user interruption.

    """

    def __init__(self):
        """Initialize the mute-until-first-bot-complete user mute strategy."""
        super().__init__()
        self._first_speech_handled = False

    async def reset(self):
        """Reset the strategy to its initial state (muted again)."""
        self._first_speech_handled = False

    async def process_frame(self, frame: Frame) -> bool:
        """Process an incoming frame.

        Args:
            frame: The frame to be processed.

        Returns:
            Whether the strategy is muted: True until the first
            `BotStoppedSpeakingFrame` has been seen.
        """
        await super().process_frame(frame)

        if isinstance(frame, BotStoppedSpeakingFrame):
            await self._handle_bot_stopped_speaking(frame)

        return not self._first_speech_handled

    async def _handle_bot_stopped_speaking(self, frame: BotStoppedSpeakingFrame):
        """Record that the bot finished a speaking turn, lifting the mute."""
        # This strategy keeps no "bot speaking" state; only completion of the
        # first speaking turn matters.
        self._first_speech_handled = True