From f6916428b1f07d1b0484973b58a4924cab91cd0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Thu, 6 Nov 2025 09:40:33 -0800 Subject: [PATCH] FrameProcessor: add new broadcast_frame() method --- CHANGELOG.md | 10 ++++- .../processors/filters/stt_mute_filter.py | 1 - src/pipecat/processors/frame_processor.py | 15 ++++++- src/pipecat/services/deepgram/flux/stt.py | 3 +- src/pipecat/services/llm_service.py | 42 ++++--------------- src/pipecat/transports/base_input.py | 13 ++---- 6 files changed, 35 insertions(+), 49 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0b16df29..9baa61a9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added new `FrameProcessor.broadcast_frame()` method. This will push two + instances of a given frame class, one upstream and the other downstream. + + ```python + await self.broadcast_frame(UserSpeakingFrame) + ``` + - Added `MetricsLogObserver` for logging performance metrics from `MetricsFrame` instances. Supports filtering via `include_metrics` parameter to control which metrics types are logged (TTFB, processing time, LLM token usage, TTS usage, @@ -36,8 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `CancelFrame` and `CancelTaskFrame` have an optional `reason` field to indicate why the pipeline is being canceled. This can be also specified when - you cancel a task with `PipelineTask.cancel(reason="cancellation your -reason")`. + you cancel a task with `PipelineTask.cancel(reason="cancellation reason")`. - Added `include_prob_metrics` parameter to Whisper STT services to enable access to probability metrics from transcription results. diff --git a/src/pipecat/processors/filters/stt_mute_filter.py b/src/pipecat/processors/filters/stt_mute_filter.py index a134fee5d..0b822b54a 100644 --- a/src/pipecat/processors/filters/stt_mute_filter.py +++ b/src/pipecat/processors/filters/stt_mute_filter.py @@ -27,7 +27,6 @@ from pipecat.frames.frames import ( InterimTranscriptionFrame, InterruptionFrame, StartFrame, - STTMuteFrame, TranscriptionFrame, UserStartedSpeakingFrame, UserStoppedSpeakingFrame, diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 1ca3333b5..361aa0a44 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -14,7 +14,7 @@ management, and frame flow control mechanisms. import asyncio from dataclasses import dataclass from enum import Enum -from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple +from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple, Type from loguru import logger @@ -689,6 +689,19 @@ class FrameProcessor(BaseObject): self._wait_for_interruption = False + async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs): + """Broadcasts a frame of the specified class upstream and downstream. + + This method creates two instances of the given frame class using the + provided keyword arguments and pushes them upstream and downstream. + + Args: + frame_cls: The class of the frame to be broadcasted. + **kwargs: Keyword arguments to be passed to the frame's constructor. + """ + await self.push_frame(frame_cls(**kwargs)) + await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM) + async def __start(self, frame: StartFrame): """Handle the start frame to initialize processor state. diff --git a/src/pipecat/services/deepgram/flux/stt.py b/src/pipecat/services/deepgram/flux/stt.py index 78f615a4d..9b22a5d28 100644 --- a/src/pipecat/services/deepgram/flux/stt.py +++ b/src/pipecat/services/deepgram/flux/stt.py @@ -526,8 +526,7 @@ class DeepgramFluxSTTService(WebsocketSTTService): """ logger.debug("User started speaking") await self.push_interruption_task_frame_and_wait() - await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM) - await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM) + await self.broadcast_frame(UserStartedSpeakingFrame) await self.start_metrics() await self._call_event_handler("on_start_of_turn", transcript) if transcript: diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index e342be7eb..7f743354d 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -433,11 +433,7 @@ class LLMService(AIService): await self._call_event_handler("on_function_calls_started", function_calls) - # Push frame both downstream and upstream - started_frame_downstream = FunctionCallsStartedFrame(function_calls=function_calls) - started_frame_upstream = FunctionCallsStartedFrame(function_calls=function_calls) - await self.push_frame(started_frame_downstream, FrameDirection.DOWNSTREAM) - await self.push_frame(started_frame_upstream, FrameDirection.UPSTREAM) + await self.broadcast_frame(FunctionCallsStartedFrame, function_calls=function_calls) for function_call in function_calls: if function_call.function_name in self._functions.keys(): @@ -552,33 +548,24 @@ class LLMService(AIService): # NOTE(aleix): This needs to be removed after we remove the deprecation. await self._call_start_function(runner_item.context, runner_item.function_name) - # Push a function call in-progress downstream. This frame will let our - # assistant context aggregator know that we are in the middle of a - # function call. Some contexts/aggregators may not need this. But some - # definitely do (Anthropic, for example). Also push it upstream for use - # by other processors, like STTMuteFilter. - progress_frame_downstream = FunctionCallInProgressFrame( + # Broadcast function call in-progress. This frame will let our assistant + # context aggregator know that we are in the middle of a function + # call. Some contexts/aggregators may not need this. But some definitely + # do (Anthropic, for example). + await self.broadcast_frame( + FunctionCallInProgressFrame, function_name=runner_item.function_name, tool_call_id=runner_item.tool_call_id, arguments=runner_item.arguments, cancel_on_interruption=item.cancel_on_interruption, ) - progress_frame_upstream = FunctionCallInProgressFrame( - function_name=runner_item.function_name, - tool_call_id=runner_item.tool_call_id, - arguments=runner_item.arguments, - cancel_on_interruption=item.cancel_on_interruption, - ) - - # Push frame both downstream and upstream - await self.push_frame(progress_frame_downstream, FrameDirection.DOWNSTREAM) - await self.push_frame(progress_frame_upstream, FrameDirection.UPSTREAM) # Define a callback function that pushes a FunctionCallResultFrame upstream & downstream. async def function_call_result_callback( result: Any, *, properties: Optional[FunctionCallResultProperties] = None ): - result_frame_downstream = FunctionCallResultFrame( + await self.broadcast_frame( + FunctionCallResultFrame, function_name=runner_item.function_name, tool_call_id=runner_item.tool_call_id, arguments=runner_item.arguments, @@ -586,17 +573,6 @@ class LLMService(AIService): run_llm=runner_item.run_llm, properties=properties, ) - result_frame_upstream = FunctionCallResultFrame( - function_name=runner_item.function_name, - tool_call_id=runner_item.tool_call_id, - arguments=runner_item.arguments, - result=result, - run_llm=runner_item.run_llm, - properties=properties, - ) - - await self.push_frame(result_frame_downstream, FrameDirection.DOWNSTREAM) - await self.push_frame(result_frame_upstream, FrameDirection.UPSTREAM) if isinstance(item.handler, DirectFunctionWrapper): # Handler is a DirectFunctionWrapper diff --git a/src/pipecat/transports/base_input.py b/src/pipecat/transports/base_input.py index 0f247c6dc..e66007e04 100644 --- a/src/pipecat/transports/base_input.py +++ b/src/pipecat/transports/base_input.py @@ -335,10 +335,7 @@ class BaseInputTransport(FrameProcessor): logger.debug("User started speaking") self._user_speaking = True - upstream_frame = UserStartedSpeakingFrame(emulated=emulated) - downstream_frame = UserStartedSpeakingFrame(emulated=emulated) - await self.push_frame(downstream_frame) - await self.push_frame(upstream_frame, FrameDirection.UPSTREAM) + await self.broadcast_frame(UserStartedSpeakingFrame, emulated=emulated) # Only push InterruptionFrame if: # 1. No interruption config is set, OR @@ -359,10 +356,7 @@ class BaseInputTransport(FrameProcessor): logger.debug("User stopped speaking") self._user_speaking = False - upstream_frame = UserStoppedSpeakingFrame(emulated=emulated) - downstream_frame = UserStoppedSpeakingFrame(emulated=emulated) - await self.push_frame(downstream_frame) - await self.push_frame(upstream_frame, FrameDirection.UPSTREAM) + await self.broadcast_frame(UserStoppedSpeakingFrame, emulated=emulated) # # Handle bot speaking state @@ -479,8 +473,7 @@ class BaseInputTransport(FrameProcessor): await self._run_turn_analyzer(frame, vad_state, previous_vad_state) if vad_state == VADState.SPEAKING: - await self.push_frame(UserSpeakingFrame()) - await self.push_frame(UserSpeakingFrame(), FrameDirection.UPSTREAM) + await self.broadcast_frame(UserSpeakingFrame) # Push audio downstream if passthrough is set. if self._params.audio_in_passthrough: