From 63df4642b5fdcf01df84185795986f2b2bbeafa6 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Thu, 19 Feb 2026 07:43:20 -0700 Subject: [PATCH] Fix RTVIObserver missing upstream-only frames by adding broadcasted flag RTVIObserver previously filtered out all upstream frames to avoid duplicate messages from broadcasted frames. This caused upstream-only frames to be silently ignored. Instead, add a `broadcasted` field to the Frame base class that is set by broadcast_frame() and broadcast_frame_instance(), and only skip upstream copies of broadcasted frames. --- src/pipecat/frames/frames.py | 2 ++ src/pipecat/processors/frame_processor.py | 10 ++++++++-- src/pipecat/processors/frameworks/rtvi.py | 7 +++---- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index 8d237defc..c3f6226c6 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -131,6 +131,7 @@ class Frame: id: int = field(init=False) name: str = field(init=False) pts: Optional[int] = field(init=False) + broadcasted: bool = field(init=False) metadata: Dict[str, Any] = field(init=False) transport_source: Optional[str] = field(init=False) transport_destination: Optional[str] = field(init=False) @@ -139,6 +140,7 @@ class Frame: self.id: int = obj_id() self.name: str = f"{self.__class__.__name__}#{obj_count(self)}" self.pts: Optional[int] = None + self.broadcasted: bool = False self.metadata: Dict[str, Any] = {} self.transport_source: Optional[str] = None self.transport_destination: Optional[str] = None diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index f0c9e7183..48c8c718c 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -787,8 +787,12 @@ class FrameProcessor(BaseObject): frame_cls: The class of the frame to be broadcasted. **kwargs: Keyword arguments to be passed to the frame's constructor. """ - await self.push_frame(frame_cls(**kwargs)) - await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM) + downstream_frame = frame_cls(**kwargs) + downstream_frame.broadcasted = True + await self.push_frame(downstream_frame) + upstream_frame = frame_cls(**kwargs) + upstream_frame.broadcasted = True + await self.push_frame(upstream_frame, FrameDirection.UPSTREAM) async def broadcast_frame_instance(self, frame: Frame): """Broadcasts a frame instance upstream and downstream. @@ -815,11 +819,13 @@ class FrameProcessor(BaseObject): new_frame = frame_cls(**init_fields) for k, v in extra_fields.items(): setattr(new_frame, k, v) + new_frame.broadcasted = True await self.push_frame(new_frame) new_frame = frame_cls(**init_fields) for k, v in extra_fields.items(): setattr(new_frame, k, v) + new_frame.broadcasted = True await self.push_frame(new_frame, FrameDirection.UPSTREAM) async def __start(self, frame: StartFrame): diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index c1497b40b..8c7bb15ef 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1220,10 +1220,9 @@ class RTVIObserver(BaseObserver): frame = data.frame direction = data.direction - # Only process downstream frames. Some frames are broadcast in both - # directions (e.g. UserStartedSpeakingFrame, FunctionCallResultFrame), - # and we only want to send one RTVI message per event. - if direction != FrameDirection.DOWNSTREAM: + # For broadcasted frames (pushed in both directions), only process + # the downstream copy to avoid sending duplicate RTVI messages. + if frame.broadcasted and direction != FrameDirection.DOWNSTREAM: return # If we have already seen this frame, let's skip it.