Merge pull request #2996 from pipecat-ai/aleix/broadcast-frame

FrameProcessor: add new broadcast_frame() method
This commit is contained in:
Aleix Conchillo Flaqué
2025-11-06 12:13:15 -08:00
committed by GitHub
6 changed files with 35 additions and 49 deletions

View File

@@ -9,6 +9,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Added new `FrameProcessor.broadcast_frame()` method. This will push two
instances of a given frame class, one upstream and the other downstream.
```python
await self.broadcast_frame(UserSpeakingFrame)
```
- Added `MetricsLogObserver` for logging performance metrics from `MetricsFrame`
instances. Supports filtering via `include_metrics` parameter to control which
metrics types are logged (TTFB, processing time, LLM token usage, TTS usage,
@@ -36,8 +43,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `CancelFrame` and `CancelTaskFrame` have an optional `reason` field to
indicate why the pipeline is being canceled. This can be also specified when
you cancel a task with `PipelineTask.cancel(reason="cancellation your
reason")`.
you cancel a task with `PipelineTask.cancel(reason="cancellation reason")`.
- Added `include_prob_metrics` parameter to Whisper STT services to enable access
to probability metrics from transcription results.

View File

@@ -27,7 +27,6 @@ from pipecat.frames.frames import (
InterimTranscriptionFrame,
InterruptionFrame,
StartFrame,
STTMuteFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,

View File

@@ -14,7 +14,7 @@ management, and frame flow control mechanisms.
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple
from typing import Any, Awaitable, Callable, Coroutine, List, Optional, Sequence, Tuple, Type
from loguru import logger
@@ -689,6 +689,19 @@ class FrameProcessor(BaseObject):
self._wait_for_interruption = False
async def broadcast_frame(self, frame_cls: Type[Frame], **kwargs):
"""Broadcasts a frame of the specified class upstream and downstream.
This method creates two instances of the given frame class using the
provided keyword arguments and pushes them upstream and downstream.
Args:
frame_cls: The class of the frame to be broadcasted.
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
await self.push_frame(frame_cls(**kwargs))
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.

View File

@@ -526,8 +526,7 @@ class DeepgramFluxSTTService(WebsocketSTTService):
"""
logger.debug("User started speaking")
await self.push_interruption_task_frame_and_wait()
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM)
await self.broadcast_frame(UserStartedSpeakingFrame)
await self.start_metrics()
await self._call_event_handler("on_start_of_turn", transcript)
if transcript:

View File

@@ -433,11 +433,7 @@ class LLMService(AIService):
await self._call_event_handler("on_function_calls_started", function_calls)
# Push frame both downstream and upstream
started_frame_downstream = FunctionCallsStartedFrame(function_calls=function_calls)
started_frame_upstream = FunctionCallsStartedFrame(function_calls=function_calls)
await self.push_frame(started_frame_downstream, FrameDirection.DOWNSTREAM)
await self.push_frame(started_frame_upstream, FrameDirection.UPSTREAM)
await self.broadcast_frame(FunctionCallsStartedFrame, function_calls=function_calls)
for function_call in function_calls:
if function_call.function_name in self._functions.keys():
@@ -552,33 +548,24 @@ class LLMService(AIService):
# NOTE(aleix): This needs to be removed after we remove the deprecation.
await self._call_start_function(runner_item.context, runner_item.function_name)
# Push a function call in-progress downstream. This frame will let our
# assistant context aggregator know that we are in the middle of a
# function call. Some contexts/aggregators may not need this. But some
# definitely do (Anthropic, for example). Also push it upstream for use
# by other processors, like STTMuteFilter.
progress_frame_downstream = FunctionCallInProgressFrame(
# Broadcast function call in-progress. This frame will let our assistant
# context aggregator know that we are in the middle of a function
# call. Some contexts/aggregators may not need this. But some definitely
# do (Anthropic, for example).
await self.broadcast_frame(
FunctionCallInProgressFrame,
function_name=runner_item.function_name,
tool_call_id=runner_item.tool_call_id,
arguments=runner_item.arguments,
cancel_on_interruption=item.cancel_on_interruption,
)
progress_frame_upstream = FunctionCallInProgressFrame(
function_name=runner_item.function_name,
tool_call_id=runner_item.tool_call_id,
arguments=runner_item.arguments,
cancel_on_interruption=item.cancel_on_interruption,
)
# Push frame both downstream and upstream
await self.push_frame(progress_frame_downstream, FrameDirection.DOWNSTREAM)
await self.push_frame(progress_frame_upstream, FrameDirection.UPSTREAM)
# Define a callback function that pushes a FunctionCallResultFrame upstream & downstream.
async def function_call_result_callback(
result: Any, *, properties: Optional[FunctionCallResultProperties] = None
):
result_frame_downstream = FunctionCallResultFrame(
await self.broadcast_frame(
FunctionCallResultFrame,
function_name=runner_item.function_name,
tool_call_id=runner_item.tool_call_id,
arguments=runner_item.arguments,
@@ -586,17 +573,6 @@ class LLMService(AIService):
run_llm=runner_item.run_llm,
properties=properties,
)
result_frame_upstream = FunctionCallResultFrame(
function_name=runner_item.function_name,
tool_call_id=runner_item.tool_call_id,
arguments=runner_item.arguments,
result=result,
run_llm=runner_item.run_llm,
properties=properties,
)
await self.push_frame(result_frame_downstream, FrameDirection.DOWNSTREAM)
await self.push_frame(result_frame_upstream, FrameDirection.UPSTREAM)
if isinstance(item.handler, DirectFunctionWrapper):
# Handler is a DirectFunctionWrapper

View File

@@ -335,10 +335,7 @@ class BaseInputTransport(FrameProcessor):
logger.debug("User started speaking")
self._user_speaking = True
upstream_frame = UserStartedSpeakingFrame(emulated=emulated)
downstream_frame = UserStartedSpeakingFrame(emulated=emulated)
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
await self.broadcast_frame(UserStartedSpeakingFrame, emulated=emulated)
# Only push InterruptionFrame if:
# 1. No interruption config is set, OR
@@ -359,10 +356,7 @@ class BaseInputTransport(FrameProcessor):
logger.debug("User stopped speaking")
self._user_speaking = False
upstream_frame = UserStoppedSpeakingFrame(emulated=emulated)
downstream_frame = UserStoppedSpeakingFrame(emulated=emulated)
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
await self.broadcast_frame(UserStoppedSpeakingFrame, emulated=emulated)
#
# Handle bot speaking state
@@ -479,8 +473,7 @@ class BaseInputTransport(FrameProcessor):
await self._run_turn_analyzer(frame, vad_state, previous_vad_state)
if vad_state == VADState.SPEAKING:
await self.push_frame(UserSpeakingFrame())
await self.push_frame(UserSpeakingFrame(), FrameDirection.UPSTREAM)
await self.broadcast_frame(UserSpeakingFrame)
# Push audio downstream if passthrough is set.
if self._params.audio_in_passthrough: