diff --git a/changelog/3672.changed.md b/changelog/3672.changed.md new file mode 100644 index 000000000..9722d00e0 --- /dev/null +++ b/changelog/3672.changed.md @@ -0,0 +1 @@ +- Changed `FunctionCallCancelFrame` to broadcast in both directions for consistency with other function call frames. diff --git a/changelog/3672.fixed.md b/changelog/3672.fixed.md new file mode 100644 index 000000000..1f68ed008 --- /dev/null +++ b/changelog/3672.fixed.md @@ -0,0 +1 @@ +- Fixed `RTVIObserver` sending duplicate client messages for frames that are broadcast in both directions (e.g. `UserStartedSpeakingFrame`, `FunctionCallResultFrame`). diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index 61c3ef57e..b6a65a35b 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -1023,7 +1023,7 @@ class RTVIObserverParams: sets the default level for unlisted functions:: function_call_report_level={ - "*": RTVIFunctionCallReportLevel.DISABLED, # Default: no events + "*": RTVIFunctionCallReportLevel.NONE, # Default: events with no metadata "get_weather": RTVIFunctionCallReportLevel.FULL, # Expose everything } @@ -1199,6 +1199,12 @@ class RTVIObserver(BaseObserver): frame = data.frame direction = data.direction + # Only process downstream frames. Some frames are broadcast in both + # directions (e.g. UserStartedSpeakingFrame, FunctionCallResultFrame), + # and we only want to send one RTVI message per event. + if direction != FrameDirection.DOWNSTREAM: + return + # If we have already seen this frame, let's skip it. if frame.id in self._frames_seen: return diff --git a/src/pipecat/services/llm_service.py b/src/pipecat/services/llm_service.py index e354543e4..c59b102b4 100644 --- a/src/pipecat/services/llm_service.py +++ b/src/pipecat/services/llm_service.py @@ -745,8 +745,9 @@ class LLMService(UserTurnCompletionLLMServiceMixin, AIService): await self.cancel_task(task) cancelled_tasks.add(task) - frame = FunctionCallCancelFrame(function_name=name, tool_call_id=tool_call_id) - await self.push_frame(frame) + await self.broadcast_frame( + FunctionCallCancelFrame, function_name=name, tool_call_id=tool_call_id + ) logger.debug(f"{self} Function call [{name}:{tool_call_id}] has been cancelled")