From ebfa4f2d5e223d1838c4e881d28fcf21d7c7b346 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Tue, 29 Jul 2025 12:01:29 -0400 Subject: [PATCH] Push the STTMuteFrame upstream and downstream --- CHANGELOG.md | 3 + examples/foundational/24-stt-mute-filter.py | 2 +- examples/translation-chatbot/bot.py | 216 ------------------ .../processors/filters/stt_mute_filter.py | 3 +- 4 files changed, 6 insertions(+), 218 deletions(-) delete mode 100644 examples/translation-chatbot/bot.py diff --git a/CHANGELOG.md b/CHANGELOG.md index d488476de..e23d861b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- `STTMuteFilter` now pushes the `STTMuteFrame` upstream and downstream, to + allow for more flexible `STTMuteFilter` placement. + - Play delayed messages from `ElevenLabsTTSService` if they still belong to the current context. diff --git a/examples/foundational/24-stt-mute-filter.py b/examples/foundational/24-stt-mute-filter.py index 7896ea3f2..a39314e92 100644 --- a/examples/foundational/24-stt-mute-filter.py +++ b/examples/foundational/24-stt-mute-filter.py @@ -112,7 +112,7 @@ async def run_example(transport: BaseTransport, _: argparse.Namespace, handle_si [ transport.input(), # Transport user input stt, # STT - stt_mute_processor, # Add the mute processor before STT + stt_mute_processor, # Add the mute processor between STT and context aggregator context_aggregator.user(), # User responses llm, # LLM tts, # TTS diff --git a/examples/translation-chatbot/bot.py b/examples/translation-chatbot/bot.py deleted file mode 100644 index 801358b7c..000000000 --- a/examples/translation-chatbot/bot.py +++ /dev/null @@ -1,216 +0,0 @@ -# -# Copyright (c) 2024–2025, Daily -# -# SPDX-License-Identifier: BSD 2-Clause License -# - -import asyncio -import os -import sys -from typing import List - -import aiohttp -from dotenv import load_dotenv -from loguru import logger -from runner import configure - -from pipecat.audio.vad.silero import SileroVADAnalyzer -from pipecat.frames.frames import ( - Frame, - LLMMessagesFrame, - TranscriptionFrame, - TranscriptionMessage, - TranscriptionUpdateFrame, -) -from pipecat.pipeline.pipeline import Pipeline -from pipecat.pipeline.runner import PipelineRunner -from pipecat.pipeline.task import PipelineParams, PipelineTask -from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext -from pipecat.processors.filters.stt_mute_filter import STTMuteConfig, STTMuteFilter, STTMuteStrategy -from pipecat.processors.frame_processor import FrameDirection, FrameProcessor -from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor -from pipecat.processors.transcript_processor import TranscriptProcessor -from pipecat.services.cartesia.tts import CartesiaTTSService -from pipecat.services.deepgram.stt import DeepgramSTTService -from pipecat.services.openai.llm import OpenAILLMService -from pipecat.transports.services.daily import DailyParams, DailyTransport - -load_dotenv(override=True) - -logger.remove(0) -logger.add(sys.stderr, level="DEBUG") - - -""" -This example looks a bit different than the chatbot example, because it isn't waiting on the user to stop talking to start translating. -It also isn't saving what the user or bot says into the context object for use in subsequent interactions. -""" - - -# We need to use a custom service here to yield LLM frames without saving -# any context -class TranslationProcessor(FrameProcessor): - """A processor that translates text frames from a source language to a target language.""" - - def __init__(self, in_language, out_language): - """Initialize the TranslationProcessor with source and target languages. - - Args: - in_language (str): The language of the input text. - out_language (str): The language to translate the text into. - """ - super().__init__() - self._out_language = out_language - self._in_language = in_language - - async def process_frame(self, frame: Frame, direction: FrameDirection): - """Process a frame and translate text frames. - - Args: - frame (Frame): The frame to process. - direction (FrameDirection): The direction of the frame. - """ - await super().process_frame(frame, direction) - - if isinstance(frame, TranscriptionFrame): - logger.debug(f"Translating {self._in_language}: {frame.text} to {self._out_language}") - context = [ - { - "role": "system", - "content": f"You will be provided with a sentence in {self._in_language}, and your task is to only translate it into {self._out_language}.", - }, - {"role": "user", "content": frame.text}, - ] - await self.push_frame(LLMMessagesFrame(context)) - else: - await self.push_frame(frame) - - -class TranscriptHandler: - """Simple handler to demonstrate transcript processing. - - Maintains a list of conversation messages and logs them with timestamps. - """ - - def __init__(self, in_language="English", out_language="Spanish"): - """Initialize the TranscriptHandler with an empty list of messages.""" - self.messages: List[TranscriptionMessage] = [] - self.in_language = in_language - self.out_language = out_language - - async def on_transcript_update( - self, processor: TranscriptProcessor, frame: TranscriptionUpdateFrame - ): - """Handle new transcript messages. - - Args: - processor: The TranscriptProcessor that emitted the update - frame: TranscriptionUpdateFrame containing new messages - """ - self.messages.extend(frame.messages) - - # Log the new messages - logger.info("New transcript messages:") - for msg in frame.messages: - timestamp = f"[{msg.timestamp}] " if msg.timestamp else "" - message = { - "event": "translation", - "timestamp": msg.timestamp, - "role": msg.role, - "language": self.out_language if msg.role == "assistant" else self.in_language, - "text": msg.content, - } - logger.info(f"{timestamp}{msg.role}: {msg.content}") - - -async def main(): - """Main function to set up and run the translation chatbot pipeline.""" - async with aiohttp.ClientSession() as session: - (room_url, token) = await configure(session) - - transport = DailyTransport( - room_url, - token, - "Translator", - DailyParams( - audio_in_enabled=True, - audio_out_enabled=True, - vad_analyzer=SileroVADAnalyzer(), - ), - ) - - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - - stt_mute_processor = STTMuteFilter( - config=STTMuteConfig( - strategies={ - STTMuteStrategy.ALWAYS, - } - ), - ) - - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="34dbb662-8e98-413c-a1ef-1a3407675fe7", # Spanish Narrator Man - model="sonic-2", - ) - - in_language = "English" - out_language = "Spanish" - - llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY")) - context = OpenAILLMContext() - context_aggregator = llm.create_context_aggregator(context) - - tp = TranslationProcessor(in_language=in_language, out_language=out_language) - - transcript = TranscriptProcessor() - transcript_handler = TranscriptHandler(in_language=in_language, out_language=out_language) - - # Register event handler for transcript updates - @transcript.event_handler("on_transcript_update") - async def on_transcript_update(processor, frame): - await transcript_handler.on_transcript_update(processor, frame) - - rtvi = RTVIProcessor() - - pipeline = Pipeline( - [ - transport.input(), - rtvi, - stt, - stt_mute_processor, # We don't want to interrupt the translator bot - transcript.user(), # User transcripts - tp, - llm, - tts, - transport.output(), - transcript.assistant(), - context_aggregator.assistant(), - ] - ) - - task = PipelineTask( - pipeline, - params=PipelineParams( - enable_metrics=True, - enable_usage_metrics=True, - ), - observers=[RTVIObserver(rtvi)], - ) - - @transport.event_handler("on_first_participant_joined") - async def on_first_participant_joined(transport, participant): - logger.info("First participant joined") - - @transport.event_handler("on_participant_left") - async def on_participant_left(transport, participant, reason): - await task.cancel() - - runner = PipelineRunner() - - await runner.run(task) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/src/pipecat/processors/filters/stt_mute_filter.py b/src/pipecat/processors/filters/stt_mute_filter.py index 700313d31..5e77b60ee 100644 --- a/src/pipecat/processors/filters/stt_mute_filter.py +++ b/src/pipecat/processors/filters/stt_mute_filter.py @@ -133,7 +133,8 @@ class STTMuteFilter(FrameProcessor): if should_mute != self.is_muted: logger.debug(f"STTMuteFilter {'muting' if should_mute else 'unmuting'}") self._is_muted = should_mute - await self.push_frame(STTMuteFrame(mute=should_mute)) + await self.push_frame(STTMuteFrame(mute=should_mute), FrameDirection.UPSTREAM) + await self.push_frame(STTMuteFrame(mute=should_mute), FrameDirection.DOWNSTREAM) async def _should_mute(self) -> bool: """Determine if STT should be muted based on current state and strategies."""