From 94dbd2fa684cb75e06acfcf43d68839e1916441e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Fri, 15 May 2026 14:50:35 -0700 Subject: [PATCH] Broadcast UserTurnInferenceCompletedFrame on tool calls in filter-incomplete MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With ``filter_incomplete_user_turns`` enabled, an LLM that responded to a user turn by calling a tool (without first emitting a ✓ marker) never finalized the user turn. ``UserStoppedSpeakingFrame`` stayed deferred, the assistant aggregator kept ``_user_speaking=True``, and when ``FunctionCallResultFrame`` arrived its ``not self._user_speaking`` gate dropped the context push — the LLM continuation never ran and the call hung silently. Broadcast ``UserTurnInferenceCompletedFrame`` on ``FunctionCallsStartedFrame`` (i.e. the moment the LLM commits to a tool call, before the function dispatches), gated by a new ``_turn_completion_broadcasted`` flag so the ✓ path and the tool-call path don't both fire. The flag resets in ``_turn_reset`` alongside the other per-turn state. Emitting on the start frame rather than ``LLMFullResponseEndFrame`` also shrinks the race window — ``UserStoppedSpeakingFrame`` (a ``SystemFrame``) has the maximum possible head start over the ``FunctionCallResultFrame`` (``DataFrame``) that follows. --- .../turns/user_turn_completion_mixin.py | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/src/pipecat/turns/user_turn_completion_mixin.py b/src/pipecat/turns/user_turn_completion_mixin.py index c423e16bf..15fbd78ec 100644 --- a/src/pipecat/turns/user_turn_completion_mixin.py +++ b/src/pipecat/turns/user_turn_completion_mixin.py @@ -20,6 +20,7 @@ from loguru import logger from pipecat.frames.frames import ( Frame, + FunctionCallsStartedFrame, InterruptionFrame, LLMFullResponseEndFrame, LLMMarkerFrame, @@ -222,6 +223,14 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): # ensures graceful degradation if the LLM disobeys and outputs additional text. self._turn_suppressed = False self._turn_complete_found = False # True when ✓ (COMPLETE) is detected + # Set when the LLM made a tool call during this turn. Informational + # only — broadcasting is idempotency-gated by + # ``_turn_completion_broadcasted``. + self._turn_had_function_call = False + # True once ``UserTurnInferenceCompletedFrame`` has been broadcast + # for this turn. Prevents double-broadcast when ✓ and a tool call + # both occur in the same turn. + self._turn_completion_broadcasted = False # Timeout handling self._user_turn_completion_config = UserTurnCompletionConfig() @@ -236,6 +245,27 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): """ self._user_turn_completion_config = config + async def _broadcast_turn_completion(self): + """Broadcast ``UserTurnInferenceCompletedFrame`` at most once per turn. + + Called from the two places we know the LLM has committed to a + response for the current user turn: + + - the ``✓`` marker is detected in the text stream + - a ``FunctionCallsStartedFrame`` is emitted — the LLM committed + to a tool call before producing (or instead of) a marker. + + Broadcasting on the tool-call path matters for races: the + downstream ``UserStoppedSpeakingFrame`` needs to propagate + before the function actually executes and a + ``FunctionCallResultFrame`` flows back to the assistant + aggregator. + """ + if self._turn_completion_broadcasted: + return + self._turn_completion_broadcasted = True + await self.broadcast_frame(UserTurnInferenceCompletedFrame) + async def _start_incomplete_timeout(self, incomplete_type: Literal["short", "long"]): """Start a timeout task for incomplete turn handling. @@ -325,6 +355,8 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): self._turn_text_buffer = "" self._turn_suppressed = False self._turn_complete_found = False + self._turn_had_function_call = False + self._turn_completion_broadcasted = False async def process_frame(self, frame: Frame, direction: FrameDirection): """Process frames, handling turn completion state resets. @@ -351,7 +383,14 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): frame: The frame to push downstream. direction: The direction of frame flow. Defaults to downstream. """ - if isinstance(frame, LLMFullResponseEndFrame): + if isinstance(frame, FunctionCallsStartedFrame): + self._turn_had_function_call = True + # Broadcast turn completion now, before the function dispatches + # — gives ``UserStoppedSpeakingFrame`` maximum time to propagate + # so the assistant aggregator's ``_user_speaking`` is False by + # the time a ``FunctionCallResultFrame`` arrives. + await self._broadcast_turn_completion() + elif isinstance(frame, LLMFullResponseEndFrame): await self._turn_reset() await super().push_frame(frame, direction) @@ -427,7 +466,9 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor): # LLMTurnCompletionUserTurnStopStrategy) can fire # `on_user_turn_stopped`. Must fire before the marker so # downstream consumers see the signal before the response. - await self.broadcast_frame(UserTurnInferenceCompletedFrame) + # Idempotent: a tool call earlier in the turn may have + # already broadcast. + await self._broadcast_turn_completion() # Push the marker as a sideband signal that the assistant # aggregator will prepend to the upcoming aggregated text,