Refactoring to use broadcasted_sibling_id instead of broadcasted field.

This commit is contained in:
filipi87
2026-02-19 15:06:50 -03:00
parent 50ef4909e3
commit b1cee140b9
4 changed files with 19 additions and 15 deletions

View File

@@ -1 +1 @@
- Added `broadcasted` field to the base `Frame` class. This field is automatically set to `True` by `broadcast_frame()` and `broadcast_frame_instance()` to distinguish broadcasted frames from single-direction frames.
- Added `broadcasted_sibling_id` field to the base `Frame` class. This field is automatically set by `broadcast_frame()` and `broadcast_frame_instance()` to the ID of the paired frame pushed in the opposite direction, allowing receivers to identify broadcast pairs.

View File

@@ -123,6 +123,9 @@ class Frame:
id: Unique identifier for the frame instance.
name: Human-readable name combining class name and instance count.
pts: Presentation timestamp in nanoseconds.
broadcasted_sibling_id: ID of the paired frame when this frame was
broadcast in both directions. Set automatically by
``broadcast_frame()`` and ``broadcast_frame_instance()``.
metadata: Dictionary for arbitrary frame metadata.
transport_source: Name of the transport source that created this frame.
transport_destination: Name of the transport destination for this frame.
@@ -131,7 +134,7 @@ class Frame:
id: int = field(init=False)
name: str = field(init=False)
pts: Optional[int] = field(init=False)
broadcasted: bool = field(init=False)
broadcasted_sibling_id: Optional[int] = field(init=False)
metadata: Dict[str, Any] = field(init=False)
transport_source: Optional[str] = field(init=False)
transport_destination: Optional[str] = field(init=False)
@@ -140,7 +143,7 @@ class Frame:
self.id: int = obj_id()
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
self.pts: Optional[int] = None
self.broadcasted: bool = False
self.broadcasted_sibling_id: Optional[int] = None
self.metadata: Dict[str, Any] = {}
self.transport_source: Optional[str] = None
self.transport_destination: Optional[str] = None

View File

@@ -788,10 +788,10 @@ class FrameProcessor(BaseObject):
**kwargs: Keyword arguments to be passed to the frame's constructor.
"""
downstream_frame = frame_cls(**kwargs)
downstream_frame.broadcasted = True
await self.push_frame(downstream_frame)
upstream_frame = frame_cls(**kwargs)
upstream_frame.broadcasted = True
downstream_frame.broadcasted_sibling_id = upstream_frame.id
upstream_frame.broadcasted_sibling_id = downstream_frame.id
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
async def broadcast_frame_instance(self, frame: Frame):
@@ -816,17 +816,18 @@ class FrameProcessor(BaseObject):
if not f.init and f.name not in ("id", "name")
}
new_frame = frame_cls(**init_fields)
downstream_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
new_frame.broadcasted = True
await self.push_frame(new_frame)
setattr(downstream_frame, k, v)
new_frame = frame_cls(**init_fields)
upstream_frame = frame_cls(**init_fields)
for k, v in extra_fields.items():
setattr(new_frame, k, v)
new_frame.broadcasted = True
await self.push_frame(new_frame, FrameDirection.UPSTREAM)
setattr(upstream_frame, k, v)
downstream_frame.broadcasted_sibling_id = upstream_frame.id
upstream_frame.broadcasted_sibling_id = downstream_frame.id
await self.push_frame(downstream_frame)
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
async def __start(self, frame: StartFrame):
"""Handle the start frame to initialize processor state.

View File

@@ -1222,7 +1222,7 @@ class RTVIObserver(BaseObserver):
# For broadcasted frames (pushed in both directions), only process
# the downstream copy to avoid sending duplicate RTVI messages.
if frame.broadcasted and direction != FrameDirection.DOWNSTREAM:
if frame.broadcasted_sibling_id is not None and direction != FrameDirection.DOWNSTREAM:
return
# If we have already seen this frame, let's skip it.