From 5f64dae0cfefbed996d9059b8542e5ab0c297b63 Mon Sep 17 00:00:00 2001 From: Mark Backman Date: Sat, 7 Feb 2026 11:42:09 -0500 Subject: [PATCH] Filter RTVIObserver to downstream frames only and broadcast FunctionCallCancelFrame RTVIObserver now skips upstream frames to prevent duplicate RTVI messages when frames are broadcast in both directions. Also changed FunctionCallCancelFrame to use broadcast_frame for consistency with other function call frames. --- changelog/3672.changed.md | 1 + changelog/3672.fixed.md | 1 + src/pipecat/processors/frameworks/rtvi.py | 8 +++++++- src/pipecat/services/llm_service.py | 5 +++-- 4 files changed, 12 insertions(+), 3 deletions(-) create mode 100644 changelog/3672.changed.md create mode 100644 changelog/3672.fixed.md diff --git a/changelog/3672.changed.md b/changelog/3672.changed.md new file mode 100644 index 000000000..9722d00e0 --- /dev/null +++ b/changelog/3672.changed.md @@ -0,0 +1 @@ +- Changed `FunctionCallCancelFrame` to broadcast in both directions for consistency with other function call frames. diff --git a/changelog/3672.fixed.md b/changelog/3672.fixed.md new file mode 100644 index 000000000..1f68ed008 --- /dev/null +++ b/changelog/3672.fixed.md @@ -0,0 +1 @@ +- Fixed `RTVIObserver` sending duplicate client messages for frames that are broadcast in both directions (e.g. `UserStartedSpeakingFrame`, `FunctionCallResultFrame`). diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 61c3ef57e..b6a65a35b 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1023,7 +1023,7 @@ class RTVIObserverParams: sets the default level for unlisted functions:: function_call_report_level={ - "*": RTVIFunctionCallReportLevel.DISABLED, # Default: no events + "*": RTVIFunctionCallReportLevel.NONE, # Default: events with no metadata "get_weather": RTVIFunctionCallReportLevel.FULL, # Expose everything } @@ -1199,6 +1199,12 @@ class RTVIObserver(BaseObserver): frame = data.frame direction = data.direction + # Only process downstream frames. Some frames are broadcast in both + # directions (e.g. UserStartedSpeakingFrame, FunctionCallResultFrame), + # and we only want to send one RTVI message per event. + if direction != FrameDirection.DOWNSTREAM: + return + # If we have already seen this frame, let's skip it. if frame.id in self._frames_seen: return diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index e354543e4..c59b102b4 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -745,8 +745,9 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService): await self.cancel_task(task) cancelled_tasks.add(task) - frame = FunctionCallCancelFrame(function_name=name, tool_call_id=tool_call_id) - await self.push_frame(frame) + await self.broadcast_frame( + FunctionCallCancelFrame, function_name=name, tool_call_id=tool_call_id + ) logger.debug(f"{self} Function call [{name}:{tool_call_id}] has been cancelled")