Broadcast UserTurnInferenceCompletedFrame on tool calls in filter-incomplete

With ``filter_incomplete_user_turns`` enabled, an LLM that responded to
a user turn by calling a tool (without first emitting a ✓ marker)
never finalized the user turn. ``UserStoppedSpeakingFrame`` stayed
deferred, the assistant aggregator kept ``_user_speaking=True``, and
when ``FunctionCallResultFrame`` arrived its ``not self._user_speaking``
gate dropped the context push — the LLM continuation never ran and
the call hung silently.

Broadcast ``UserTurnInferenceCompletedFrame`` on
``FunctionCallsStartedFrame`` (i.e. the moment the LLM commits to a
tool call, before the function dispatches), gated by a new
``_turn_completion_broadcasted`` flag so the ✓ path and the tool-call
path don't both fire. The flag resets in ``_turn_reset`` alongside
the other per-turn state.

Emitting on the start frame rather than ``LLMFullResponseEndFrame``
also shrinks the race window — ``UserStoppedSpeakingFrame`` (a
``SystemFrame``) has the maximum possible head start over the
``FunctionCallResultFrame`` (``DataFrame``) that follows.
This commit is contained in:
Aleix Conchillo Flaqué
2026-05-15 14:50:35 -07:00
parent ea296babe9
commit 94dbd2fa68

View File

@@ -20,6 +20,7 @@ from loguru import logger
from pipecat.frames.frames import (
Frame,
FunctionCallsStartedFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMMarkerFrame,
@@ -222,6 +223,14 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
# ensures graceful degradation if the LLM disobeys and outputs additional text.
self._turn_suppressed = False
self._turn_complete_found = False # True when ✓ (COMPLETE) is detected
# Set when the LLM made a tool call during this turn. Informational
# only — broadcasting is idempotency-gated by
# ``_turn_completion_broadcasted``.
self._turn_had_function_call = False
# True once ``UserTurnInferenceCompletedFrame`` has been broadcast
# for this turn. Prevents double-broadcast when ✓ and a tool call
# both occur in the same turn.
self._turn_completion_broadcasted = False
# Timeout handling
self._user_turn_completion_config = UserTurnCompletionConfig()
@@ -236,6 +245,27 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
"""
self._user_turn_completion_config = config
async def _broadcast_turn_completion(self):
"""Broadcast ``UserTurnInferenceCompletedFrame`` at most once per turn.
Called from the two places we know the LLM has committed to a
response for the current user turn:
- the ``✓`` marker is detected in the text stream
- a ``FunctionCallsStartedFrame`` is emitted — the LLM committed
to a tool call before producing (or instead of) a marker.
Broadcasting on the tool-call path matters for races: the
downstream ``UserStoppedSpeakingFrame`` needs to propagate
before the function actually executes and a
``FunctionCallResultFrame`` flows back to the assistant
aggregator.
"""
if self._turn_completion_broadcasted:
return
self._turn_completion_broadcasted = True
await self.broadcast_frame(UserTurnInferenceCompletedFrame)
async def _start_incomplete_timeout(self, incomplete_type: Literal["short", "long"]):
"""Start a timeout task for incomplete turn handling.
@@ -325,6 +355,8 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
self._turn_text_buffer = ""
self._turn_suppressed = False
self._turn_complete_found = False
self._turn_had_function_call = False
self._turn_completion_broadcasted = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames, handling turn completion state resets.
@@ -351,7 +383,14 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
frame: The frame to push downstream.
direction: The direction of frame flow. Defaults to downstream.
"""
if isinstance(frame, LLMFullResponseEndFrame):
if isinstance(frame, FunctionCallsStartedFrame):
self._turn_had_function_call = True
# Broadcast turn completion now, before the function dispatches
# — gives ``UserStoppedSpeakingFrame`` maximum time to propagate
# so the assistant aggregator's ``_user_speaking`` is False by
# the time a ``FunctionCallResultFrame`` arrives.
await self._broadcast_turn_completion()
elif isinstance(frame, LLMFullResponseEndFrame):
await self._turn_reset()
await super().push_frame(frame, direction)
@@ -427,7 +466,9 @@ class UserTurnCompletionLLMServiceMixin(FrameProcessor):
# LLMTurnCompletionUserTurnStopStrategy) can fire
# `on_user_turn_stopped`. Must fire before the marker so
# downstream consumers see the signal before the response.
await self.broadcast_frame(UserTurnInferenceCompletedFrame)
# Idempotent: a tool call earlier in the turn may have
# already broadcast.
await self._broadcast_turn_completion()
# Push the marker as a sideband signal that the assistant
# aggregator will prepend to the upcoming aggregated text,