initial user mute strategies

This commit is contained in:
Aleix Conchillo Flaqué
2025-12-23 21:55:13 -08:00
parent ee35ea0966
commit 1d70275574
6 changed files with 289 additions and 0 deletions

View File

View File

@@ -0,0 +1,41 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User mute strategy that always mutes the user while the bot is speaking."""
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
class AlwaysUserMuteStrategy(BaseUserMuteStrategy):
    """Mute the user for as long as the bot is speaking.

    The strategy tracks bot speech through ``BotStartedSpeakingFrame`` and
    ``BotStoppedSpeakingFrame`` and reports "muted" between the two.
    """

    def __init__(self):
        """Create the strategy with the bot considered silent."""
        super().__init__()
        self._bot_speaking = False

    async def reset(self):
        """Go back to the initial state, where the bot is not speaking."""
        self._bot_speaking = False

    async def process_frame(self, frame: Frame) -> bool:
        """Update the bot-speaking flag from ``frame`` and report the mute state.

        Args:
            frame: The frame to be processed.

        Returns:
            True while the bot is speaking (user muted), False otherwise.
        """
        await super().process_frame(frame)

        started = isinstance(frame, BotStartedSpeakingFrame)
        stopped = isinstance(frame, BotStoppedSpeakingFrame)
        # Only speaking-boundary frames change the flag; any other frame
        # leaves the previous state untouched.
        if started or stopped:
            self._bot_speaking = started

        return self._bot_speaking

View File

@@ -0,0 +1,69 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Base strategy for deciding whether user frames should be muted."""
from typing import Optional
from pipecat.frames.frames import Frame
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
class BaseUserMuteStrategy(BaseObject):
    """Common base for strategies that decide whether the user is muted.

    A user mute strategy looks at the *current system state* and decides
    whether incoming user frames should be suppressed.

    Typical heuristics include:

    - The bot is currently speaking, so the user should be muted
    - A function call or tool execution is in progress
    - The system is otherwise not ready to accept user input

    The strategy is evaluated once per frame and answers with a boolean that
    tells whether the user should be muted.
    """

    def __init__(self, **kwargs):
        """Initialize the base user mute strategy.

        Args:
            **kwargs: Forwarded unchanged to the parent class initializer.
        """
        super().__init__(**kwargs)
        # Stays None until setup() is called.
        self._task_manager: Optional[BaseTaskManager] = None

    @property
    def task_manager(self) -> BaseTaskManager:
        """Returns the configured task manager.

        Raises:
            RuntimeError: If no task manager has been stored via ``setup()``.
        """
        manager = self._task_manager
        if manager:
            return manager
        raise RuntimeError(f"{self} user mute strategy was not properly setup")

    async def setup(self, task_manager: BaseTaskManager):
        """Attach the task manager this strategy should use.

        Args:
            task_manager: The task manager to be associated with this instance.
        """
        self._task_manager = task_manager

    async def cleanup(self):
        """Cleanup the strategy. The base implementation does nothing."""

    async def reset(self):
        """Reset the strategy to its initial state. No-op in the base class."""

    async def process_frame(self, frame: Frame) -> bool:
        """Process an incoming frame.

        The base implementation never mutes; subclasses override this to
        track state and return their own decision.

        Args:
            frame: The frame to be processed.

        Returns:
            Whether the strategy is muted (always False here).
        """
        return False

View File

@@ -0,0 +1,64 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User mute strategy that mutes the user only during the bot's first speech."""
from pipecat.frames.frames import BotStartedSpeakingFrame, BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
class FirstSpeechUserMuteStrategy(BaseUserMuteStrategy):
    """User mute strategy that mutes the user only during the bot's first speech.

    User input is allowed before the bot starts speaking. From the moment the
    bot begins its first speaking turn until it finishes that turn, user
    frames are muted. Once the first turn is over this strategy never mutes
    again (until it is reset).

    Use this strategy when early user input is acceptable, but interruptions
    during the bot's initial response should be prevented.
    """

    def __init__(self):
        """Initialize the first-bot-speech user mute strategy."""
        super().__init__()
        self._first_speech_handled = False
        self._bot_speaking = False

    async def reset(self):
        """Reset the strategy so the next bot speech counts as the first one."""
        self._first_speech_handled = False
        self._bot_speaking = False

    async def process_frame(self, frame: Frame) -> bool:
        """Track bot speech boundaries and report the mute state.

        Args:
            frame: The frame to be processed.

        Returns:
            True only while the bot is speaking and its first speech has not
            yet completed; False otherwise.
        """
        await super().process_frame(frame)

        if isinstance(frame, BotStartedSpeakingFrame):
            await self._handle_bot_started_speaking(frame)
        elif isinstance(frame, BotStoppedSpeakingFrame):
            await self._handle_bot_stopped_speaking(frame)

        return self._bot_speaking and not self._first_speech_handled

    async def _handle_bot_started_speaking(self, frame: BotStartedSpeakingFrame):
        # The bot has begun a speaking turn.
        self._bot_speaking = True

    async def _handle_bot_stopped_speaking(self, frame: BotStoppedSpeakingFrame):
        # Any completed speaking turn marks the first speech as done; after
        # that the flag simply stays True.
        self._bot_speaking = False
        self._first_speech_handled = True

