diff --git a/changelog/3774.added.md b/changelog/3774.added.md index 032d8238f..8e442999c 100644 --- a/changelog/3774.added.md +++ b/changelog/3774.added.md @@ -1 +1 @@ -- Added `broadcasted` field to the base `Frame` class. This field is automatically set to `True` by `broadcast_frame()` and `broadcast_frame_instance()` to distinguish broadcasted frames from single-direction frames. +- Added `broadcasted_sibling_id` field to the base `Frame` class. This field is automatically set by `broadcast_frame()` and `broadcast_frame_instance()` to the ID of the paired frame pushed in the opposite direction, allowing receivers to identify broadcast pairs. diff --git a/src/pipecat/frames/frames.py b/src/pipecat/frames/frames.py index c3f6226c6..e6bc008ab 100644 --- a/src/pipecat/frames/frames.py +++ b/src/pipecat/frames/frames.py @@ -123,6 +123,9 @@ class Frame: id: Unique identifier for the frame instance. name: Human-readable name combining class name and instance count. pts: Presentation timestamp in nanoseconds. + broadcasted_sibling_id: ID of the paired frame when this frame was + broadcast in both directions. Set automatically by + ``broadcast_frame()`` and ``broadcast_frame_instance()``. metadata: Dictionary for arbitrary frame metadata. transport_source: Name of the transport source that created this frame. transport_destination: Name of the transport destination for this frame. @@ -131,7 +134,7 @@ class Frame: id: int = field(init=False) name: str = field(init=False) pts: Optional[int] = field(init=False) - broadcasted: bool = field(init=False) + broadcasted_sibling_id: Optional[int] = field(init=False) metadata: Dict[str, Any] = field(init=False) transport_source: Optional[str] = field(init=False) transport_destination: Optional[str] = field(init=False) @@ -140,7 +143,7 @@ class Frame: self.id: int = obj_id() self.name: str = f"{self.__class__.__name__}#{obj_count(self)}" self.pts: Optional[int] = None - self.broadcasted: bool = False + self.broadcasted_sibling_id: Optional[int] = None self.metadata: Dict[str, Any] = {} self.transport_source: Optional[str] = None self.transport_destination: Optional[str] = None diff --git a/src/pipecat/processors/frame_processor.py b/src/pipecat/processors/frame_processor.py index 48c8c718c..6bdd9ae1d 100644 --- a/src/pipecat/processors/frame_processor.py +++ b/src/pipecat/processors/frame_processor.py @@ -788,10 +788,10 @@ class FrameProcessor(BaseObject): **kwargs: Keyword arguments to be passed to the frame's constructor. """ downstream_frame = frame_cls(**kwargs) - downstream_frame.broadcasted = True - await self.push_frame(downstream_frame) upstream_frame = frame_cls(**kwargs) - upstream_frame.broadcasted = True + downstream_frame.broadcasted_sibling_id = upstream_frame.id + upstream_frame.broadcasted_sibling_id = downstream_frame.id + await self.push_frame(downstream_frame) await self.push_frame(upstream_frame, FrameDirection.UPSTREAM) async def broadcast_frame_instance(self, frame: Frame): @@ -816,17 +816,18 @@ class FrameProcessor(BaseObject): if not f.init and f.name not in ("id", "name") } - new_frame = frame_cls(**init_fields) + downstream_frame = frame_cls(**init_fields) for k, v in extra_fields.items(): - setattr(new_frame, k, v) - new_frame.broadcasted = True - await self.push_frame(new_frame) + setattr(downstream_frame, k, v) - new_frame = frame_cls(**init_fields) + upstream_frame = frame_cls(**init_fields) for k, v in extra_fields.items(): - setattr(new_frame, k, v) - new_frame.broadcasted = True - await self.push_frame(new_frame, FrameDirection.UPSTREAM) + setattr(upstream_frame, k, v) + + downstream_frame.broadcasted_sibling_id = upstream_frame.id + upstream_frame.broadcasted_sibling_id = downstream_frame.id + await self.push_frame(downstream_frame) + await self.push_frame(upstream_frame, FrameDirection.UPSTREAM) async def __start(self, frame: StartFrame): """Handle the start frame to initialize processor state. diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 8c7bb15ef..55be46452 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1222,7 +1222,7 @@ class RTVIObserver(BaseObserver): # For broadcasted frames (pushed in both directions), only process # the downstream copy to avoid sending duplicate RTVI messages. - if frame.broadcasted and direction != FrameDirection.DOWNSTREAM: + if frame.broadcasted_sibling_id is not None and direction != FrameDirection.DOWNSTREAM: return # If we have already seen this frame, let's skip it.