From 0efa36a04ea52f608776ff0f5b49379bcda9b412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Wed, 24 Dec 2025 10:35:00 -0800 Subject: [PATCH] examples(foundational): added 24-user-mute-strategy.py example --- .../foundational/24-user-mute-strategy.py | 178 ++++++++++++++++++ examples/foundational/README.md | 2 +- 2 files changed, 179 insertions(+), 1 deletion(-) create mode 100644 examples/foundational/24-user-mute-strategy.py diff --git a/examples/foundational/24-user-mute-strategy.py b/examples/foundational/24-user-mute-strategy.py new file mode 100644 index 000000000..097338dac --- /dev/null +++ b/examples/foundational/24-user-mute-strategy.py @@ -0,0 +1,178 @@ +# +# Copyright (c) 2024–2025, Daily +# +# SPDX-License-Identifier: BSD 2-Clause License +# + + +import asyncio +import os + +from dotenv import load_dotenv +from loguru import logger + +from pipecat.adapters.schemas.function_schema import FunctionSchema +from pipecat.adapters.schemas.tools_schema import ToolsSchema +from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3 +from pipecat.audio.vad.silero import SileroVADAnalyzer +from pipecat.audio.vad.vad_analyzer import VADParams +from pipecat.frames.frames import LLMRunFrame +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineParams, PipelineTask +from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_response_universal import ( + LLMContextAggregatorPair, + LLMUserAggregatorParams, +) +from pipecat.runner.types import RunnerArguments +from pipecat.runner.utils import create_transport +from pipecat.services.deepgram.stt import DeepgramSTTService +from pipecat.services.deepgram.tts import DeepgramTTSService +from pipecat.services.llm_service import FunctionCallParams +from pipecat.services.openai.llm import OpenAILLMService +from 
pipecat.transports.base_transport import BaseTransport, TransportParams
+from pipecat.transports.daily.transport import DailyParams
+from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
+from pipecat.turns.bot.turn_analyzer_bot_turn_start_strategy import TurnAnalyzerBotTurnStartStrategy
+from pipecat.turns.mute.function_call_user_mute_strategy import FunctionCallUserMuteStrategy
+from pipecat.turns.mute.mute_until_first_bot_complete_user_mute_strategy import (
+    MuteUntilFirstBotCompleteUserMuteStrategy,
+)
+from pipecat.turns.turn_start_strategies import TurnStartStrategies
+
+load_dotenv(override=True)
+
+
+async def fetch_weather_from_api(params: FunctionCallParams):
+    """Pretend to query a weather service and hand back a fixed report.
+
+    The five-second pause stands in for a slow API so that interruption
+    during a function call can be tested.
+
+    Args:
+        params: Function call parameters; the result is delivered through
+            ``params.result_callback``.
+    """
+    logger.info("Weather API call starting...")
+    await asyncio.sleep(5)
+    logger.info("Weather API call completed")
+
+    # Canned response: the same report is returned on every call.
+    weather_report = {"conditions": "nice", "temperature": "75"}
+    await params.result_callback(weather_report)
+
+
+# We store functions so objects (e.g. SileroVADAnalyzer) don't get
+# instantiated. The function will be called when the desired transport gets
+# selected.
+transport_params = {
+    "daily": lambda: DailyParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
+    ),
+    "twilio": lambda: FastAPIWebsocketParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
+    ),
+    "webrtc": lambda: TransportParams(
+        audio_in_enabled=True,
+        audio_out_enabled=True,
+        vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
+    ),
+}
+
+
+async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
+    """Run a weather-assistant voice pipeline configured with user mute strategies.
+
+    Builds an STT -> LLM -> TTS pipeline on ``transport``, registers the
+    ``get_current_weather`` function with the LLM, and configures the user
+    aggregator with ``MuteUntilFirstBotCompleteUserMuteStrategy`` and
+    ``FunctionCallUserMuteStrategy``. Returns once the pipeline runner finishes.
+
+    Args:
+        transport: Transport that provides the input/output processors and the
+            client connect/disconnect events.
+        runner_args: Runner settings; supplies the pipeline idle timeout and
+            whether the runner handles SIGINT.
+    """
+    logger.info("Starting bot")
+
+    # Both Deepgram services use the same key, so read it once.
+    deepgram_api_key = os.getenv("DEEPGRAM_API_KEY")
+    stt = DeepgramSTTService(api_key=deepgram_api_key)
+    tts = DeepgramTTSService(api_key=deepgram_api_key, voice="aura-helios-en")
+
+    llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+    llm.register_function("get_current_weather", fetch_weather_from_api)
+
+    # Schema advertised to the LLM; the name must match the registered function.
+    weather_function = FunctionSchema(
+        name="get_current_weather",
+        description="Get the current weather",
+        properties={
+            "location": {
+                "type": "string",
+                "description": "The city and state, e.g. San Francisco, CA",
+            },
+            "format": {
+                "type": "string",
+                "enum": ["celsius", "fahrenheit"],
+                "description": "The temperature unit to use. Infer this from the user's location.",
+            },
+        },
+        required=["location", "format"],
+    )
+    tools = ToolsSchema(standard_tools=[weather_function])
+
+    messages = [
+        {
+            "role": "system",
+            "content": "You are a helpful assistant who can check the weather. Always check the weather when a location is mentioned. Respond concisely and naturally. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.",
+        },
+    ]
+
+    context = LLMContext(messages, tools)
+    context_aggregator = LLMContextAggregatorPair(
+        context,
+        user_params=LLMUserAggregatorParams(
+            # The mute strategies this example demonstrates.
+            user_mute_strategies=[
+                MuteUntilFirstBotCompleteUserMuteStrategy(),
+                FunctionCallUserMuteStrategy(),
+            ]
+        ),
+    )
+
+    pipeline = Pipeline(
+        [
+            transport.input(),  # Transport user input
+            stt,  # STT
+            context_aggregator.user(),  # User responses
+            llm,  # LLM
+            tts,  # TTS
+            transport.output(),  # Transport bot output
+            context_aggregator.assistant(),  # Assistant spoken responses
+        ]
+    )
+
+    task = PipelineTask(
+        pipeline,
+        params=PipelineParams(
+            enable_metrics=True,
+            enable_usage_metrics=True,
+            turn_start_strategies=TurnStartStrategies(
+                bot=[TurnAnalyzerBotTurnStartStrategy(turn_analyzer=LocalSmartTurnAnalyzerV3())]
+            ),
+        ),
+        idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
+    )
+
+    @transport.event_handler("on_client_connected")
+    async def on_client_connected(transport, client):
+        logger.info("Client connected")
+        # Kick off the conversation with a weather-related prompt.
+        # NOTE(review): assumes LLMContext keeps a reference to this list rather
+        # than copying it, so the appended message reaches the LLM - confirm.
+        messages.append(
+            {
+                "role": "system",
+                "content": "Ask the user what city they'd like to know the weather for.",
+            }
+        )
+        await task.queue_frames([LLMRunFrame()])
+
+    @transport.event_handler("on_client_disconnected")
+    async def on_client_disconnected(transport, client):
+        logger.info("Client disconnected")
+        await task.cancel()
+
+    runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
+
+    await runner.run(task)
+
+
+async def bot(runner_args: RunnerArguments):
+    """Main bot entry point compatible with Pipecat Cloud."""
+    transport = await create_transport(runner_args, transport_params)
+    await run_bot(transport, runner_args)
+
+
+if __name__ == "__main__":
+    from pipecat.runner.run import main
+
+    main()
diff --git a/examples/foundational/README.md b/examples/foundational/README.md
index c1ead3ece..bc25d42ca 100644 --- a/examples/foundational/README.md +++ b/examples/foundational/README.md @@ -80,7 +80,7 @@ uv run 07-interruptible.py -t twilio -x NGROK_HOST_NAME ### Common Utilities - **[17-detect-user-idle.py](./17-detect-user-idle.py)**: Handle inactive users (UserIdleProcessor) -- **[24-stt-mute-filter.py](./24-stt-mute-filter.py)**: Selectively mute user input (STTMuteFilter) +- **[24-user-mute-strategy.py](./24-user-mute-strategy.py)**: Selectively mute user input (LLMUserAggregator user mute strategies) - **[28-transcription-processor.py](./28-transcription-processor.py)**: Record conversation text (TranscriptProcessor) - **[30-observer.py](./30-observer.py)**: Access frame data (Custom observers) - **[31-heartbeats.py](./31-heartbeats.py)**: Detect idle pipelines (Pipeline monitoring)