View File

@@ -0,0 +1,59 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User mute strategy that mutes the user while a function call is executing."""
from typing import Set
from pipecat.frames.frames import (
Frame,
FunctionCallCancelFrame,
FunctionCallResultFrame,
FunctionCallsStartedFrame,
)
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
class FunctionCallUserMuteStrategy(BaseUserMuteStrategy):
    """User mute strategy that mutes the user while a function call is executing.

    This strategy ensures that user input does not interfere with ongoing
    function execution. While at least one function call is active, all user
    frames are muted. Once every tracked function call has completed or been
    canceled, user input is allowed again.
    """

    def __init__(self):
        """Initialize the function call user mute strategy."""
        super().__init__()
        # Tool call ids of the function calls that have started but not yet
        # produced a result or been canceled.
        self._function_call_in_progress: Set[str] = set()

    async def reset(self):
        """Reset the strategy to its initial state (no calls in progress)."""
        self._function_call_in_progress = set()

    async def process_frame(self, frame: Frame) -> bool:
        """Track function call lifecycle frames and report the mute state.

        Args:
            frame: The frame to be processed.

        Returns:
            True while at least one function call is in progress, False
            otherwise.
        """
        await super().process_frame(frame)

        if isinstance(frame, FunctionCallsStartedFrame):
            await self._handle_function_calls_started(frame)
        elif isinstance(frame, (FunctionCallCancelFrame, FunctionCallResultFrame)):
            # discard() rather than remove(): a completion frame may refer to
            # an id we are not tracking (e.g. a cancel followed by a result for
            # the same call, or a call started before reset()). That must not
            # raise KeyError in the frame-processing path.
            self._function_call_in_progress.discard(frame.tool_call_id)

        return bool(self._function_call_in_progress)

    async def _handle_function_calls_started(self, frame: FunctionCallsStartedFrame):
        # Register every call announced by the frame as in progress.
        self._function_call_in_progress.update(
            call.tool_call_id for call in frame.function_calls
        )

View File

@@ -0,0 +1,56 @@
#
# Copyright (c) 2024-2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""User mute strategy that mutes the user until the bot completes its first speech."""
from pipecat.frames.frames import BotStoppedSpeakingFrame, Frame
from pipecat.turns.mute.base_user_mute_strategy import BaseUserMuteStrategy
class MuteUntilFirstBotCompleteUserMuteStrategy(BaseUserMuteStrategy):
    """User mute strategy that mutes the user until the bot completes its first speech.

    This strategy mutes user frames immediately from the start of the
    interaction, even if the bot has not started speaking yet. User input
    remains muted until the bot finishes its first speaking turn.

    After the bot completes its initial speech, all subsequent user frames are
    allowed to pass through without muting.

    Use this strategy when the bot must fully control the beginning of the
    interaction and deliver its first response without any user interruption.
    """

    def __init__(self):
        """Initialize the mute-until-first-bot-complete user mute strategy."""
        super().__init__()
        self._first_speech_handled = False

    async def reset(self):
        """Reset the strategy so the user is muted again until the bot speaks."""
        self._first_speech_handled = False

    async def process_frame(self, frame: Frame) -> bool:
        """Watch for the end of the bot's first speech and report the mute state.

        Args:
            frame: The frame to be processed.

        Returns:
            True until a ``BotStoppedSpeakingFrame`` has been seen, False
            afterwards.
        """
        await super().process_frame(frame)

        if isinstance(frame, BotStoppedSpeakingFrame):
            await self._handle_bot_stopped_speaking(frame)

        return not self._first_speech_handled

    async def _handle_bot_stopped_speaking(self, frame: BotStoppedSpeakingFrame):
        # The first completed bot speech unmutes the user for good. This
        # strategy keeps no "bot speaking" state, so nothing else is updated.
        self._first_speech_handled = True