Fix RTVIObserver missing upstream-only frames by adding broadcasted flag
RTVIObserver previously filtered out all upstream frames to avoid duplicate messages from broadcasted frames. This caused upstream-only frames to be silently ignored. Instead, add a `broadcasted` field to the Frame base class that is set by broadcast_frame() and broadcast_frame_instance(), and only skip upstream copies of broadcasted frames.
This commit is contained in:
@@ -131,6 +131,7 @@ class Frame:
|
||||
id: int = field(init=False)
|
||||
name: str = field(init=False)
|
||||
pts: Optional[int] = field(init=False)
|
||||
broadcasted: bool = field(init=False)
|
||||
metadata: Dict[str, Any] = field(init=False)
|
||||
transport_source: Optional[str] = field(init=False)
|
||||
transport_destination: Optional[str] = field(init=False)
|
||||
@@ -139,6 +140,7 @@ class Frame:
|
||||
self.id: int = obj_id()
|
||||
self.name: str = f"{self.__class__.__name__}#{obj_count(self)}"
|
||||
self.pts: Optional[int] = None
|
||||
self.broadcasted: bool = False
|
||||
self.metadata: Dict[str, Any] = {}
|
||||
self.transport_source: Optional[str] = None
|
||||
self.transport_destination: Optional[str] = None
|
||||
|
||||
@@ -787,8 +787,12 @@ class FrameProcessor(BaseObject):
|
||||
frame_cls: The class of the frame to be broadcasted.
|
||||
**kwargs: Keyword arguments to be passed to the frame's constructor.
|
||||
"""
|
||||
await self.push_frame(frame_cls(**kwargs))
|
||||
await self.push_frame(frame_cls(**kwargs), FrameDirection.UPSTREAM)
|
||||
downstream_frame = frame_cls(**kwargs)
|
||||
downstream_frame.broadcasted = True
|
||||
await self.push_frame(downstream_frame)
|
||||
upstream_frame = frame_cls(**kwargs)
|
||||
upstream_frame.broadcasted = True
|
||||
await self.push_frame(upstream_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
async def broadcast_frame_instance(self, frame: Frame):
|
||||
"""Broadcasts a frame instance upstream and downstream.
|
||||
@@ -815,11 +819,13 @@ class FrameProcessor(BaseObject):
|
||||
new_frame = frame_cls(**init_fields)
|
||||
for k, v in extra_fields.items():
|
||||
setattr(new_frame, k, v)
|
||||
new_frame.broadcasted = True
|
||||
await self.push_frame(new_frame)
|
||||
|
||||
new_frame = frame_cls(**init_fields)
|
||||
for k, v in extra_fields.items():
|
||||
setattr(new_frame, k, v)
|
||||
new_frame.broadcasted = True
|
||||
await self.push_frame(new_frame, FrameDirection.UPSTREAM)
|
||||
|
||||
async def __start(self, frame: StartFrame):
|
||||
|
||||
@@ -1220,10 +1220,9 @@ class RTVIObserver(BaseObserver):
|
||||
frame = data.frame
|
||||
direction = data.direction
|
||||
|
||||
# Only process downstream frames. Some frames are broadcast in both
|
||||
# directions (e.g. UserStartedSpeakingFrame, FunctionCallResultFrame),
|
||||
# and we only want to send one RTVI message per event.
|
||||
if direction != FrameDirection.DOWNSTREAM:
|
||||
# For broadcasted frames (pushed in both directions), only process
|
||||
# the downstream copy to avoid sending duplicate RTVI messages.
|
||||
if frame.broadcasted and direction != FrameDirection.DOWNSTREAM:
|
||||
return
|
||||
|
||||
# If we have already seen this frame, let's skip it.
|
||||
|
||||
Reference in New Issue
Block a user