PipelineTask owns its TaskManager but is itself a BaseObject, so it
inherits create_task/cancel_task. Replace the explicit
self._task_manager.create_task(coro, f"{self}::name") call sites with
self.create_task(coro, "name") for consistency with other BaseObject
subclasses.
PipelineTask owns its TaskManager (still constructed in __init__ since
TaskObserver needs it eagerly). Adding the explicit
`await super().setup(self._task_manager)` in `_setup()` formalizes the
BaseObject lifecycle so any future wiring added to BaseObject.setup is
picked up automatically.
PipelineTask owns its TaskManager but is itself a BaseObject, so it
inherits create_task/cancel_task. Replace the explicit
self._task_manager.create_task(coro, f"{self}::name") call sites with
self.create_task(coro, "name") for consistency with other BaseObject
subclasses.
Lift the task manager wiring (`_task_manager`, `task_manager` property,
`create_task`, `cancel_task`, and `setup(task_manager)`) up to
`BaseObject`. Owners propagate the task manager to their child
`BaseObject`s via `await child.setup(task_manager)`, matching the
existing convention.
Removes duplicated `_task_manager` / `task_manager` property / setup
implementations from `FrameProcessor`, `FrameProcessorMetrics`,
`UserIdleController`, `UserTurnController`,
`BaseUserTurnStartStrategy`, and `BaseUserTurnStopStrategy`.
GeminiLiveVertexLLMService overrides _supports_non_blocking_tools to
return False — Vertex AI's Gemini Live endpoint doesn't yet accept the
NON_BLOCKING behavior field on function declarations or the scheduling
field on FunctionResponse, and sending either breaks tool calling.
Effect: function declarations sent to Vertex no longer carry
NON_BLOCKING; FunctionResponses no longer carry scheduling: WHEN_IDLE.
Users registering a function with cancel_on_interruption=False against
Vertex get the same one-time logger.error + push_error the base class
surfaces on Gemini 3.x.
Mirrors the same change applied to AWSNovaSonicLLMService and
OpenAIRealtimeLLMService in #4441 / GrokRealtimeLLMService in #4447:
replaces the implicit "final happens last" pattern in
_process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block, plus a trailing defensive
`continue` so async-tool messages with an unrecognized kind don't fall
through to the regular tool-result handling block.
Honors cancel_on_interruption=False on Gemini Live for models that support
Gemini's NON_BLOCKING tool mechanism (Gemini 2.x at the time of writing).
Function declarations registered via register_function(...,
cancel_on_interruption=False) are sent with behavior: NON_BLOCKING so the
conversation continues while the tool runs; the matching FunctionResponse
carries scheduling: WHEN_IDLE so the result lands at a graceful pause
rather than mid-sentence. Synchronous (default) tools stay BLOCKING —
applying NON_BLOCKING uniformly produced filler responses like "let me
look that up for you" on regular calls, since the model knew it would
have an opportunity to keep talking while waiting.
A new _supports_non_blocking_tools property gates the flow. On models
that don't support it (currently Gemini 3.x), the service falls back to
plain blocking behavior and surfaces a one-time error + ErrorFrame the
moment async-tool messages first appear in the context, explaining that
the flag's intent is not achievable.
Caveat (Gemini 2.5): an intermittent server-side 1008 "Operation is not
implemented" error can fire when realtime input arrives during a pending
tool call. We auto-reconnect, but the user may need to repeat what they
were saying. The proposed mitigation
(https://discuss.ai.google.dev/t/gemini-live-api-websocket-error-1008-operation-is-not-implemented-or-supported-or-enabled/114644/56)
of gating realtime input during pending tool calls is fundamentally
incompatible with NON_BLOCKING tool calling, so we don't apply it.
Lays groundwork for cancel_on_interruption=False support on Gemini Live by
restructuring _process_completed_function_calls to match the shape used by
AWSNovaSonicLLMService and OpenAIRealtimeLLMService in #4441: a single-pass
forward iteration over raw context messages that detects async-tool
messages via async_tool_messages.parse_message and routes them — started
skipped silently, intermediate logged-as-error and surfaced via push_error,
final delivered via the formal FunctionResponse channel.
Replaces the prior two-pass structure that went through the adapter for
sync results — the service now uses a lightweight self._tool_call_id_to_name
map (populated when the model issues tool calls) for the name lookup the
adapter used to provide. Extracts a new GeminiLLMAdapter.to_function_response_dict
static method for the dict-coercion logic that wraps non-dict tool returns
as {value: <result>} for Gemini's FunctionResponse.response field; the
adapter's existing inline copy in _from_standard_message uses it too.
Example consolidation:
- Folds realtime-gemini-live-function-calling.py into the base
realtime-gemini-live.py example so the base exercises function calling
out of the box (matching realtime-openai.py and realtime-aws-nova-sonic.py).
- Renames realtime-gemini-live-vertex-function-calling.py to
realtime-gemini-live-vertex.py, mirroring the consolidation.
- Adds realtime-gemini-live-async-tool.py.
- Updates scripts/evals/run-release-evals.py for the renames.
This commit alone doesn't make cancel_on_interruption=False fully work on
Gemini Live — additional investigation is pending. This is foundational
work to be built on.
Renames _ASYNC_TOOL_PLACEHOLDER_RESULT to _ASYNC_TOOL_STARTED_RESULT to
match the kind names from async_tool_messages, and lifts the inline
"[Async tool result for tool_call_id=...] {result}" into a sibling
_ASYNC_TOOL_FINAL_RESULT_TEMPLATE constant for the same reason.
Replaces the prior "log a warning and skip" approach with actual handling
of async-tool messages on Ultravox.
The catch with Ultravox is that its API freezes the conversation between
client_tool_invocation and the matching client_tool_result — there's no
"keep talking while the tool runs" channel like NON_BLOCKING on Gemini
or function_call_output-without-blocking on OpenAI Realtime. So:
- When the model invokes an async-registered function (cancel_on_inter
ruption=False), the service immediately ships a placeholder
client_tool_result that tells the model "the actual result isn't
ready yet; a follow-up will arrive shortly; keep the conversation
going". This unfreezes the conversation. The placeholder is sent
from _handle_tool_invocation, since the started async-tool message
doesn't reach the context-frame path until later.
- When the real tool finishes, the final async-tool message lands in
the context. _handle_context now forward-iterates and routes
async-tool messages: started is a no-op (placeholder already sent),
intermediate is logged-as-error and dropped (matching the other
realtime services), and final is injected as user-side text via
user_text_message with bracketed framing — the only mechanism
Ultravox offers for adding non-tool input mid-conversation.
Hoists the registry-lookup helper to LLMService as
_function_is_async(name) so future services can use the same pattern
without re-implementing it.
Adds an async-tool example file for Ultravox modeled on the existing
ones for the other realtime services.
Mirrors the same change applied to AWSNovaSonicLLMService and
OpenAIRealtimeLLMService in #4441: replaces the implicit "final happens
last" pattern in _process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block, plus a trailing defensive
`continue` so async-tool messages with an unrecognized kind don't fall
through to the regular tool-result handling block.
Applies the same async-tool message routing introduced for AWSNovaSonicLLMService
and OpenAIRealtimeLLMService to additional realtime LLM services where the
flag's intent ("keep talking while the tool runs") is achievable:
- GrokRealtimeLLMService (xAI Realtime — also benefits the deprecated Grok
alias since it re-exports the xAI module)
- AzureRealtimeLLMService picks up the fix transitively by inheriting from
OpenAIRealtimeLLMService — no code change needed.
GrokRealtimeLLMService's _process_completed_function_calls now matches
the canonical pattern: skip LLMSpecificMessage, detect async-tool messages
via parse_message and route them — started skipped silently, intermediate
logged as an error and surfaced via push_error, final delivered through
the same channel as a synchronous result.
UltravoxRealtimeLLMService instead gets a one-time warning when async-tool
messages appear in the context. The Ultravox API freezes the conversation
during tool execution
(https://docs.ultravox.ai/tools/async-tools#custom-tool-timeouts), so the
flag's "keep talking while the tool runs" intent isn't achievable there —
applying the same code pattern would mislead users into expecting a UX
Ultravox can't deliver. Surfacing a clear warning is the right behavior
until Ultravox grows true async tool support.
Adds async-tool example files for Grok and Azure modeled on the existing
Nova Sonic / OpenAI Realtime ones (10s simulated network delay, weather
tool registered with cancel_on_interruption=False).
Two services remain excluded:
- GeminiLiveLLMService — the async-tool path needs deeper investigation.
- InworldRealtimeLLMService — appears to have a pre-existing problem
with even simple synchronous tool calling on its Realtime API (the
request reaches the server fine, but response generation fails with a
generic server_error).
Replaces the implicit "final happens last" pattern in
_process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block in both AWSNovaSonicLLMService
and OpenAIRealtimeLLMService. Adds a trailing defensive `continue` so
async-tool messages with an unrecognized kind don't fall through to the
regular tool-result handling block — clearer at the call site, and safer
against future additions to AsyncToolMessageKind.
Before the new async-tool mechanism landed, AWSNovaSonicLLMService and
OpenAIRealtimeLLMService honored cancel_on_interruption=False by simply
not cancelling in-flight function calls on interruption — the eventual
result then flowed through the same channel as any synchronous tool
result. The new mechanism (which appends started/intermediate/final
messages to the LLM context as the underlying task progresses) broke
that path: the realtime services didn't know how to interpret those
messages, and the eventual result was never delivered to the provider.
Restore the flag's behavior by teaching both services to detect
async-tool messages in the context and route them appropriately:
- started → skipped silently. The provider already issued the tool call
and natively awaits a result; nothing to send for the started marker.
- final → delivered via the formal tool-result channel. Same path as a
synchronous tool result, just delayed.
Streamed intermediate results (FunctionCallResultProperties(is_final=
False)) are not supported on these realtime services. An intermediate
result is logged as an error and surfaced via push_error, then dropped.
Use a non-realtime LLM service if a tool needs to stream intermediate
results. (Docstrings on register_function, register_direct_function, and
FunctionCallResultProperties.is_final updated to call this out.)
A new shared module pipecat.processors.aggregators.async_tool_messages
is the single source of truth for the on-the-wire payload shape: the
aggregator uses its build_*_message functions when injecting messages,
and the realtime services use parse_message when scanning the context.
Adds two example files exercising a network-delayed weather tool with
each service. The plain realtime-aws-nova-sonic.py example is also
reverted to a synchronous tool call now that the async variant lives in
its own file.
Similar fixes for other realtime services are forthcoming.
Aligns deprecation docstrings on LLMUserAggregatorParams and
LLMAssistantAggregatorParams with CONTRIBUTING.md conventions:
present-tense parameter descriptions plus a `.. deprecated:: 1.2.0`
directive noting replacement and 2.0.0 removal. Also adds a runtime
DeprecationWarning for `user_turn_completion_config`, which previously
had no warning despite being deprecated.
The old name overlapped semantically with `UserStoppedSpeakingFrame`:
both could be read as "the user's turn is done." They're at different
layers — `UserStoppedSpeakingFrame` is the acoustic stop signal,
while this frame is the post-judgment "inference about the turn is
now complete (turn is semantically final)" signal emitted by the LLM
mixin (on ✓), an end-of-turn classifier, or a custom producer.
The new name pairs naturally with the existing
`on_user_turn_inference_triggered` event vocabulary and removes the
ambiguity with `UserStoppedSpeakingFrame`.
Wrap the detector chain with `deferred(...)` and append the LLM
completion gate via a `UserTurnStrategies` specialization rather than
a free-standing helper, mirroring the existing
`ExternalUserTurnStrategies` pattern. The class lives next to other
strategy containers in `pipecat.turns.user_turn_strategies`, so users
discover it where they're already configuring `user_turn_strategies`.
The deprecated `filter_incomplete_user_turns` flag now rewires
through `FilterIncompleteUserTurnStrategies` under the hood, keeping
the migration path identical to before. `deferred(...)` stays public
as the explicit escape hatch for non-default compositions.
When a stop-strategy chain splits inference-triggered from
finalization (e.g. `LLMTurnCompletionUserTurnStopStrategy` gating a
deferred detector), more than one inference can fire inside a single
user turn — each adds the new transcription segment to the context.
Previously each inference overwrote `_pending_user_turn_aggregation`,
so the eventual `on_user_turn_stopped` event surfaced only the
segment from the last inference, dropping anything the user said
before it.
Concatenate each segment into `_full_user_turn_aggregation` instead
of overwriting, and combine that running buffer with any post-final-
inference segment when emitting the public event.
Add an `LLMMarkerFrame(DataFrame)` for sideband LLM markers that need
to be persisted to context but should not flow through the standard
text path (TTS, transcript). The frame carries an
`append_to_context_immediately` flag so the assistant aggregator can
either commit the marker as a stand-alone message (○ / ◐) or merge it
with the upcoming aggregation as a prefix on the response (✓).
`UserTurnCompletionLLMServiceMixin` now emits `LLMMarkerFrame` instead
of pushing the marker as `LLMTextFrame(skip_tts=True)`, which fixes
the case where an incomplete-turn marker (○ / ◐) was aggregated by
the assistant aggregator but never committed to the context because
the assistant turn lifecycle didn't run to completion (no spoken
response, no `LLMFullResponseEndFrame`-driven `push_aggregation`).
The frame is intentionally generic so other components — STT services
with built-in turn signals, end-of-turn classifiers, custom
annotations — can use the same mechanism to inject sideband signals
into the assistant context.
`LLMTurnCompletionUserTurnStopStrategy` previously bundled two
concerns: pushing `LLMUpdateSettingsFrame` on `StartFrame`, and
finalizing the turn on `UserTurnCompletedFrame`. The latter is
producer-agnostic — any component that emits `UserTurnCompletedFrame`
(STT with built-in turn detection, dedicated end-of-turn classifiers,
custom code) can drive finalization the same way.
Move the frame-handling half into a new
`ExternalUserTurnCompletionStopStrategy`. The LLM-specific subclass
now only adds the settings-frame push and inherits finalization. Mirrors
the existing `ExternalUserTurnStopStrategy` naming pattern.
Fixes a real bug: with `filter_incomplete_user_turns` enabled, the
smart-turn detector's tentative stop was firing `on_user_turn_stopped`
before the LLM had a chance to veto it. Observers, transcript
appenders and UI indicators received an early — and sometimes
duplicated — signal.
Decomposes the single stop concern into two events:
- `on_user_turn_inference_triggered` fires when a stop strategy has
enough signal to start LLM inference. The aggregator pushes the
context here, kicking off the LLM call.
- `on_user_turn_stopped` fires only when the user turn is semantically
final. Built-in strategies fire both events at the same call site,
preserving today's behavior for the common case.
Adds `LLMTurnCompletionUserTurnStopStrategy`, which gates
finalization on a `UserTurnCompletedFrame` (a fieldless system frame
emitted by any component judging turn completeness — currently the
`UserTurnCompletionLLMServiceMixin` on `✓`).
Adds `deferred(strategy)` / `DeferredUserTurnStopStrategy`, a thin
wrapper that forwards an inner strategy's events except
`on_user_turn_stopped`. Use this to install a stop strategy as an
inference trigger only, leaving finalization to a peer (e.g. the LLM
completion strategy).
Adds `llm_completion_user_turn_stop_strategies()` for the common
case:
UserTurnStrategies(
stop=llm_completion_user_turn_stop_strategies(),
)
Deprecates `LLMUserAggregatorParams.filter_incomplete_user_turns`.
The aggregator emits a `DeprecationWarning`, wraps existing stop
strategies with `deferred(...)`, and appends
`LLMTurnCompletionUserTurnStopStrategy` automatically.