Compare commits

...

197 Commits

Author SHA1 Message Date
filipi87
3954f96a6b Preventing injecting silence while we are receiving audio from TTS. 2026-05-13 13:44:05 -03:00
filipi87
6887ac394a Buffering audio to avoid glitches. 2026-05-13 11:50:50 -03:00
filipi87
4f034d4d4e Recording audio in the nvidia sagemaker example. 2026-05-13 07:55:52 -03:00
Filipi da Silva Fuchter
9148e307cc Merge pull request #4464 from pipecat-ai/filipi/nvidia_sagemaker
NVidia sagemaker - TTS and STT services
2026-05-13 07:53:26 -03:00
Filipi da Silva Fuchter
703d23b658 Update examples/voice/voice-nvidia-sagemaker.py
Co-authored-by: Mark Backman <mark@daily.co>
2026-05-13 06:36:57 -04:00
Filipi da Silva Fuchter
227ba288da Update examples/voice/voice-nvidia-sagemaker.py
Co-authored-by: Mark Backman <mark@daily.co>
2026-05-13 06:36:45 -04:00
filipi87
0740021ff4 Removing changelog for sanitize_text_for_tts 2026-05-12 18:29:35 -03:00
filipi87
68f265fa62 Fixing ruff format. 2026-05-12 18:28:14 -03:00
filipi87
b9f052079d Removing sanitize_text_for_tts 2026-05-12 18:22:15 -03:00
filipi87
130bb7371c Removing sanitize_text_for_tts 2026-05-12 18:21:47 -03:00
filipi87
5d61763987 Refactoring how we are reconnecting the STT. 2026-05-12 18:20:19 -03:00
filipi87
7984556692 Fixing typecheck. 2026-05-12 18:00:07 -03:00
filipi87
bea9e4b3ba New example voice-nvidia-sagemaker.py 2026-05-12 17:44:11 -03:00
Mark Backman
19df443500 Merge pull request #4471 from pipecat-ai/mb/fix-gstreamer-pyright-import 2026-05-12 16:34:48 -04:00
Mark Backman
07f241143b Merge pull request #4469 from pipecat-ai/mb/remove-vad-analyzer-runner-utils-docstring 2026-05-12 16:34:27 -04:00
Mark Backman
2fdb9bbf42 Merge pull request #4462 from pipecat-ai/mb/cartesia-sonic-3.5 2026-05-12 16:34:04 -04:00
filipi87
0146947b68 Addressing the comments left in the PR review. 2026-05-12 17:12:19 -03:00
kompfner
88deebbf5f Merge pull request #4472 from pipecat-ai/pk/default-gpt-realtime-2
Switch OpenAIRealtimeLLMService default model to gpt-realtime-2
2026-05-12 15:17:12 -04:00
filipi87
c2bdc1aada Fixing metrics and adding extra guard after sanitization. 2026-05-12 16:11:01 -03:00
Paul Kompfner
fc0589e8f1 Switch OpenAIRealtimeLLMService default model to gpt-realtime-2 2026-05-12 14:57:59 -04:00
kompfner
67f8d34e9f Merge pull request #4470 from pipecat-ai/pk/gpt-realtime-2-reasoning-effort
Add reasoning support to OpenAIRealtimeLLMService for gpt-realtime-2
2026-05-12 14:43:39 -04:00
kompfner
d3b8710720 Merge pull request #4465 from pipecat-ai/pk/gpt-realtime-2
Handle gpt-realtime-2 multi-output-item audio responses
2026-05-12 14:30:15 -04:00
Mark Backman
86e2aa85d3 Fix GStreamer pipeline source pyright import 2026-05-12 14:16:36 -04:00
Paul Kompfner
b89500256d Drop debug logging added while investigating multi-output-item audio 2026-05-12 14:05:16 -04:00
Paul Kompfner
a52bdef32b Add reasoning support to OpenAIRealtimeLLMService for gpt-realtime-2 2026-05-12 13:55:19 -04:00
Mark Backman
afd9fc5fdf Remove vad_analyzer from create_transport docstring example 2026-05-12 13:50:17 -04:00
filipi87
7f98dba925 Changelog files for the new nvidia features. 2026-05-12 14:43:12 -03:00
filipi87
6a27ed35b1 Fixing the Bidi client to accept None. 2026-05-12 12:19:30 -03:00
filipi87
a34864d643 Fixed ruff, pyright, and test_service_init failures 2026-05-12 11:39:52 -03:00
Paul Kompfner
007fa3a3a8 Handle gpt-realtime-2 multi-output-item audio responses
A single Realtime API response can now contain more than one audio item
(observed with gpt-realtime-2), and the first item's audio.done can
arrive after deltas from the second have started arriving. Deltas still
arrive strictly in playback order across items, so we keep forwarding
them as received — matching OpenAI's reference implementation.

Adjusted OpenAIRealtimeLLMService so a multi-item response is treated as
one continuous TTS turn:

- _handle_evt_audio_delta: on item switch, advance the tracked item in
  place (reset total_size) without emitting another TTSStartedFrame.
  Truncation now always targets the latest item.
- _handle_evt_audio_done: debug-trace only; no longer pushes
  TTSStoppedFrame.
- _handle_evt_response_done: pushes a single TTSStoppedFrame per turn,
  bookending the audio with the Started pushed on the first delta.

Added tests covering single-item, overlapping multi-item, non-overlapping
multi-item, and interrupt-during-multi-item (last-item-wins truncation).
2026-05-12 10:34:50 -04:00
filipi87
5dd7413c00 Nvidia Sagemaker Nemotron ASR STT service 2026-05-12 11:16:00 -03:00
filipi87
8e0a338d96 Nvidia Sagemaker Magpie TTS service 2026-05-12 11:15:42 -03:00
Mark Backman
d65aee9181 Add changelog for #4462 2026-05-11 17:34:00 -04:00
Mark Backman
1755016679 Update default Cartesia TTS model to sonic-3.5 2026-05-11 17:33:40 -04:00
Mark Backman
b7f6298601 Merge pull request #4461 from pipecat-ai/mb/security-vuln-2025-05-11
Update uv.lock for urllib3 and langchain-core
2026-05-11 15:58:05 -04:00
Mark Backman
396873ac7e Merge pull request #4460 from pipecat-ai/mb/codex-skills
Add Codex skills and AGENTS.md
2026-05-11 15:57:49 -04:00
Mark Backman
5b33964a1b Update uv.lock for urllib3 and langchain-core 2026-05-11 15:51:01 -04:00
Mark Backman
8b37cd1d3a Add agent-neutral repository instructions 2026-05-11 15:43:43 -04:00
Mark Backman
7a2b667fa1 Add Codex skill symlinks 2026-05-11 15:27:49 -04:00
Mark Backman
ee8c607315 Merge pull request #4452 from pipecat-ai/mb/cleanup-frontmatter
Add cleanup skill frontmatter
2026-05-11 09:33:44 -04:00
Aleix Conchillo Flaqué
71578e7151 Merge pull request #4449 from pipecat-ai/aleix/base-object-task-manager
Move create_task and cancel_task from FrameProcessor to BaseObject
2026-05-10 20:36:54 -07:00
Aleix Conchillo Flaqué
77058b01c4 Add changelog for #4449 2026-05-10 20:34:52 -07:00
Aleix Conchillo Flaqué
4f85e7c089 Fix pyright cr_code access on Coroutine in BaseObject.create_task
`collections.abc.Coroutine` doesn't expose `cr_code`/`co_name`; only
native coroutine objects do. Use `getattr` chains so pyright is happy
and any non-native awaitable falls back to a generic task name instead
of crashing.
2026-05-10 20:34:52 -07:00
Aleix Conchillo Flaqué
15531c8112 Wire TaskObserver via setup() instead of constructor
TaskObserver previously took a TaskManager in __init__ and reached into
it directly. Since BaseObject now provides task_manager / create_task /
cancel_task, drop the constructor argument and call
`observer.setup(task_manager)` from PipelineTask._setup() before
starting it.
2026-05-10 20:34:52 -07:00
Mark Backman
b9e8f13105 Add cleanup skill frontmatter 2026-05-09 12:30:20 -07:00
Aleix Conchillo Flaqué
784667bad2 Use inherited create_task/cancel_task in PipelineTask
PipelineTask owns its TaskManager but is itself a BaseObject, so it
inherits create_task/cancel_task. Replace the explicit
self._task_manager.create_task(coro, f"{self}::name") call sites with
self.create_task(coro, "name") for consistency with other BaseObject
subclasses.
2026-05-08 15:03:44 -07:00
Aleix Conchillo Flaqué
33db71ec32 Call super().setup() in PipelineTask to honor BaseObject contract
PipelineTask owns its TaskManager (still constructed in __init__ since
TaskObserver needs it eagerly). Adding the explicit
`await super().setup(self._task_manager)` in `_setup()` formalizes the
BaseObject lifecycle so any future wiring added to BaseObject.setup is
picked up automatically.
2026-05-08 15:03:44 -07:00
Aleix Conchillo Flaqué
dc035df0aa Use inherited create_task/cancel_task in PipelineTask
PipelineTask owns its TaskManager but is itself a BaseObject, so it
inherits create_task/cancel_task. Replace the explicit
self._task_manager.create_task(coro, f"{self}::name") call sites with
self.create_task(coro, "name") for consistency with other BaseObject
subclasses.
2026-05-08 15:03:44 -07:00
Aleix Conchillo Flaqué
df1b071a13 Move create_task and cancel_task from FrameProcessor to BaseObject
Lift the task manager wiring (`_task_manager`, `task_manager` property,
`create_task`, `cancel_task`, and `setup(task_manager)`) up to
`BaseObject`. Owners propagate the task manager to their child
`BaseObject`s via `await child.setup(task_manager)`, matching the
existing convention.

Removes duplicated `_task_manager` / `task_manager` property / setup
implementations from `FrameProcessor`, `FrameProcessorMetrics`,
`UserIdleController`, `UserTurnController`,
`BaseUserTurnStartStrategy`, and `BaseUserTurnStopStrategy`.
2026-05-08 15:03:44 -07:00
kompfner
95bcebe774 Merge pull request #4448 from pipecat-ai/pk/gemini-live-async-tool-support
feat: support cancel_on_interruption=False on Gemini Live (Gemini 2.x)
2026-05-08 16:57:32 -04:00
Paul Kompfner
5509377344 fix(gemini-live-vertex): disable NON_BLOCKING tools
GeminiLiveVertexLLMService overrides _supports_non_blocking_tools to
return False — Vertex AI's Gemini Live endpoint doesn't yet accept the
NON_BLOCKING behavior field on function declarations or the scheduling
field on FunctionResponse, and sending either breaks tool calling.

Effect: function declarations sent to Vertex no longer carry
NON_BLOCKING; FunctionResponses no longer carry scheduling: WHEN_IDLE.
Users registering a function with cancel_on_interruption=False against
Vertex get the same one-time logger.error + push_error the base class
surfaces on Gemini 3.x.
2026-05-08 16:54:15 -04:00
Paul Kompfner
e21180b962 refactor(gemini-live): use inherited LLMService._function_is_async
The same registry-lookup helper was hoisted to LLMService in #4447, so
drop the local duplicate. Behavior unchanged.
2026-05-08 16:42:54 -04:00
Paul Kompfner
53922819ed refactor: explicit kind=='final' check in async-tool routing (Gemini Live)
Mirrors the same change applied to AWSNovaSonicLLMService and
OpenAIRealtimeLLMService in #4441 / GrokRealtimeLLMService in #4447:
replaces the implicit "final happens last" pattern in
_process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block, plus a trailing defensive
`continue` so async-tool messages with an unrecognized kind don't fall
through to the regular tool-result handling block.
2026-05-08 16:42:54 -04:00
Paul Kompfner
6faeffb884 chore: add changelog entry for cancel_on_interruption=False on Gemini Live 2026-05-08 16:42:54 -04:00
Paul Kompfner
9086a46900 feat(gemini-live): support cancel_on_interruption=False on supported models
Honors cancel_on_interruption=False on Gemini Live for models that support
Gemini's NON_BLOCKING tool mechanism (Gemini 2.x at the time of writing).
Function declarations registered via register_function(...,
cancel_on_interruption=False) are sent with behavior: NON_BLOCKING so the
conversation continues while the tool runs; the matching FunctionResponse
carries scheduling: WHEN_IDLE so the result lands at a graceful pause
rather than mid-sentence. Synchronous (default) tools stay BLOCKING —
applying NON_BLOCKING uniformly produced filler responses like "let me
look that up for you" on regular calls, since the model knew it would
have an opportunity to keep talking while waiting.

A new _supports_non_blocking_tools property gates the flow. On models
that don't support it (currently Gemini 3.x), the service falls back to
plain blocking behavior and surfaces a one-time error + ErrorFrame the
moment async-tool messages first appear in the context, explaining that
the flag's intent is not achievable.

Caveat (Gemini 2.5): an intermittent server-side 1008 "Operation is not
implemented" error can fire when realtime input arrives during a pending
tool call. We auto-reconnect, but the user may need to repeat what they
were saying. The proposed mitigation
(https://discuss.ai.google.dev/t/gemini-live-api-websocket-error-1008-operation-is-not-implemented-or-supported-or-enabled/114644/56)
of gating realtime input during pending tool calls is fundamentally
incompatible with NON_BLOCKING tool calling, so we don't apply it.
2026-05-08 16:42:54 -04:00
Paul Kompfner
1a4a6f4edf refactor(gemini-live): bring tool-result handling in line with the canonical realtime pattern
Lays groundwork for cancel_on_interruption=False support on Gemini Live by
restructuring _process_completed_function_calls to match the shape used by
AWSNovaSonicLLMService and OpenAIRealtimeLLMService in #4441: a single-pass
forward iteration over raw context messages that detects async-tool
messages via async_tool_messages.parse_message and routes them — started
skipped silently, intermediate logged-as-error and surfaced via push_error,
final delivered via the formal FunctionResponse channel.

Replaces the prior two-pass structure that went through the adapter for
sync results — the service now uses a lightweight self._tool_call_id_to_name
map (populated when the model issues tool calls) for the name lookup the
adapter used to provide. Extracts a new GeminiLLMAdapter.to_function_response_dict
static method for the dict-coercion logic that wraps non-dict tool returns
as {value: <result>} for Gemini's FunctionResponse.response field; the
adapter's existing inline copy in _from_standard_message uses it too.

Example consolidation:

- Folds realtime-gemini-live-function-calling.py into the base
  realtime-gemini-live.py example so the base exercises function calling
  out of the box (matching realtime-openai.py and realtime-aws-nova-sonic.py).
- Renames realtime-gemini-live-vertex-function-calling.py to
  realtime-gemini-live-vertex.py, mirroring the consolidation.
- Adds realtime-gemini-live-async-tool.py.
- Updates scripts/evals/run-release-evals.py for the renames.

This commit alone doesn't make cancel_on_interruption=False fully work on
Gemini Live — additional investigation is pending. This is foundational
work to be built on.
2026-05-08 16:42:54 -04:00
kompfner
ff80cde44e Merge pull request #4447 from pipecat-ai/pk/realtime-async-tool-support-followup
fix: extend cancel_on_interruption=False regression fix to remaining realtime services
2026-05-08 16:40:32 -04:00
Paul Kompfner
fb74f7714c refactor(ultravox): name async-tool result strings after the kinds they serve
Renames _ASYNC_TOOL_PLACEHOLDER_RESULT to _ASYNC_TOOL_STARTED_RESULT to
match the kind names from async_tool_messages, and lifts the inline
"[Async tool result for tool_call_id=...] {result}" into a sibling
_ASYNC_TOOL_FINAL_RESULT_TEMPLATE constant for the same reason.
2026-05-08 16:35:14 -04:00
Paul Kompfner
4864eddbc7 feat(ultravox): support cancel_on_interruption=False via placeholder + final-as-text
Replaces the prior "log a warning and skip" approach with actual handling
of async-tool messages on Ultravox.

The catch with Ultravox is that its API freezes the conversation between
client_tool_invocation and the matching client_tool_result — there's no
"keep talking while the tool runs" channel like NON_BLOCKING on Gemini
or function_call_output-without-blocking on OpenAI Realtime. So:

- When the model invokes an async-registered function (cancel_on_inter
  ruption=False), the service immediately ships a placeholder
  client_tool_result that tells the model "the actual result isn't
  ready yet; a follow-up will arrive shortly; keep the conversation
  going". This unfreezes the conversation. The placeholder is sent
  from _handle_tool_invocation, since the started async-tool message
  doesn't reach the context-frame path until later.
- When the real tool finishes, the final async-tool message lands in
  the context. _handle_context now forward-iterates and routes
  async-tool messages: started is a no-op (placeholder already sent),
  intermediate is logged-as-error and dropped (matching the other
  realtime services), and final is injected as user-side text via
  user_text_message with bracketed framing — the only mechanism
  Ultravox offers for adding non-tool input mid-conversation.

Hoists the registry-lookup helper to LLMService as
_function_is_async(name) so future services can use the same pattern
without re-implementing it.

Adds an async-tool example file for Ultravox modeled on the existing
ones for the other realtime services.
2026-05-08 16:20:40 -04:00
kompfner
d831930bd0 Merge pull request #4441 from pipecat-ai/pk/realtime-async-tool-support
fix: restore cancel_on_interruption=False support in AWS Nova Sonic and OpenAI Realtime
2026-05-08 15:53:20 -04:00
Paul Kompfner
2c65713c99 refactor: explicit kind=='final' check in async-tool routing (Grok)
Mirrors the same change applied to AWSNovaSonicLLMService and
OpenAIRealtimeLLMService in #4441: replaces the implicit "final happens
last" pattern in _process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block, plus a trailing defensive
`continue` so async-tool messages with an unrecognized kind don't fall
through to the regular tool-result handling block.
2026-05-08 15:45:05 -04:00
Paul Kompfner
b14a03d01f fix: extend cancel_on_interruption=False regression fix to remaining realtime services
Applies the same async-tool message routing introduced for AWSNovaSonicLLMService
and OpenAIRealtimeLLMService to additional realtime LLM services where the
flag's intent ("keep talking while the tool runs") is achievable:

- GrokRealtimeLLMService (xAI Realtime — also benefits the deprecated Grok
  alias since it re-exports the xAI module)
- AzureRealtimeLLMService picks up the fix transitively by inheriting from
  OpenAIRealtimeLLMService — no code change needed.

GrokRealtimeLLMService's _process_completed_function_calls now matches
the canonical pattern: skip LLMSpecificMessage, detect async-tool messages
via parse_message and route them — started skipped silently, intermediate
logged as an error and surfaced via push_error, final delivered through
the same channel as a synchronous result.

UltravoxRealtimeLLMService instead gets a one-time warning when async-tool
messages appear in the context. The Ultravox API freezes the conversation
during tool execution
(https://docs.ultravox.ai/tools/async-tools#custom-tool-timeouts), so the
flag's "keep talking while the tool runs" intent isn't achievable there —
applying the same code pattern would mislead users into expecting a UX
Ultravox can't deliver. Surfacing a clear warning is the right behavior
until Ultravox grows true async tool support.

Adds async-tool example files for Grok and Azure modeled on the existing
Nova Sonic / OpenAI Realtime ones (10s simulated network delay, weather
tool registered with cancel_on_interruption=False).

Two services remain excluded:

- GeminiLiveLLMService — the async-tool path needs deeper investigation.
- InworldRealtimeLLMService — appears to have a pre-existing problem
  with even simple synchronous tool calling on its Realtime API (the
  request reaches the server fine, but response generation fails with a
  generic server_error).
2026-05-08 15:43:53 -04:00
Paul Kompfner
ad0f0a1294 refactor: explicit kind=='final' check in async-tool routing
Replaces the implicit "final happens last" pattern in
_process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block in both AWSNovaSonicLLMService
and OpenAIRealtimeLLMService. Adds a trailing defensive `continue` so
async-tool messages with an unrecognized kind don't fall through to the
regular tool-result handling block — clearer at the call site, and safer
against future additions to AsyncToolMessageKind.
2026-05-08 15:43:37 -04:00
Paul Kompfner
72d0fb418a fix: restore cancel_on_interruption=False support in AWS Nova Sonic and OpenAI Realtime
Before the new async-tool mechanism landed, AWSNovaSonicLLMService and
OpenAIRealtimeLLMService honored cancel_on_interruption=False by simply
not cancelling in-flight function calls on interruption — the eventual
result then flowed through the same channel as any synchronous tool
result. The new mechanism (which appends started/intermediate/final
messages to the LLM context as the underlying task progresses) broke
that path: the realtime services didn't know how to interpret those
messages, and the eventual result was never delivered to the provider.

Restore the flag's behavior by teaching both services to detect
async-tool messages in the context and route them appropriately:

- started → skipped silently. The provider already issued the tool call
  and natively awaits a result; nothing to send for the started marker.
- final → delivered via the formal tool-result channel. Same path as a
  synchronous tool result, just delayed.

Streamed intermediate results (FunctionCallResultProperties(is_final=
False)) are not supported on these realtime services. An intermediate
result is logged as an error and surfaced via push_error, then dropped.
Use a non-realtime LLM service if a tool needs to stream intermediate
results. (Docstrings on register_function, register_direct_function, and
FunctionCallResultProperties.is_final updated to call this out.)

A new shared module pipecat.processors.aggregators.async_tool_messages
is the single source of truth for the on-the-wire payload shape: the
aggregator uses its build_*_message functions when injecting messages,
and the realtime services use parse_message when scanning the context.

Adds two example files exercising a network-delayed weather tool with
each service. The plain realtime-aws-nova-sonic.py example is also
reverted to a synchronous tool call now that the async variant lives in
its own file.

Similar fixes for other realtime services are forthcoming.
2026-05-08 09:33:06 -04:00
Aleix Conchillo Flaqué
94a94ee28c Merge pull request #4405 from pipecat-ai/aleix/user-turn-inference-event
Split user-turn-stop into inference-triggered and finalized events
2026-05-07 17:51:57 -07:00
Mark Backman
c46ede8335 Use Sphinx .. deprecated:: directive for deprecated aggregator params
Aligns deprecation docstrings on LLMUserAggregatorParams and
LLMAssistantAggregatorParams with CONTRIBUTING.md conventions:
present-tense parameter descriptions plus a `.. deprecated:: 1.2.0`
directive noting replacement and 2.0.0 removal. Also adds a runtime
DeprecationWarning for `user_turn_completion_config`, which previously
had no warning despite being deprecated.
2026-05-07 17:49:00 -07:00
Mark Backman
457a68ce64 Correct docstrings and comments regarding incomplete_long_timeout duration, 10 sec 2026-05-07 17:47:41 -07:00
Aleix Conchillo Flaqué
b78cecf7b2 Rename UserTurnCompletedFrame to UserTurnInferenceCompletedFrame
The old name overlapped semantically with `UserStoppedSpeakingFrame`:
both could be read as "the user's turn is done." They're at different
layers — `UserStoppedSpeakingFrame` is the acoustic stop signal,
while this frame is the post-judgment "inference about the turn is
now complete (turn is semantically final)" signal emitted by the LLM
mixin (on ✓), an end-of-turn classifier, or a custom producer.

The new name pairs naturally with the existing
`on_user_turn_inference_triggered` event vocabulary and removes the
ambiguity with `UserStoppedSpeakingFrame`.
2026-05-07 17:47:41 -07:00
Aleix Conchillo Flaqué
952dddca8b Replace llm_completion_user_turn_stop_strategies() with FilterIncompleteUserTurnStrategies
Wrap the detector chain with `deferred(...)` and append the LLM
completion gate via a `UserTurnStrategies` specialization rather than
a free-standing helper, mirroring the existing
`ExternalUserTurnStrategies` pattern. The class lives next to other
strategy containers in `pipecat.turns.user_turn_strategies`, so users
discover it where they're already configuring `user_turn_strategies`.

The deprecated `filter_incomplete_user_turns` flag now rewires
through `FilterIncompleteUserTurnStrategies` under the hood, keeping
the migration path identical to before. `deferred(...)` stays public
as the explicit escape hatch for non-default compositions.
2026-05-07 17:47:39 -07:00
Aleix Conchillo Flaqué
e3e90d38aa Preserve full user transcript across multiple inferences in one turn
When a stop-strategy chain splits inference-triggered from
finalization (e.g. `LLMTurnCompletionUserTurnStopStrategy` gating a
deferred detector), more than one inference can fire inside a single
user turn — each adds the new transcription segment to the context.
Previously each inference overwrote `_pending_user_turn_aggregation`,
so the eventual `on_user_turn_stopped` event surfaced only the
segment from the last inference, dropping anything the user said
before it.

Concatenate each segment into `_full_user_turn_aggregation` instead
of overwriting, and combine that running buffer with any post-final-
inference segment when emitting the public event.
2026-05-07 17:46:15 -07:00
Aleix Conchillo Flaqué
d1c8162b0c Route turn-completion markers through LLMMarkerFrame
Add an `LLMMarkerFrame(DataFrame)` for sideband LLM markers that need
to be persisted to context but should not flow through the standard
text path (TTS, transcript). The frame carries an
`append_to_context_immediately` flag so the assistant aggregator can
either commit the marker as a stand-alone message (○ / ◐) or merge it
with the upcoming aggregation as a prefix on the response (✓).

`UserTurnCompletionLLMServiceMixin` now emits `LLMMarkerFrame` instead
of pushing the marker as `LLMTextFrame(skip_tts=True)`, which fixes
the case where an incomplete-turn marker (○ / ◐) was aggregated by
the assistant aggregator but never committed to the context because
the assistant turn lifecycle didn't run to completion (no spoken
response, no `LLMFullResponseEndFrame`-driven `push_aggregation`).

The frame is intentionally generic so other components — STT services
with built-in turn signals, end-of-turn classifiers, custom
annotations — can use the same mechanism to inject sideband signals
into the assistant context.
2026-05-07 17:46:15 -07:00
Aleix Conchillo Flaqué
1fa0310ea8 Add changelog for #4405 2026-05-07 17:46:15 -07:00
Aleix Conchillo Flaqué
2281cd8359 Extract ExternalUserTurnCompletionStopStrategy as a reusable base
`LLMTurnCompletionUserTurnStopStrategy` previously bundled two
concerns: pushing `LLMUpdateSettingsFrame` on `StartFrame`, and
finalizing the turn on `UserTurnCompletedFrame`. The latter is
producer-agnostic — any component that emits `UserTurnCompletedFrame`
(STT with built-in turn detection, dedicated end-of-turn classifiers,
custom code) can drive finalization the same way.

Move the frame-handling half into a new
`ExternalUserTurnCompletionStopStrategy`. The LLM-specific subclass
now only adds the settings-frame push and inherits finalization. Mirrors
the existing `ExternalUserTurnStopStrategy` naming pattern.
2026-05-07 17:46:15 -07:00
Aleix Conchillo Flaqué
480eca42f5 Split user-turn-stop into inference-triggered and finalized events
Fixes a real bug: with `filter_incomplete_user_turns` enabled, the
smart-turn detector's tentative stop was firing `on_user_turn_stopped`
before the LLM had a chance to veto it. Observers, transcript
appenders and UI indicators received an early — and sometimes
duplicated — signal.

Decomposes the single stop concern into two events:
- `on_user_turn_inference_triggered` fires when a stop strategy has
  enough signal to start LLM inference. The aggregator pushes the
  context here, kicking off the LLM call.
- `on_user_turn_stopped` fires only when the user turn is semantically
  final. Built-in strategies fire both events at the same call site,
  preserving today's behavior for the common case.

Adds `LLMTurnCompletionUserTurnStopStrategy`, which gates
finalization on a `UserTurnCompletedFrame` (a fieldless system frame
emitted by any component judging turn completeness — currently the
`UserTurnCompletionLLMServiceMixin` on `✓`).

Adds `deferred(strategy)` / `DeferredUserTurnStopStrategy`, a thin
wrapper that forwards an inner strategy's events except
`on_user_turn_stopped`. Use this to install a stop strategy as an
inference trigger only, leaving finalization to a peer (e.g. the LLM
completion strategy).

Adds `llm_completion_user_turn_stop_strategies()` for the common
case:

    UserTurnStrategies(
        stop=llm_completion_user_turn_stop_strategies(),
    )

Deprecates `LLMUserAggregatorParams.filter_incomplete_user_turns`.
The aggregator emits a `DeprecationWarning`, wraps existing stop
strategies with `deferred(...)`, and appends
`LLMTurnCompletionUserTurnStopStrategy` automatically.
2026-05-07 17:46:09 -07:00
Mark Backman
1073510574 Merge pull request #4407 from pipecat-ai/mb/ui-agent-wire-format
feat(rtvi): add UI Agent Protocol as first-class RTVI message types
2026-05-07 20:03:41 -04:00
Mark Backman
47c05f3f30 Simplify changelog entry 2026-05-07 16:58:08 -07:00
Mark Backman
24904b89f5 Merge pull request #4443 from Anrahya/fix-gemini-tts-voice-names
fix: correct Gemini TTS voice names
2026-05-07 19:41:30 -04:00
orphis
c78977e4c7 chore: remove Gemini TTS voice name test 2026-05-08 05:03:15 +05:30
Mark Backman
f78b5f9240 Merge pull request #4446 from inworld-ai/ian/inworld-pcm
[inworld] default to using PCM encoding
2026-05-07 19:25:57 -04:00
Ian Lee
406f8b730b [inworld] default to using PCM encoding
* server returns audio bytes without headers
2026-05-07 16:05:34 -07:00
Mark Backman
7a2cec2e45 Merge pull request #4426 from marcelodiaz558/feature/elevenlabs_stt_keyterms
Add ElevenLabs STT keyterms support
2026-05-07 18:44:09 -04:00
Marcelo Díaz
edfcd6948b Add ElevenLabs STT keyterms support 2026-05-07 21:00:26 +00:00
kompfner
991ee9e0e6 Merge pull request #4404 from pipecat-ai/pk/mitigate-calls-to-missing-tools
Mitigate tool-call-related hallucination
2026-05-07 15:05:13 -04:00
Mark Backman
a696729343 Merge pull request #4439 from pipecat-ai/mb/fix-deprecation-video-out-bitrate 2026-05-07 14:42:26 -04:00
orphis
ba705e9501 chore: add changelog for Gemini TTS voice fix 2026-05-08 00:11:19 +05:30
orphis
98c370457b fix: correct Gemini TTS voice names 2026-05-08 00:09:56 +05:30
Filipi da Silva Fuchter
6189e920e1 Merge pull request #4433 from pipecat-ai/filipi/refactoring_elevenlabs
Refactoring ElevenLabs to send close_context as soon as the turn context is complete.
2026-05-07 13:10:36 -03:00
Filipi da Silva Fuchter
73625a273a Merge pull request #4440 from pipecat-ai/filipi/daily_send_message_issue
Fixing a race condition when cleaning up the daily transport.
2026-05-07 13:09:53 -03:00
filipi87
f91a55c97c Changelog entry for the fix. 2026-05-07 11:32:48 -03:00
filipi87
5f256e241c Fixing a race condition when cleaning up the daily transport. 2026-05-07 11:29:57 -03:00
Mark Backman
954f63dc7b Document deprecation docstring convention in CLAUDE.md.
Adds an explicit Code Style bullet for the `.. deprecated::` Sphinx
directive (forbidding inline `[DEPRECATED]` tags) and extends the
Docstring Example with a Pydantic params class showing the directive
inside a `Parameters:` block — the context CONTRIBUTING.md's existing
example didn't cover.
2026-05-07 10:03:43 -04:00
Mark Backman
6cc66a3df1 Update video_out_bitrate deprecation to use sphinx directive.
Replaces the inline `[DEPRECATED]` tag with a `.. deprecated:: 1.1.0`
directive per CONTRIBUTING.md docstring conventions, so the deprecation
shows up properly in the rendered docs.
2026-05-07 09:57:21 -04:00
filipi87
a445399337 Fixing a bug in the ElevenLabs TTS refactor where alignment state was reset too early mid-turn. 2026-05-07 10:10:54 -03:00
filipi87
5ed2057599 Merge branch 'main' into filipi/refactoring_elevenlabs 2026-05-07 09:32:53 -03:00
Filipi da Silva Fuchter
cacde00e26 Merge pull request #4435 from pipecat-ai/filipi/uninterruptible_frame
Refactoring TTSService to preserve uninterruptible frames.
2026-05-07 08:46:42 -03:00
Filipi da Silva Fuchter
b1b598f65e Merge pull request #4434 from pipecat-ai/filipi/fix_interruption_regression
Fix interruption blocked by slow non-uninterruptible frame in queue
2026-05-07 08:46:10 -03:00
filipi87
c48ee93892 Adding changelog entry for the fix. 2026-05-06 16:30:22 -03:00
filipi87
cf22dac171 Refactoring TTSService to preserve uninterruptible frames. 2026-05-06 16:26:45 -03:00
filipi87
36f6e22aee Adding changelog for the interruption fix. 2026-05-06 15:39:27 -03:00
filipi87
921a7a46cb Fix interruption blocked by slow non-uninterruptible frame in queue
When a non-uninterruptible frame was being processed slowly and an
uninterruptible frame was waiting in the queue, _start_interruption
skipped task cancellation. This caused interruptions to stall until
the slow frame finished, even though it had no reason to block them.

The fix: only skip cancellation when the *current* frame is
uninterruptible. Uninterruptible frames already in the queue are
preserved regardless, because __create_process_task calls
__reset_process_queue internally, which always retains them.

Fixes: https://github.com/pipecat-ai/pipecat/issues/4412
2026-05-06 15:35:43 -03:00
filipi87
fda18a9afa Adding changelog for the elevenlabs improvement. 2026-05-06 14:58:18 -03:00
filipi87
d146a7f8e0 Refactoring ElevenLabs to send close_context as soon as the turn context is complete. 2026-05-06 14:55:49 -03:00
Filipi da Silva Fuchter
90f0f7cd27 Merge pull request #4431 from pipecat-ai/filipi/tts_deadlock
Fixing TTSService deadlock.
2026-05-06 14:52:04 -03:00
Mark Backman
37376b3506 Merge pull request #4429 from pipecat-ai/mb/update-grok-default-llm-model
fix(xai): update default Grok model to grok-4.20-non-reasoning
2026-05-06 13:41:05 -04:00
Mark Backman
729418c2b7 Merge pull request #4428 from pipecat-ai/mb/deprecate-resampy
chore(audio): deprecate ResampyResampler
2026-05-06 13:40:51 -04:00
filipi87
4512038a17 Creating a changelog entry for the fix. 2026-05-06 13:36:20 -03:00
filipi87
a23baf9de6 Fixing TTSService deadlock. 2026-05-06 13:32:26 -03:00
Mark Backman
d18fe7c39c feat(rtvi): type UI accessibility snapshots 2026-05-06 11:29:19 -04:00
Mark Backman
41124dc494 refactor(rtvi): clarify UI message names 2026-05-06 11:08:25 -04:00
Filipi da Silva Fuchter
95db08646c Merge pull request #4430 from pipecat-ai/filipi/flux_audio
Implementing dynamic watchdog timeout for Deepgram Flux STT
2026-05-06 11:40:06 -03:00
filipi87
03e5ebb266 Improving watchdog_min_timeout description. 2026-05-06 11:37:18 -03:00
filipi87
5daf267c11 Adding changelogs. 2026-05-06 11:26:14 -03:00
filipi87
1cb77b422a Created a watchdog_min_timeout to allow to change the default value. 2026-05-06 11:22:37 -03:00
filipi87
0c779b4c3d Implementing dynamic watchdog timeout for Deepgram Flux STT 2026-05-06 11:01:58 -03:00
Mark Backman
138991418a docs(changelog): add 4429 entry for Grok default model update 2026-05-06 09:51:01 -04:00
Mark Backman
94e136a6b7 fix(xai): update default Grok model to grok-4.20-non-reasoning
grok-3 is being retired from the xAI API on May 15, 2026. Switch the
default to grok-4.20-non-reasoning, which xAI recommends for non-reasoning
workloads and is appropriate for real-time voice AI.
2026-05-06 09:48:39 -04:00
Mark Backman
9598e262b5 docs(changelog): add 4428 deprecation entry for ResampyResampler 2026-05-06 09:41:14 -04:00
Mark Backman
8c3521f2e4 chore(audio): deprecate ResampyResampler in favor of SOXR resamplers
Emits a DeprecationWarning on instantiation. ResampyResampler will be
removed in Pipecat 2.0 along with the default resampy and numba
dependencies.
2026-05-06 09:40:13 -04:00
Mark Backman
eda98fb13f Merge pull request #4424 from pipecat-ai/mb/revert-elevenlabs-tts-alignment
fix(elevenlabs): only use normalizedAlignment when pronunciation dict is set
2026-05-06 08:27:25 -04:00
Mark Backman
3722ee223c Merge pull request #4419 from pipecat-ai/mb/fix-changelog-entry-4416
Fix changelog filename for 4416
2026-05-05 14:50:24 -04:00
Mark Backman
2620e76dab docs(elevenlabs): clarify alignment leading-space handling 2026-05-05 14:49:41 -04:00
Mark Backman
2447db766e docs(changelog): add 4424 entry for elevenlabs alignment selection fix 2026-05-05 14:49:41 -04:00
Mark Backman
61a81ed87b fix(elevenlabs): use alignment by default, normalizedAlignment only with pronunciation dicts
PR #4344 unconditionally switched to normalizedAlignment to fix garbled
words with pronunciation dictionaries (#4316). But normalizedAlignment
returns the post-normalized form of what was spoken - including
romanization of non-Latin scripts (Chinese rendered as pinyin), which
ends up in the LLM context and degrades subsequent turns.

Gate the switch on pronunciation_dictionary_locators being configured.
Adds a _select_alignment helper with preferred-with-fallback (both
fields are nullable per the API schema), used by both the WebSocket
and HTTP services. Tests cover dictionary mode, default mode, fallback
when preferred is missing or null, and HTTP field-name variants.
2026-05-05 14:49:41 -04:00
Mark Backman
735cd09c7e Merge pull request #4422 from cshape/tts-2
feat(inworld): default to inworld-tts-2
2026-05-05 14:00:04 -04:00
Paul Kompfner
2616076bec Add deterministic dev-error demo example
``examples/function-calling/function-calling-missing-handler.py``
demonstrates the missing-handler path by deliberately advertising a
tool to the LLM without registering its handler — what happens when a
developer forgets to call ``register_function``. Exercises the new
``logger.error`` severity end-to-end without needing to coax the LLM
into hallucinating.
2026-05-05 13:08:00 -04:00
Paul Kompfner
40667e50fc Add changelog for #4404 2026-05-05 13:03:49 -04:00
Paul Kompfner
e06e0c0282 Mitigate tool-call-related hallucination
When tools change mid-conversation, LLMs can produce a few different
flavors of tool-call-related hallucination: calling tools that have
been removed, avoiding tools that have been re-added, or hallucinating
output (made-up answers or tool-call-shaped non-tool-calls) when tools
are unavailable.

This change introduces an opt-in ``add_tool_change_messages`` flag on
the LLM aggregators (preferred entry point: ``LLMContextAggregatorPair(
..., add_tool_change_messages=True)``) that appends a developer-role
message to the context whenever ``LLMSetToolsFrame`` changes the set
of advertised standard tools. Helps the LLM stay coherent across tool
changes by spelling out exactly what just became available or
unavailable. Both aggregators participate; whichever handles the
frame first wins, and the other (if any) sees an empty diff against
the shared context and stays silent — order-independent regardless of
whether the frame flows downstream or upstream.

Also tightens the existing missing-handler path (introduced in #4301):

- Reworded the terminal tool result to a neutral "The function
  ``X`` is not currently available." (overridable via
  ``LLMService.MISSING_FUNCTION_CALL_MESSAGE_TEMPLATE``). Previously
  read "Error: function 'X' is not registered."
- Logs at the call site now distinguish developer error (tool
  advertised but no handler registered → ``logger.error``) from
  hallucination (tool not advertised → ``logger.warning``).

Includes a manual validation harness
(``examples/features/features-add-tool-change-messages.py``) that
exercises the new ``add_tool_change_messages`` mitigation by flipping
tool availability on a turn counter so its effect can be observed
end-to-end with the flag on vs. off.
2026-05-05 13:02:43 -04:00
Cale Shapera
84eefba4df docs: add changelog fragment for tts-2 default flip 2026-05-05 09:20:16 -07:00
Cale Shapera
fe3af5d9f7 feat(inworld): default to inworld-tts-2
Flip the default Inworld TTS model from inworld-tts-1.5-max to
inworld-tts-2 across:
- InworldHttpTTSService (HTTP)
- InworldTTSService (WebSocket)
- InworldRealtimeLLMService (cascade Realtime)

inworld-tts-1.5-max and inworld-tts-1.5-mini remain valid options;
existing users can pin the prior model explicitly via the model
setting. Docstring examples updated to reference the new default.
2026-05-05 09:20:16 -07:00
Mark Backman
7729eecfe4 Fix changelog filename for 4416 2026-05-04 21:54:58 -04:00
Mark Backman
fa31a2fd63 Merge pull request #4416 from pipecat-ai/mb/pr-4333-aws-credentials-review
feat(aws): add shared credential resolver with boto3 chain fallback
2026-05-04 21:48:33 -04:00
Mark Backman
678d40e102 docs(changelog): add 4333 entries for AWS credential resolver expansion 2026-05-04 19:30:37 -04:00
Mark Backman
8becafee38 fix(aws): use shared credential resolver in Polly, Bedrock, AgentCore
Polly TTS, Bedrock LLM, and AgentCore previously did
`arg or os.getenv("AWS_...")` and handed the result straight to
aioboto3.  When only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`
was set, aioboto3 received a half-populated kwarg and errored instead of
falling through to the boto3 credential provider chain (instance
profiles, IRSA, ECS task roles, SSO, etc.).

Route credential resolution through the shared `resolve_credentials()`
helper introduced for AWS Transcribe so all four services follow the
same `explicit → env → boto3 chain` fallback.  Add an
`AWSCredentials.to_boto_kwargs()` method to bridge the dataclass field
names (`access_key`, `secret_key`) to the aioboto3 kwargs
(`aws_access_key_id`, `aws_secret_access_key`).

No public API changes.  Behaviour is identical for fully-explicit and
fully-env-var configurations; partial env vars now correctly trigger
the chain instead of erroring.
2026-05-04 19:23:53 -04:00
Mark Backman
83190d38e9 Merge pull request #4414 from pipecat-ai/mb/fix-ttsspeakframe-assistant-turn-stopped 2026-05-04 18:12:33 -04:00
Mark Backman
7519c26ac5 Merge pull request #4417 from pipecat-ai/mb/resolve-runner-filepath 2026-05-04 18:09:34 -04:00
Mark Backman
b2b7e9ee6f Merge pull request #4415 from pipecat-ai/mb/fix-elevenlabs-leading-spaces-flash 2026-05-04 18:08:31 -04:00
Mark Backman
e864d5778a ci: install runner extra for the coverage job 2026-05-04 16:44:47 -04:00
Mark Backman
89f10dd9a1 test: drop webrtc-dependent test, remove webrtc extra from CI 2026-05-04 16:42:05 -04:00
Mark Backman
f67e3ef0b2 ci: install runner and webrtc extras for the test job 2026-05-04 16:29:58 -04:00
Mark Backman
5b087d6aeb docs: add changelog for #4417 2026-05-04 16:22:26 -04:00
Mark Backman
e780f759d0 fix: validate download path containment in runner
Resolve and contain the user-supplied filename before serving it from
the runner's /files endpoint. Also raise a 404 (instead of returning
None) when the downloads folder is unset, and use the resolved
basename for Content-Disposition.
2026-05-04 16:20:27 -04:00
Daniel Wirjo
35153de28e feat(aws): add shared credential resolver with boto3 chain fallback
AWS Transcribe STT previously only supported credentials via explicit
parameters or environment variables. Services running with IAM roles
(EKS pod roles, IRSA, ECS task roles, EC2 instance profiles) or SSO
couldn't use Transcribe without exporting static credentials.

Changes:
- Add resolve_credentials() to utils.py providing a standard fallback
  chain: explicit params → environment variables → boto3 credential
  provider chain (instance profiles, IRSA, pod roles, SSO, etc.)
- Add AWSCredentials dataclass for type-safe credential passing
- Update AWSTranscribeSTTService to use resolve_credentials() instead
  of manual os.getenv() calls
- The boto3 fallback is only attempted when both access key and secret
  key are unresolved, avoiding replacement of explicitly provided creds
- boto3 is imported lazily inside the function to avoid hard dependency
  for services that don't need the fallback chain
- Add 7 unit tests covering the credential resolution chain

The Bedrock LLM and Polly TTS services already support the full
credential chain via aioboto3.Session() and are not modified.

Related to #4197
2026-05-04 15:40:06 -04:00
Mark Backman
9886d72f5e Add changelog for PR #4415 2026-05-04 15:18:15 -04:00
Mark Backman
90e6b51acd Fix ElevenLabs alignment chunk spacing 2026-05-04 15:15:37 -04:00
Mark Backman
61acdba3ae docs: add changelog entry for #4414 2026-05-04 10:43:52 -04:00
Mark Backman
f1a3ee97de fix: surface TTSSpeakFrame greetings in on_assistant_turn_stopped
Two issues were causing TTSSpeakFrame(append_to_context=True) greetings to
silently lose their trailing words and never fire on_assistant_turn_stopped:

- LLMAssistantPushAggregationFrame was emitted without a PTS, so the
  transport routed it through the audio (sync) queue while word-level
  TTSTextFrames travel through the clock queue. The aggregation could reach
  the assistant aggregator before the final words, leaving them orphaned
  in the buffer. Stamp the frame with `_word_last_pts + 1` when there are
  word timestamps so it can't overtake them.

- The aggregator's LLMAssistantPushAggregationFrame handler called
  push_aggregation() directly, bypassing _trigger_assistant_turn_stopped.
  For TTS-only flows there is no LLMFullResponseStartFrame, so the turn
  start timestamp was never set and on_assistant_turn_stopped never fired.
  Open a turn (if needed) and trigger stopped from the handler.

Fixes #4264.
2026-05-04 10:41:22 -04:00
Mark Backman
b363b91d12 Merge pull request #4401 from pipecat-ai/mb/grok-realtime-model
fix(xai/realtime): pass model as query param on connect
2026-05-04 09:44:33 -04:00
Mark Backman
43abca0b06 feat(rtvi): add UI Agent Protocol as first-class RTVI message types
The UI Agent Protocol lets server-side AI agents observe and drive
a GUI app on the client side through structured RTVI messages.
Five new top-level RTVI types in kebab-case, in line with the rest
of the protocol:

  ui-event         client → server  (named event with payload)
  ui-command       server → client  (named command with payload)
  ui-snapshot      client → server  (accessibility tree of the page)
  ui-cancel-task   client → server  (cancel an in-flight task group)
  ui-task          server → client  (task lifecycle envelope)

Each ships paired ``*Data`` / ``*Message`` pydantic models in
``rtvi.models``, following the existing RTVI envelope convention
(``BotReady`` / ``BotReadyData``, ``Error`` / ``ErrorData``, etc.).
Built-in command payload models (``Toast``, ``Navigate``,
``ScrollTo``, ``Highlight``, ``Focus``, ``Click``, ``SetInputValue``,
``SelectText``) ship alongside; matching default React handlers
live in ``@pipecat-ai/client-react``.

Bumps the RTVI ``PROTOCOL_VERSION`` from ``1.2.0`` to ``1.3.0``.
Purely additive: only new top-level message types are introduced;
no existing wire shapes are changed. The major-version
compatibility check on ``client-ready`` still passes for older
1.x clients, so old clients continue to connect without warning;
they simply will not exercise the new types.

The ``RTVIProcessor`` registers a new ``on_ui_message`` event
handler that fires for inbound ``ui-event`` / ``ui-snapshot`` /
``ui-cancel-task`` with the parsed Message envelope, mirroring how
``on_client_message`` works for ``client-message``.

Five new pipeline frames let pipeline observers and processors see
UI traffic the same way they see other RTVI messages, mirroring
the frame-and-event pattern used by ``client-message``:

  RTVIUICommandFrame(command_name, payload)
    Pushed by downstream code (e.g. ``pipecat-ai-subagents``'s
    bridge) to send a UI command to the client. Wrapped by the
    observer into a ``UICommandMessage`` envelope.

  RTVIUITaskFrame(data: UITaskData)
    Same shape but for ``ui-task``; wrapped into ``UITaskMessage``.
    ``UITaskData`` is a discriminated union of the four lifecycle
    kinds (group_started / task_update / task_completed /
    group_completed).

  RTVIUIEventFrame(msg_id, event_name, payload)
  RTVIUISnapshotFrame(msg_id, tree)
  RTVIUICancelTaskFrame(msg_id, task_id, reason)
    Pushed by ``RTVIProcessor._handle_message`` whenever the
    matching inbound message arrives, alongside firing
    ``on_ui_message``. Pipeline observers and processors can match
    on the frame; subscribers like the subagents bridge keep using
    the event handler.

The data layer is the canonical authority for the wire format:
higher-level frameworks like ``pipecat-ai-subagents`` build the
agent abstractions on top, and single-LLM Pipecat apps can target
the same wire format directly via custom tools that emit these
typed messages.
2026-05-02 12:09:01 -04:00
Mark Backman
30efd11e15 Merge pull request #4397 from pipecat-ai/mb/smallwebrtc-trace-app-message 2026-05-01 20:47:04 -04:00
kompfner
a745e8d318 Merge pull request #4378 from pipecat-ai/pk/more-pyright-fixes
More pyright fixes
2026-05-01 14:09:27 -04:00
Paul Kompfner
2730e47e61 ci: install all extras for the pyright type-check job
The pyright job in `format.yaml` previously installed only `--extra
daily --extra tracing`. That was sufficient when most optional-dep-
using files were in the pyright ignore list, but as this PR has
cleared dozens of files, those files now reference symbols from
optional-dep modules (`aiortc.RTCIceServer` via `IceServer`,
`google.genai.types.HttpOptions`, etc.). `reportMissingImports: false`
tolerates the failed imports themselves, but the imported names
become `Unknown` and using them as type expressions trips
`reportInvalidTypeForm` / `reportAttributeAccessIssue` — errors
that aren't gated by that flag.

Switch to `--all-extras --no-extra gstreamer --no-extra local`
(matching the dev setup in README.md), so pyright sees the same
dependency set the code is intended to be type-checked against and
the install-set scales naturally as more files leave the ignore list.

Also reconcile CLAUDE.md's setup command, which only excluded
`gstreamer`. README.md is canonical and additionally excludes
`local` (pyaudio requires `portaudio` native libs that aren't
installed by default on a clean Ubuntu CI runner).
2026-05-01 09:36:14 -04:00
Paul Kompfner
4703df8686 fix: clear 8 more services from pyright ignore list
A fourth pass over low-error-count files. Drops 8 files (57 → 49) and
full-pyright errors from 525 → 496. Default pyright stays clean.

Optional access on transport/client receivers (4 files). Same fix
shape as #4359 — a receiver typed `X | None` accessed without a
guard. For "should never happen" cases (caller's lifecycle ensures
the field is non-None when the method runs), used `assert` rather
than silent early-return so an invariant violation surfaces loudly:

- `transports/whatsapp/client.py` (5 errors): `_validate_whatsapp_webhook_request`
  was typed `bytes` / `str` but called with `bytes | None` / `str | None`.
  Widened the helper signature and pushed the explicit None-check
  inside (matching its existing empty-string check). Also handled
  `pipecat_connection.get_answer()` returning `None` — would have
  crashed at `.get("sdp")` before.
- `transports/websocket/client.py` (5 errors): four are the deprecated
  `websockets.WebSocketClientProtocol` alias (same `# pyright: ignore[reportAttributeAccessIssue]`
  as the `services/websocket_service.py` fix from earlier in this PR).
  The fifth was `async for message in self._websocket` — traced the
  call chain and confirmed `_client_task` is created only after
  `self._websocket` is assigned and cancelled before it's cleared, so
  the field is never None when `_client_task_handler` runs. Used `assert`.
- `services/openai/stt.py` (4 errors): same pattern. `_receive_messages`
  is started by `_connect()` only when `self._websocket` is set, and
  the reconnect loop in `WebsocketService._receive_task_handler`
  re-establishes it before each retry. `assert` at entry. Plus L478/L483:
  the `try`/`except ModuleNotFoundError` import-guard makes
  `websocket_connect` and `State` `<type> | None`; `__init__` already
  raises `ImportError` if either is None, so an `assert` at the
  `_connect_websocket` use site is honest. Plus an L538 `Language | str`
  cast (same shape as last batch).
- `services/deepgram/flux/base.py` (2 errors): `event = data.get("event")`
  flowed into `_handle_turn_resumed(event: str)` as `Any | None`.
  Tightened with an `isinstance(event, str)` guard before the
  `FluxEventType(event)` lookup. The other error (`average_confidence > min_confidence`
  where `min_confidence: float | None`) was a latent crash on missing
  confidence data — restored the original `not min_confidence` (which
  treats both `None` and `0.0` as "no filter") and added an explicit
  drop-on-missing-confidence-data branch.

`gemini_live` Settings/InputParams (vertex). The deprecated `InputParams`
declares `modalities: GeminiModalities | None` and `media_resolution: GeminiMediaResolution | None`,
but their downstream usage at `services/google/gemini_live/llm.py:952,959`
calls `.value` on each — `None` would crash. Rather than touching the
deprecated input model, translate `None` to the canonical defaults
(`GeminiModalities.AUDIO`, `GeminiMediaResolution.UNSPECIFIED`) at the
assignment site in `vertex/llm.py`. Also fixed an unrelated annotation
bug: `_get_credentials` was annotated `-> str` but actually returns
`service_account.Credentials` (used correctly by the caller — only
the annotation was wrong).

`moondream/vision.py` (3 errors). `frame.format` is `str | None` but
`Image.frombytes(mode, ...)` requires `str`; raise instead of crashing
on missing format. The other two errors are pyright thinking the
moondream2-custom `encode_image` and `query` methods are `Tensor`
(rather than callables) — those are provided by the model code via
`trust_remote_code=True` and aren't visible to pyright on the base
`AutoModelForCausalLM` type. Scoped `# pyright: ignore[reportCallIssue]`
on the two call sites.

`transports/base_output.py` (3 errors). Two are `self._mixer.mix(...)`
calls in `with_mixer`, a closure invoked only when `self._mixer` is
truthy at the call site — captured the mixer to a local variable
inside the closure with an `assert`, then used that. Third is the
PIL `frombytes(mode, ...)` shape — `frame.format is None` early-
return guard at the top of `resize_frame` so the main resize logic
reads cleanly.

`elevenlabs/tts.py` (4 errors). The payload-building dict at L1271
was typed `dict[str, str | dict[str, float | bool]]` — an aspirational
shape that matched only the first two assignments. Subsequent code
assigned `list[dict[...]]` (pronunciation locators) and bools, all
violating the annotation. Same pattern at L926 (the WebSocket-init
`msg`). Both widened to `dict[str, Any]`, which is the honest shape
for a JSON request payload and what similar code uses elsewhere.

Files dropped from the ignore list (57 → 49):
services/deepgram/flux/base.py, services/elevenlabs/tts.py,
services/google/gemini_live/vertex/llm.py,
services/moondream/vision.py, services/openai/stt.py,
transports/base_output.py, transports/websocket/client.py,
transports/whatsapp/client.py.
2026-05-01 09:36:14 -04:00
Paul Kompfner
26a40e2e62 fix: clear 10 more services from pyright ignore list
A third pass over low-error-count files in the ignore list. Drops 10
files (67 → 57) and full-pyright errors from 555 → 525. Default
pyright stays clean.

Optional access guards (4 files). The same fix shape as 9e9b1f39e:
a receiver typed `X | None` accessed without a guard, fixed with a
local-var capture or an early return.

- `mistral/stt.py`: `_connection.send_audio` could crash if
  `_connect()` swallowed an exception and left `_connection` unset;
  drop the audio chunk with a warning instead. `_receive_events`
  iterating `_connection.events()` got the same defensive narrowing.
- `deepgram/flux/stt.py`: `_websocket_url` is set in `_connect`
  before `_connect_websocket` is called, but pyright doesn't track
  that across methods — assert at the use site. `websocket.response`
  is `Response | None` in the websockets stubs even though it's
  always populated post-handshake; guarded with a fallback.
- `audio/filters/rnnoise_filter.py`: the module-level import sets
  `RNNoise` to `None` if `pyrnnoise` isn't installed; raise
  `ImportError` explicitly instead of relying on the existing try-
  block to catch the `None(...)` call. Also gated `filter()` with
  `or self._rnnoise is None` so pyright sees the narrowing.
- `transports/smallwebrtc/request_handler.py`: `get_answer()`
  legitimately returns `None`; raise instead of crashing on three
  subscript accesses.

`TTSService` `audio-context` API tightening. Mirroring the
`append_to_audio_context` fix from the previous batch:
`remove_audio_context` was typed `str` but is called with `str | None`
from `get_active_audio_context_id()` results. Widened to `str | None`
and the `None` handling lives in the function body (early debug log
+ return) — matching `append_to_audio_context`'s shape.
`audio_context_available` keeps its narrow `str` signature; asking
"is `None` available?" isn't a meaningful question (`_audio_contexts`
is `dict[str, asyncio.Queue]`). The internal call site in
`on_turn_context_completed` narrows `_turn_context_id` explicitly
before passing it. Side effect: deepgram/tts.py's L307 error clears
without local changes.

`deepgram/tts.py` (4 errors → 0): the same `push_error(ErrorFrame(...))`
latent bug we fixed in resembleai earlier in this PR — `push_error`
takes a string; there's a separate `push_error_frame` for frames.
Two sites switched. The Optional `_websocket.response` access is
guarded the same way as deepgram/flux/stt.py. The `remove_audio_context`
error was cleared by the tightening above.

`aws/utils.py` (3 errors → 0): `AWSTranscribePresignedURL` declared
`session_token: str` but the dict source is `str | None` (AWS
supports long-term IAM creds without a session token). Same for
`vocabulary_name`/`vocabulary_filter_name` on `get_request_url`,
which were typed `str = ""` even though the body uses truthy checks
to skip them. Widened to `str | None = None` — matches actual
runtime semantics.

`audio/dtmf/utils.py` (2 errors → 0): `files("...").joinpath(...)`
returns a `Traversable`, but `aiofiles.open` wants a real path. For
regular pip installs this worked in practice (Traversable was a
`Path`), but it would fail for zipped distributions (zipapp,
zipimport) where the resource isn't on disk. Wrapped in
`importlib.resources.as_file(...)` — the canonical bridge that
extracts to a temp file when the resource isn't already on the
filesystem. Validated end-to-end: regular install still reads bytes;
ad-hoc zipapp test confirmed `as_file` extracts the resource and
returns a real Path.

`openai/image.py` (2 errors → 0): the `size` arg to
`images.generate` is `Literal[...] | None` in the SDK but our
settings field is `str | None`. Mirrored the `groq/tts.py`
hint-not-constraint pattern from the previous batch: defined a
module-level `OpenAIImageSize = Literal[...]` alias with a comment
attributing the upstream symbol and documenting the cast contract
(callers can pass any string; invalid values surface as an OpenAI
API error). Also guarded `image.data[0]` (response.data is
`list[Image] | None`).

`processors/frameworks/{langchain,strands_agents}.py` (4 + 4 → 0):
both processors do `messages[-1]["content"]` on a value typed
`LLMStandardMessage | LLMSpecificMessage` (the latter is a dataclass,
not a dict, so `__getitem__` errors). Historically these only
handled plain-text user messages, so the fix is two explicit guards
(skip if the last message isn't a dict; skip if `content` isn't a
string) plus a TODO noting that other shapes (multi-modal content,
provider-specific messages) aren't supported yet. langchain's
`__get_token_value` also got a small fix where `AIMessageChunk.content`
is `str | list[parts]` but the function declares `-> str`; stringify
the list case. strands_agents' surfaced two unrelated narrows: a
`graph_exit_node: str | None` arg gated by an `__init__`-time assert,
and `agent.stream_async` reached only when we're not in graph mode.

Files dropped from the ignore list (67 → 57):
audio/dtmf/utils.py, audio/filters/rnnoise_filter.py,
processors/frameworks/langchain.py,
processors/frameworks/strands_agents.py, services/aws/utils.py,
services/deepgram/flux/stt.py, services/deepgram/tts.py,
services/mistral/stt.py, services/openai/image.py,
transports/smallwebrtc/request_handler.py.
2026-05-01 09:36:14 -04:00
Paul Kompfner
31ff07916f fix: clear 10 more services from pyright ignore list
A second pass over the low-error-count files in the ignore list. Drops
10 files (77 → 67) and full-pyright errors from 580 → 555. Default
pyright stays clean.

Three coherent shapes plus a handful of one-offs:

`Language | str | None` → `Language | None` at STT frame boundaries.
`assert_given(self._settings.language)` returns `Language | str | None`
(strips `_NotGiven`, keeps the rest), but `TranscriptionFrame.language`
expects `Language | None`. In practice both `_settings.language` and
SDK-supplied codes resolve to a `Language` enum value, but technically
they could be raw strings — and `Language` is a StrEnum, so downstream
consumers (which mostly compare/serialize as strings) handle either.
Used `cast("Language | None", ...)` at each call site rather than a
runtime-validating helper, so an unrecognised code (e.g. one we
haven't added to the enum yet) still flows through unchanged. Cleared
azure/stt.py, aws/stt.py, gradium/stt.py; mistral/stt.py keeps the
cast at the SDK boundary (storing under `_detected_language: Language
| None`) but stays in the ignore list because of two unrelated
Optional-access errors.

aiobotocore `async with` stub gap. `aioboto3.Session().client(...)`
is an async context manager at runtime but its stubs don't advertise
`__aenter__`/`__aexit__` to pyright. Scoped
`# pyright: ignore[reportGeneralTypeIssues]` on the two affected
sites: aws/agent_core.py and aws/tts.py. aws/tts.py also had a latent
bug on the no-`AudioStream` path: the original code set
`audio_data = None` and then crashed in `resample(...)` and
`len(audio_data)` below; replaced with an early `return` after
logging — matches the convention elsewhere (OpenAI TTS, etc.) of not
recording usage metrics on the error path.

heygen `event_id: str | None` → `str` at transport→client boundary.
Three call sites in transports/heygen/transport.py passed `self._event_id`
(`str | None`) into client methods that take `str`. Added a guard at
each: `agent_speak_end` and `interrupt` only fire when `_event_id` is
set; `write_audio_frame` warn-and-drops when there's no active bot
event rather than sending a malformed message.

`OpenAIResponsesLLMInvocationParams` TypedDict.
`get_llm_invocation_params` always sets both `input` and `tools` in
the same dict literal, but the TypedDict was `total=False` so direct
subscript access (`invocation_params["input"]`) tripped
`reportTypedDictNotRequiredAccess` in services/openai/responses/llm.py.
Marked both keys `Required[...]`; `instructions` stays non-required
since it's only added when a system instruction is present.

Latent bug in heygen/api_interactive_avatar.py: the code accessed
`request_data.voice.voiceId` and `request_data.voice.elevenlabsSettings`,
but those names are Pydantic *aliases*; the actual attribute names
(used for attribute access) are `voice_id` and `elevenlabs_settings`.
Switched to the field names — those camelCase accesses would have
raised AttributeError at runtime if `voice` was set.

Other small fixes:

- assemblyai/stt.py: the deprecated `connection_params=` init path
  was reading `formatted_finals` and `word_finalization_max_wait_time`
  off `AssemblyAIConnectionParams`, but those fields were never on
  the deprecated input model — they were added to Settings later.
  Removed the reads (with a comment noting they're only available
  via the canonical `settings=...` API); the deprecated input model
  is unchanged.
- rtvi/processor.py: two `about: Mapping[str, Any] = None` parameter
  signatures — declared `Mapping`, defaulted to `None`, and both
  function bodies already handled the None case. Widened to
  `Mapping[str, Any] | None = None`.
- aws/stt.py: `subprotocols=["mqtt"]` failed against websockets'
  `Sequence[Subprotocol] | None` (Subprotocol is a NewType wrapper).
  Wrapped: `subprotocols=[Subprotocol("mqtt")]`.

Files dropped from the ignore list (77 → 67):
processors/frameworks/rtvi/processor.py, services/assemblyai/stt.py,
services/aws/agent_core.py, services/aws/stt.py, services/aws/tts.py,
services/azure/stt.py, services/gradium/stt.py,
services/heygen/api_interactive_avatar.py,
services/openai/responses/llm.py, transports/heygen/transport.py.
2026-05-01 09:36:14 -04:00
Paul Kompfner
814f00ce41 fix: clear 19 TTS/STT/etc. services from pyright ignore list
Several adjacent fix shapes that together drop 19 files from the
pyrightconfig.json ignore list (96 → 77) and full-pyright errors from
605 → 580. Default pyright stays clean.

TTS voice/context_id None handling — most files in this batch had a
single error of the shape "value typed `T | None` passed where `T` is
required" coming out of `assert_given(self._settings.voice)` (which
strips `_NotGiven` but not `None`) or `get_active_audio_context_id()`.
Two patterns:

- For services where a missing voice means the request can't proceed
  (hume, openai, xtts, groq, kokoro, piper), added an explicit None
  check. Inside `run_tts` we yield an `ErrorFrame` and return — matching
  each service's existing error-emission style (a few wrap `Exception`
  broadly and were fine; openai/hume/xtts had narrower or no try blocks
  so a bare `raise ValueError` would have escaped uncaught). Piper
  validates in `__init__`, where failing fast at construction is the
  right shape. OpenAI also gained a `voice not in VALID_VOICES` guard
  with a clear message listing supported voices.

- For services where a missing audio context just means "skip this
  message" (fish, lmnt, smallest, sarvam, neuphonic), widened
  `TTSService.append_to_audio_context`'s `context_id` signature to
  `str | None`. The function body already explicitly handled the None
  case with a debug log + early return, so the prior `str` annotation
  was a lie; making it honest cleared call sites without local guards.
  inworld's `_close_context` got the same treatment.

google.genai imports — switched `from google import genai` to
`import google.genai as genai` in google/image.py and google/llm.py.
The dotted form sidesteps a PEP 420 namespace-package stub gap (the
`google` namespace stubs come from a different distribution and don't
declare `genai`), which means pyright now resolves `genai` to the
real module rather than `Unknown`. IDE autocomplete on `genai.<x>`
works for the first time. In image.py this surfaced three latent
bugs that the `Unknown` resolution had been hiding (model was
`str | _NotGiven | None` not narrowed before passing to the SDK; two
spots accessed `.image_bytes` on an `Image | None` without a guard) —
all fixed. llm.py's dotted import surfaced 8 errors (Content-list
typing nuances, internal `_api_client` access, a few small Optionals);
deferred to a future pass since they're outside this commit's scope,
so the file stays in the ignore list with the dotted import.

Latent bug fixes spotted along the way:

- resembleai/tts.py was calling `push_error(ErrorFrame(...))`, but
  `push_error` takes a string — there's a separate `push_error_frame`
  for the frame case. Switched to the right method.
- openai/base_llm.py: `max_completion_tokens` was the only sibling
  field on `OpenAILLMSettings` missing `| None` in its type, which
  caused the assignment in openai/llm.py from `params.max_completion_tokens`
  (`int | None`) to fail. Added `| None` for consistency with
  `max_tokens` etc.
- heygen/base_api.py: `livekit_url: str = None` and `ws_url: str = None`
  declared `str` while defaulting to `None`. Removed the bogus
  defaults — both fields are required at construction in every
  in-tree call site, and the previous `str = None` was a Pydantic
  footgun.

Other small ones: gladia/stt.py needed a None guard on `_session_url`
before `websocket_connect`; openrouter/llm.py's
`build_chat_completion_params` override widened to `dict[str, Any]`
diverging from the parent's `OpenAILLMInvocationParams` — restored
the parent's type; neuphonic/tts.py guarded the receive loop's
`async for message in self._websocket` with a local-variable narrowing
matching the pattern from 9e9b1f39e.

groq/tts.py: tightened `output_format`'s typing to
`Literal["flac","mp3","mulaw","ogg","wav"] | str = "wav"`. The literal
side gives IDE autocomplete hints for the currently-supported set;
the `| str` side keeps callers unblocked if groq adds a new format
before this list is updated. A `cast` at the API boundary satisfies
groq's stricter `Literal` parameter type. The literal alias mirrors
the inlined Literal on `groq.resources.audio.speech.AsyncSpeech.create`'s
`response_format` (the SDK doesn't export it as a named symbol).

websocket_service.py: scoped `# pyright: ignore[reportAttributeAccessIssue]`
on `websockets.WebSocketClientProtocol`. That alias is now a deprecated
re-export from the legacy submodule and pyright doesn't surface it
on the top-level `websockets` namespace; runtime is fine. Migrating
to `websockets.ClientConnection` is a separate piece of work
(transports/websocket/client.py uses the same alias four times) and
left for a future commit.

Files dropped from the ignore list: fish/tts.py, gladia/stt.py,
google/image.py, groq/tts.py, heygen/base_api.py, hume/tts.py,
inworld/tts.py, kokoro/tts.py, lmnt/tts.py, neuphonic/tts.py,
openai/llm.py, openai/tts.py, openrouter/llm.py, piper/tts.py,
resembleai/tts.py, sarvam/tts.py, smallest/tts.py,
websocket_service.py, xtts/tts.py.
2026-05-01 09:36:14 -04:00
Paul Kompfner
96756bc1f6 fix: clean up TypedDict / Optional patterns in 6 more LLM adapters
Same approach as the previous round — apply boundary casts where the
code does dict-style mutation on TypedDict-typed values, narrow at
return sites, and document the LLMSpecificMessage limitation in
realtime adapters that pack history into a single text message.

aws_nova_sonic_adapter.py — pure typing + small narrowing fixes:
- Filter LLMSpecific items in `_from_universal_context_messages`
  (documented).
- `_from_universal_context_message` now declared
  `-> AWSNovaSonicConversationHistoryMessage | None` (it already had
  paths returning None implicitly).
- `get_messages_for_logging` returns `dict[str, Any]` per element
  via `dataclasses.asdict`, matching the declared return type.
- Use a local `role` variable so pyright keeps the narrowing across
  the truthy-content guard.

grok_realtime_adapter.py / inworld_realtime_adapter.py — same shape
of fix as `open_ai_realtime_adapter.py` from the previous batch.
The two files are essentially copies of the OpenAI Realtime adapter,
so the same template applies: cast at the boundary, filter
LLMSpecificMessage with a documented note, replace the implicit-None
fallthrough with `raise ValueError`, and switch the `text_content +=`
pattern (which fails when one of the parts is None) to a
`text_parts.append(...)` + `" ".join(...)` pattern.

open_ai_adapter.py — pure typing. Cast at the
`OpenAILLMInvocationParams` return, narrow the system-instruction
warning's `initial_content` to `str | None`, and cast the custom-tools
list to `list[ChatCompletionToolParam]`.

open_ai_responses_adapter.py — pure typing. Same shape: narrow
`first_content` to `str | None` for the warning resolver, cast the
constructed dict literals at append sites where the target is
`ResponseInputItemParam`, and cast `get_messages_for_logging`'s
return to the declared `list[dict[str, Any]]`.

processors/aggregators/llm_context.py — pure typing. Cast the
deepcopied message in the redaction loop in `get_messages` to
`dict[str, Any]` and the create_image/audio_message return-dict
literals to `LLMContextMessage`.

Removes 6 newly-clean files from the pyright ignore list.

Net: -77 pyright errors (full-config: 680 -> 603).
2026-05-01 09:36:14 -04:00
Paul Kompfner
5e24027fd5 fix: type fixes (and a few latent bug fixes) in 4 LLM adapters
Same shape of fix we applied to anthropic_adapter.py earlier — these
adapters do dict-style mutation on values typed as
ChatCompletionMessageParam (a union of TypedDicts) or against Optional
fields. Apply boundary casts (`cast(dict[str, Any], ...)` for the
mutation block, cast back to the TypedDict at return sites). Most
changes are pure typing (rename + cast); a handful in gemini and
openai_realtime are small defensive bug fixes for code paths that
were latently broken by Optional fields slipping through:

perplexity_adapter.py — pure typing. Cast the deepcopied messages to
`list[dict[str, Any]]` for the role-merging / system-conversion /
trailing-assistant-removal transformations and cast back to
ChatCompletionMessageParam at the return.

bedrock_adapter.py — pure typing. Cast the message to
`dict[str, Any]` at the top of `_from_standard_message` for the
tool-result / tool-use / image-content transformations. Cast the
constructed dict at the return site of `get_llm_invocation_params`.

gemini_adapter.py — typing + several None guards on Content.parts and
related Optional fields. Each guard turns a latent
`TypeError`/`AttributeError` (when the type-system-allowed None
showed up at runtime) into a defensive skip — the type annotations
say these can be None and we now handle that.

open_ai_realtime_adapter.py:
- Typing: cast the deepcopied messages, cast back where needed.
- LLMSpecificMessage handling: previously the function would crash on
  the first `.get()` call if any LLMSpecificMessage was in the list.
  Filter them out and document the limitation — this adapter's
  pack-into-single-text-message strategy doesn't compose with opaque
  per-provider payloads.
- Real bug fix: `events.ConversationItem` is a Pydantic BaseModel,
  not a TypedDict. The bulk-packing path was constructing a raw dict
  where a ConversationItem was expected. Replaced with proper
  constructor calls (matches what the single-user-message path
  already does).
- Real bug fix: `_from_universal_context_message` was declared
  `-> events.ConversationItem` but on the unhandled-message
  fallthrough it logged and returned None implicitly. Raise
  ValueError so the violation is loud, not silent.

Removes 4 newly-clean files from the pyright ignore list:
adapters/services/{perplexity,bedrock,gemini,open_ai_realtime}_adapter.py.

Net: -95 pyright errors (full-config: 775 -> 680).
2026-05-01 09:36:14 -04:00
Paul Kompfner
ef226c8a8e fix: silence _settings NotGiven leaks and tighten Google STT language method
Six pyright errors followed the same pattern: a value flowed out of
`self._settings.X` (typed `T | _NotGiven`) into a context that wanted
the plain `T`. Wrap each with `assert_given(...)` so the sentinel
gets stripped at the boundary:

- aws/nova_sonic/llm.py: `_settings.model` (in InvokeModel...Input)
  and `_settings.system_instruction` (passed to the adapter).
- deepgram/flux/base.py: iterating `_settings.keyterm`.
- google/stt.py: iterating `_settings.languages`.
- google/tts.py: iterating `_settings.speaker_configs`.
- openai/base_llm.py: `_settings.system_instruction` passed to the
  adapter.

Also takes a deeper pass at the related Google STT issue: the override
of `language_to_service_language` had been broadened to take
`Language | list[Language]` and return `str | list[str]`, a Liskov
violation against the base's `Language -> str | None` contract.
External callers always pass a single Language, and the only consumer
of the list path was Google STT's own `_get_language_codes`. Restore
the override to a single-Language signature and let
`_get_language_codes` iterate. The override is also tightened to
return `str` (narrower than the base's `str | None`, which is
LSP-compatible) since it always falls back to `"en-US"` rather than
returning None.

Net: -7 pyright errors (full-config run: 782 -> 775).
2026-05-01 09:36:14 -04:00
Paul Kompfner
2a731336be fix: tighten language_to_<service>_language return types to plain str
These provider-specific helpers are all thin wrappers around
`resolve_language(...)`, which itself returns `str` — never `None`.
The `str | None` annotations were misleading and were producing
spurious pyright errors at the call sites that assigned the result
into a `str` field. Update each helper's signature to `str` and
rewrite the `Returns:` docstring to describe the actual fallback
behaviour (resolve to base or full code, with a warning).

Importantly, the per-class `language_to_service_language(...)`
methods on `STTService`/`TTSService` subclasses keep `str | None` as
their return type. That signature is an extension hook for future
and/or third-party subclasses that may genuinely not be able to
produce a code for some languages, even though all in-tree first-
party services currently return a string.

Also includes one small unrelated tightening in azure/stt.py: wrap
`self._settings.language` with `assert_given(...)` so the truthy
fallback to `language_to_azure_language(Language.EN_US)` doesn't
silently swallow a NotGiven sentinel.

Net: -3 pyright errors (full-config run: 785 -> 782).
2026-05-01 09:36:14 -04:00
Paul Kompfner
bec407ce3a fix: handle Optional websocket/client receivers across services
Pyright flagged 19 sites where `await self._<connection>.send/recv/...`
was called on a receiver typed `X | None`. Each kind of call site
needed a slightly different fix to be both type-safe and behaviour-
preserving:

Streaming/user-facing paths (early return + warn — drop and warn is
the right runtime fail-safe when reconnect didn't succeed):

- cartesia/stt.py (run_stt)
- soniox/stt.py (_send_keepalive)
- elevenlabs/tts.py (run_tts — yields ErrorFrame and returns)
- deepgram/sagemaker/tts.py (run_tts)
- transports/lemonslice/transport.py (send_message)
- transports/tavus/transport.py (send_message)

"Should never happen" cases (early return with comment, no warn —
caller already gated on a separate `_is_*` check, so a warn would be
noise):

- deepgram/flux/stt.py (transport methods, gated by _transport_is_active)
- deepgram/flux/sagemaker/stt.py (same)
- stt_service.py (_send_keepalive, gated by _is_keepalive_ready)
- elevenlabs/stt.py (_send_keepalive, same)
- llm_service.py (_ws_recv — raises ConnectionError to match
  _ensure_connected's contract)
- heygen/client.py (receive loop, gated by self._connected)

Just-assigned-above (use a local variable so pyright keeps the
narrowing across statements):

- lmnt/tts.py
- gradium/stt.py
- fish/tts.py

Other:

- transports/websocket/server.py — used the existing local `websocket`
  parameter in scope instead of `self._websocket` for the close call.
- websocket_service.py — `send_with_retry` raises ConnectionError when
  `self._websocket` is None inside the existing try-block, so the
  broad `except Exception` triggers reconnect just as it would on a
  real send failure (preserving the prior behaviour where None
  silently fell through to the AttributeError-driven reconnect path).

Drops three now-clean files from the pyright ignore list: cartesia/stt.py,
elevenlabs/stt.py, and soniox/stt.py.
2026-05-01 09:36:14 -04:00
Paul Kompfner
1cd73b1ef8 refactor: give TAdapter a default to restore precise typing for unparameterized LLMService subclasses
After making LLMService generic, an unparameterized subclass
(`class MyService(LLMService):` with no bracket — the third-party
provider pattern) saw `get_llm_adapter()` return `Unknown` rather
than `BaseLLMAdapter` as it did before the refactor.

Add `default=BaseLLMAdapter` (PEP 696) on the TypeVar — via
`typing_extensions.TypeVar` so older Python targets keep working —
so unparameterized callers get `LLMService[BaseLLMAdapter]` and
`get_llm_adapter()` returns `BaseLLMAdapter`, matching the
pre-refactor type precision.

Two internal fallouts of having a default (where the default makes
unannotated `LLMService` resolve invariantly to
`LLMService[BaseLLMAdapter]`):

- `FunctionCallParams.llm` is now `LLMService[Any]` so concrete
  parameterizations like `LLMService[OpenAILLMAdapter]` can be
  passed where the field is set.
- The explicit `LLMService.__init__(self, **kwargs)` in
  `WebsocketLLMService.__init__` gets a `pyright: ignore[reportArgumentType]`
  comment — pyright's invariance handling can't see through the
  multi-inheritance + generic + default combination, but the
  runtime call is correct (generics are erased).
2026-05-01 09:36:14 -04:00
Paul Kompfner
c4f5f1ebbb test, refactor: follow-ups to LLMService generic refactor
Two follow-ups now that LLMService is generic over its adapter:

- Add an explicit backward-compat test verifying that an LLMService
  subclass with no generic parameter (the third-party-provider
  pattern) instantiates and returns a usable adapter. The existing
  MockLLMService (declared without brackets) already exercised this
  implicitly, but it's worth a named assertion.

- Drop the now-redundant `params: SomeLLMInvocationParams = ...`
  variable annotations on `adapter.get_llm_invocation_params()`
  results. Since `get_llm_adapter()` now returns the precise adapter
  type, and `BaseLLMAdapter` is generic in its invocation-params
  type, the call already infers the right TypedDict.
2026-05-01 09:36:14 -04:00
Paul Kompfner
49068ff557 refactor: make LLMService generic over its adapter type
Previously, `LLMService.get_llm_adapter()` returned `BaseLLMAdapter`,
which forced every caller that wanted the precise adapter type to
write `adapter: SomeAdapter = self.get_llm_adapter()` and accept
pyright's complaint that the assignment doesn't match the declared
type. That pattern existed in 17 places across the LLM services.

Make `LLMService` generic over its adapter type — `LLMService(...,
Generic[TAdapter])` with `TAdapter = TypeVar("TAdapter",
bound=BaseLLMAdapter)` — so subclasses opt in via
`LLMService[XAdapter]` and callers get the precise type back from
`get_llm_adapter()` automatically.

Backward-compatible for third-party providers: code that says
`class MyService(LLMService):` (no bracket) still type-checks, with
TAdapter resolving to BaseLLMAdapter from the bound — identical to
the pre-refactor behavior. The `adapter_class` attribute keeps its
loose `type[BaseLLMAdapter] = OpenAILLMAdapter` typing so the default
remains usable; one localized cast in `__init__` bridges the loose
class attr to the precise instance attr.

In-tree subclasses opted in:

- AnthropicLLMService -> LLMService[AnthropicLLMAdapter]
- AWSBedrockLLMService -> LLMService[AWSBedrockLLMAdapter]
- AWSNovaSonicLLMService -> LLMService[AWSNovaSonicLLMAdapter]
- BaseOpenAILLMService -> LLMService[OpenAILLMAdapter] (propagates to
  ~15 OpenAI-compatible providers like Cerebras, Groq, Together)
- GeminiLiveLLMService -> LLMService[GeminiLLMAdapter]
- GoogleLLMService -> LLMService[GeminiLLMAdapter]
- GrokRealtimeLLMService -> LLMService[GrokRealtimeLLMAdapter]
- InworldRealtimeLLMService -> LLMService[InworldRealtimeLLMAdapter]
- OpenAIRealtimeLLMService -> LLMService[OpenAIRealtimeLLMAdapter]
- _BaseOpenAIResponsesLLMService -> LLMService[OpenAIResponsesLLMAdapter]
- WebsocketLLMService is also generic so the multi-inheritance case
  (OpenAIResponsesLLMService) can keep both bases agreeing on TAdapter.

All 17 redundant `adapter: SomeAdapter = self.get_llm_adapter()`
annotations are now plain `adapter = self.get_llm_adapter()`.
2026-05-01 09:36:14 -04:00
Paul Kompfner
d23bdaaacd fix: handle NotGiven from from_standard_tools in Nova Sonic connect
Same pattern as the earlier get_setup_params fix: when context tools
are absent, the fallback `adapter.from_standard_tools(self._tools)`
can return the NotGiven sentinel, and `_send_prompt_start_event`
expects a list. Coerce via `or []` so the NotGiven case becomes an
empty list.
2026-05-01 09:36:14 -04:00
Paul Kompfner
53ce57b7fa fix: tighten _process_completed_function_calls in AWS Nova Sonic
Three small changes that resolve pyright errors and sharpen the logic:

- Guard `self._context` with the codebase's "should never happen"
  early-return pattern, so we don't blindly call `.get_messages()` on
  None.
- Skip `LLMSpecificMessage` items in the iteration. They're opaque
  provider-specific payloads with no `.get()`, and the surrounding
  logic only applies to standard tool-result messages.
- Match `role == "tool"` explicitly. The previous truthy-only check
  was working by accident — the `tool_call_id` filter further down
  was effectively narrowing to tool messages, but the intent is
  clearer when stated upfront.
2026-05-01 09:36:14 -04:00
Paul Kompfner
dabca70744 fix: warn and bail in reset_conversation when no context exists
reset_conversation is part of the public AWSNovaSonicLLMService API and
is also called internally from the receive-task error handler.
Previously it captured `self._context` (typed `LLMContext | None`) and
unconditionally passed it to `_handle_context`, which expects a real
context — silently doing the wrong thing if no initial context had
been received yet.

Treat that as developer error: log a warning and return early. Nothing
to preserve means nothing to reset.
2026-05-01 09:36:14 -04:00
Paul Kompfner
191bdc733f fix: conform AWSNovaSonicLLMService.get_setup_params to its protocol
The service implements the NovaSonicSessionSender protocol so the
session-continuation helper can target either the current or next
session. The protocol declares
`get_setup_params(self) -> tuple[str | None, list]`, but the
implementation was unannotated and could return NotGiven in the tools
position when from_standard_tools fell through to its NotGiven
sentinel. Add the matching return annotation and coerce the NotGiven
case to an empty list.
2026-05-01 09:36:14 -04:00
Paul Kompfner
5e1bb4cbe5 chore: remove anthropic_adapter.py from pyright ignore list
The file is now clean under pyright's basic type checking, so it can
move out of the ignore list and be enforced on every run.
2026-05-01 09:36:14 -04:00
Paul Kompfner
9ee123bf33 fix: resolve final pyright error in Anthropic cache control marker
Same MessageParam content-typing issue as the consecutive-message merge
fix: pyright doesn't carry the str-to-list narrowing forward, and
Iterable has no `[-1]` access. Cast to `list[Any]` and document the
chain of assumptions (list, non-empty, dict-typed last item) and where
each is upheld upstream.

This brings anthropic_adapter.py to 0 pyright errors (down from 115).
2026-05-01 09:36:14 -04:00
Paul Kompfner
66f43baf8f fix: resolve pyright errors in Anthropic _from_standard_message
The function takes an OpenAI ChatCompletionMessageParam (a union of
TypedDicts) and returns an Anthropic MessageParam (a different
TypedDict). It does the conversion via dict-level mutations that don't
type-check against either side's TypedDict schema. Work with the
deepcopied message as a plain dict and cast to MessageParam at the
return sites — matching the boundary-cast convention noted in
llm_context.py.

Drops anthropic_adapter.py from 20 to 2 pyright errors.
2026-05-01 09:36:14 -04:00
Paul Kompfner
252bb493af fix: cast Anthropic-format passthrough message to MessageParam
The fallback path in `_from_universal_context_message` returns
`message.message` from an `LLMSpecificMessage`, which is typed loosely
(`Any | dict`). The surrounding comment already documents the
assumption that the message is already in Anthropic format — make that
assumption explicit to pyright with a cast.
2026-05-01 09:36:14 -04:00
Paul Kompfner
c517b67bad fix: resolve pyright error when merging consecutive Anthropic messages
MessageParam types content as `str | Iterable[...]`, and Iterable has
no `.extend()`. After the str-to-list conversions, pyright re-reads
the TypedDict field as the original wide type rather than carrying the
narrowing forward. Cast to `list[Any]` to express the codebase's
existing str-or-list assumption.

Drops anthropic_adapter.py from 23 to 21 pyright errors.
2026-05-01 09:36:14 -04:00
Paul Kompfner
70aeb5c7c2 fix: resolve pyright errors in Anthropic get_messages_for_logging
Content items in MessageParam have a heterogeneous union type (Pydantic
ContentBlock variants and TypedDict *BlockParam variants), neither of
which supports the dict-style access and mutation this sanitizer does.
Treat the deepcopied message as a plain dict and guard each content
item with isinstance(item, dict) — matches the runtime shape produced
by _from_standard_message and avoids crashing if a non-dict ever flows
through the LLMSpecificMessage path.

Drops anthropic_adapter.py from 115 to 23 pyright errors.
2026-05-01 09:36:14 -04:00
Mark Backman
440738f727 Update changelog for #4401: split fix and default-model change 2026-05-01 09:19:27 -04:00
Mark Backman
7da94436f5 Add changelog for #4401 2026-05-01 09:18:34 -04:00
Mark Backman
492c9702ee fix(xai/realtime): pass model as query param on connect
xAI's Voice Agent API selects the model via the ?model= query
parameter on the WebSocket URL; it cannot be changed later via
session.update. The Grok Realtime service was setting the model in
Settings but never including it in the connection URL, so every
session silently fell back to the deprecated default
grok-voice-fast-1.0.

Append the model from Settings to the WebSocket URL on connect, and
default to the recommended grok-voice-think-fast-1.0.
2026-05-01 09:16:52 -04:00
Mark Backman
f1eef9ba0a Merge pull request #4400 from pipecat-ai/mb/deepgram-tts-mip-opt-out
feat(deepgram): add mip_opt_out to TTS services
2026-05-01 09:12:03 -04:00
Mark Backman
132b9b1002 Add changelog for #4400 2026-05-01 08:58:38 -04:00
Mark Backman
eb4e56d2d9 feat(deepgram): expose mip_opt_out on TTS services
Adds a `mip_opt_out` init parameter to both `DeepgramTTSService` (WebSocket)
and `DeepgramHttpTTSService` so callers can opt out of the Deepgram Model
Improvement Program. When set, the value is forwarded as a query parameter
on the request, matching the pattern used by the Deepgram STT services.
2026-05-01 08:55:23 -04:00
Mark Backman
13643b192b Add changelog for #4397 2026-04-30 21:41:28 -04:00
kompfner
6d66bbceeb Merge pull request #4395 from pipecat-ai/pk/app-resources-api-updates
Broaden tool_resources to app_resources
2026-04-30 21:19:05 -04:00
Mark Backman
6cab2ce3f7 chore(smallwebrtc): lower app message log to trace level
App messages can be high-frequency, so logging each one at debug is noisy.
2026-04-30 21:06:47 -04:00
Aleix Conchillo Flaqué
a27d9fc30b Merge pull request #4396 from pipecat-ai/aleix/remove-unused-user-mute-reset
refactor(user_mute): remove unused reset() method from strategies
2026-04-30 17:27:54 -07:00
Aleix Conchillo Flaqué
2a8f4734e0 refactor(user_mute): remove unused reset() method from strategies
The reset() method on BaseUserMuteStrategy and its subclasses was never
called anywhere in the codebase.
2026-04-30 16:31:29 -07:00
Mark Backman
48ac68e3c8 Merge pull request #4393 from pipecat-ai/mb/fix-smart-turn-import
fix(turns): defer LocalSmartTurnAnalyzerV3 import to fix transformers warning
2026-04-30 16:40:17 -04:00
Paul Kompfner
c3ef199efa Add changelog for #4395 2026-04-30 16:19:35 -04:00
Paul Kompfner
1b5c4cfa2a feat: broaden tool_resources to app_resources
Broaden `tool_resources` to `app_resources` for easy access not just in
tool handlers but in other places like custom `FrameProcessor`s.

Involves 3 changes:

- A rename: `tool_resources` -> `app_resources`
- A new property on `PipelineTask`: `app_resources`
- A new property on `FrameProcessor`: `pipeline_task`

Usage in tool handler:

    async def get_weather(params: FunctionCallParams):
        resources = cast(MyAppResources, params.app_resources)
        ...

Usage in custom `FrameProcessor`:

    class MyProcessor(FrameProcessor):
        async def process_frame(self, frame, direction):
            await super().process_frame(frame, direction)
            if self.pipeline_task is not None:
                resources = cast(MyAppResources, self.pipeline_task.app_resources)
                ...

The previous `tool_resources` aliases (on `PipelineTask`,
`FunctionCallParams`, and `FrameProcessorSetup`) keep working but are
deprecated as of 1.2.0 and emit `DeprecationWarning`s.
2026-04-30 16:16:17 -04:00
Mark Backman
6e9dd1dbcc Merge pull request #4390 from pipecat-ai/mb/cartesia-tts-api-updates
feat(cartesia): align TTS services with latest API and buffering guidance
2026-04-30 15:59:15 -04:00
Mark Backman
6487f895b3 Setting use_normalized_timestamps to False so that input and output text match 2026-04-30 14:21:14 -04:00
Mark Backman
351105a975 test(krisp): scope importlib.metadata.version mock to imports only
The four krisp test files installed a process-wide mock of
importlib.metadata.version with `patch(...).start()` at module level and
never called .stop(). Once any of these files was collected, the mock
leaked across the rest of the test session, returning '0.0.0-dev' for
every version check. This corrupted unrelated tests that triggered
transformers' import-time dependency check (e.g. lazy imports of
LocalSmartTurnAnalyzerV3) — transformers saw tqdm=='0.0.0-dev' and
refused to load.

Wrap the pipecat imports in `with patch(...)` so the mock is active
during import (where pipecat's krisp version check needs it) and torn
down before any tests run.
2026-04-30 14:16:54 -04:00
Mark Backman
8ea963852d Add changelog for #4393 2026-04-30 14:16:46 -04:00
Mark Backman
6f4458f21d fix(turns): defer LocalSmartTurnAnalyzerV3 import to avoid loading transformers at module load
Importing pipecat.turns.user_turn_strategies pulled in
LocalSmartTurnAnalyzerV3 → transformers → onnxruntime at module load
time. Since this module is imported by llm_response_universal (and
therefore most LLM services), any LLM service import paid the cost of
loading transformers and triggered its missing-backend warning in
environments without PyTorch/TF/Flax.

Move the LocalSmartTurnAnalyzerV3 import into
default_user_turn_stop_strategies() so it only loads when the default
smart-turn strategy is actually constructed.

Fixes #4392
2026-04-30 14:16:41 -04:00
Mark Backman
fb42a7dcf3 Add changelog for #4390 2026-04-30 09:45:16 -04:00
Mark Backman
21547c8680 fix(cartesia): stop double-yielding ErrorFrame on HTTP non-200
The non-200 branch yielded an ErrorFrame and then raised, which the outer
except caught and yielded a second, less informative "Unknown error" frame.
Return after the yield and fold the status code into the message.
2026-04-30 09:41:43 -04:00
Mark Backman
3e5aabc5f2 fix(cartesia): guard HTTP session before use
Pyright flagged the .post() call on a possibly-None _session. Raise a
clear RuntimeError if start() wasn't called instead of crashing on the
attribute access.
2026-04-30 09:34:02 -04:00
Mark Backman
e508642b0a refactor(cartesia): mark tag helpers as @staticmethod
SPELL/EMOTION_TAG/PAUSE_TAG/VOLUME_TAG/SPEED_TAG are stateless and worked
only via class-level access. Decorating them lets instance access work too
and silences the missing-self lint warning.
2026-04-30 09:31:22 -04:00
Mark Backman
e546541e20 feat(cartesia): align WebSocket TTS with latest API and buffering guidance
- Bump default cartesia_version to 2026-03-01.
- Replace deprecated use_original_timestamps with use_normalized_timestamps
  so word timestamps match what was actually spoken.
- Add max_buffer_delay_ms init arg; auto-derive 0 in SENTENCE mode to avoid
  the doc-warned "middle ground" of client + server buffering, leave unset
  in TOKEN mode for managed buffering.
- Silently consume flush_done messages now emitted per transcript when
  server-side buffering is disabled.
2026-04-30 09:25:31 -04:00
236 changed files with 10109 additions and 1548 deletions

1
.agents/skills/changelog Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/changelog

1
.agents/skills/cleanup Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/cleanup

1
.agents/skills/code-review Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/code-review

1
.agents/skills/docstring Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/docstring

View File

@@ -0,0 +1 @@
../../.claude/skills/pr-description

1
.agents/skills/pr-submit Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/pr-submit

1
.agents/skills/update-docs Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/update-docs

View File

@@ -1,3 +1,8 @@
---
name: cleanup
description: Review, refactor, document, and validate code changes in the current branch
---
# Code Cleanup Skill
The **Code Cleanup Skill** reviews, refactors, and documents code changes in your current branch, ensuring alignment with **Pipecat's architecture, coding standards, and example patterns**.

View File

@@ -42,6 +42,7 @@ jobs:
--extra langchain \
--extra livekit \
--extra piper \
--extra runner \
--extra sagemaker \
--extra tracing \
--extra websocket

View File

@@ -32,7 +32,9 @@ jobs:
run: uv python install 3.12
- name: Install development dependencies
run: uv sync --group dev --extra daily --extra tracing
# `--all-extras` (matching the dev setup in README.md) so pyright can
# resolve types from various optional dependencies.
run: uv sync --group dev --all-extras --no-extra gstreamer --no-extra local
- name: Ruff formatter
id: ruff-format

View File

@@ -46,6 +46,7 @@ jobs:
--extra langchain \
--extra livekit \
--extra piper \
--extra runner \
--extra sagemaker \
--extra tracing \
--extra websocket

174
AGENTS.md Normal file
View File

@@ -0,0 +1,174 @@
# AGENTS.md
This file provides guidance to AI coding agents when working with code in this repository.
## Project Overview
Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture.
## Common Commands
```bash
# Setup development environment
uv sync --group dev --all-extras --no-extra gstreamer --no-extra local
# Install pre-commit hooks
uv run pre-commit install
# Run all tests
uv run pytest
# Run a single test file
uv run pytest tests/test_name.py
# Run a specific test
uv run pytest tests/test_name.py::test_function_name
# Preview changelog
uv run towncrier build --draft --version Unreleased
# Lint and format check
uv run ruff check
uv run ruff format --check
# Update dependencies (after editing pyproject.toml)
uv lock && uv sync
```
## Architecture
### Frame-Based Pipeline Processing
All data flows as **Frame** objects through a pipeline of **FrameProcessors**:
```
[Processor1] → [Processor2] → ... → [ProcessorN]
```
**Key components:**
- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors).
- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream.
- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together.
- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel.
- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`.
- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc.
- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`.
- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`.
- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law).
- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels.
- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks.
### Important Patterns
- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input
- **Turn Management**: Turn management is done through `LLMUserAggregator` and
`LLMAssistantAggregator`, created with `LLMContextAggregatorPair`
- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively.
- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers.
- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`.
- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast.
- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation.
- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service).
### Key Directories
| Directory | Purpose |
| -------------------------- | -------------------------------------------------- |
| `src/pipecat/frames/` | Frame definitions (100+ types) |
| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio |
| `src/pipecat/pipeline/` | Pipeline orchestration |
| `src/pipecat/services/` | AI service integrations (60+ providers) |
| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) |
| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols |
| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow |
| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF |
| `src/pipecat/turns/` | User turn management |
## Code Style
- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section.
- **Deprecations**: Use the `.. deprecated:: <version>` Sphinx directive in docstrings (never inline tags like `[DEPRECATED]`), and pair it with a runtime `warnings.warn(..., DeprecationWarning)` at the call site. See `CONTRIBUTING.md` for full conventions.
- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting.
- **Type hints**: Required for complex async code.
- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically:
- `@dataclass`: Frame types, context aggregator pairs, internal data containers
- `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params
### Docstring Example
```python
class MyService(LLMService):
"""Description of what the service does.
More detailed description.
Event handlers available:
- on_connected: Called when we are connected
Example::
@service.event_handler("on_connected")
async def on_connected(service, frame):
...
"""
def __init__(self, param1: str, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
**kwargs: Additional arguments passed to parent.
"""
super().__init__(**kwargs)
# Pydantic params class with a deprecated field
class MyParams(BaseModel):
"""Configuration parameters for MyService.
Parameters:
new_setting: Replacement for ``old_setting``.
old_setting: Legacy setting, no longer used.
.. deprecated:: 1.2.0
Use ``new_setting`` instead. Will be removed in 2.0.0.
"""
new_setting: str = "default"
old_setting: str | None = None
```
## Service Implementation
When adding a new service:
1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.)
2. Implement required abstract methods
3. Handle necessary frames
4. By default, all frames should be pushed in the direction they came
5. Push `ErrorFrame` on failures
6. Add metrics tracking via `MetricsData` if relevant
7. Follow the pattern of existing services in `src/pipecat/services/`
## Testing
Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.

158
CLAUDE.md
View File

@@ -1,157 +1 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture.
## Common Commands
```bash
# Setup development environment
uv sync --group dev --all-extras --no-extra gstreamer
# Install pre-commit hooks
uv run pre-commit install
# Run all tests
uv run pytest
# Run a single test file
uv run pytest tests/test_name.py
# Run a specific test
uv run pytest tests/test_name.py::test_function_name
# Preview changelog
uv run towncrier build --draft --version Unreleased
# Lint and format check
uv run ruff check
uv run ruff format --check
# Update dependencies (after editing pyproject.toml)
uv lock && uv sync
```
## Architecture
### Frame-Based Pipeline Processing
All data flows as **Frame** objects through a pipeline of **FrameProcessors**:
```
[Processor1] → [Processor2] → ... → [ProcessorN]
```
**Key components:**
- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors).
- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream.
- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together.
- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel.
- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`.
- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc.
- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`.
- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`.
- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law).
- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels.
- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks.
### Important Patterns
- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input
- **Turn Management**: Turn management is done through `LLMUserAggregator` and
`LLMAssistantAggregator`, created with `LLMContextAggregatorPair`
- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively.
- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers.
- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`.
- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast.
- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation.
- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service).
### Key Directories
| Directory | Purpose |
| -------------------------- | -------------------------------------------------- |
| `src/pipecat/frames/` | Frame definitions (100+ types) |
| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio |
| `src/pipecat/pipeline/` | Pipeline orchestration |
| `src/pipecat/services/` | AI service integrations (60+ providers) |
| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) |
| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols |
| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow |
| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF |
| `src/pipecat/turns/` | User turn management |
## Code Style
- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section.
- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting.
- **Type hints**: Required for complex async code.
- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically:
- `@dataclass`: Frame types, context aggregator pairs, internal data containers
- `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params
### Docstring Example
```python
class MyService(LLMService):
"""Description of what the service does.
More detailed description.
Event handlers available:
- on_connected: Called when we are connected
Example::
@service.event_handler("on_connected")
async def on_connected(service, frame):
...
"""
def __init__(self, param1: str, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
**kwargs: Additional arguments passed to parent.
"""
super().__init__(**kwargs)
```
## Service Implementation
When adding a new service:
1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.)
2. Implement required abstract methods
3. Handle necessary frames
4. By default, all frames should be pushed in the direction they came
5. Push `ErrorFrame` on failures
6. Add metrics tracking via `MetricsData` if relevant
7. Follow the pattern of existing services in `src/pipecat/services/`
## Testing
Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.
@AGENTS.md

1
changelog/4390.added.md Normal file
View File

@@ -0,0 +1 @@
- Added a `max_buffer_delay_ms` constructor argument to `CartesiaTTSService` for controlling Cartesia's server-side text buffering. When unset, Pipecat picks a sensible default based on `text_aggregation_mode`: `0` in `SENTENCE` mode (custom buffering — avoids stacking client-side aggregation on top of Cartesia's default 3000ms server buffer) and unset in `TOKEN` mode (Cartesia's managed buffering applies). Pass an explicit value (05000ms) to override.

View File

@@ -0,0 +1 @@
- Default `cartesia_version` for `CartesiaTTSService` bumped from `2025-04-16` to `2026-03-01`, matching `CartesiaHttpTTSService` and unlocking the `use_normalized_timestamps` and `max_buffer_delay_ms` fields.

View File

@@ -0,0 +1 @@
- ⚠️ `CartesiaTTSService` now sends `use_normalized_timestamps: true` instead of the deprecated `use_original_timestamps` field. Word timestamps now reflect what was actually spoken (post text-normalization and pronunciation-dictionary substitution), matching the convention Pipecat uses for ElevenLabs. This is a behavior change for `sonic-3` users, who were previously receiving timestamps tied to the input transcript.

View File

@@ -0,0 +1 @@
- Fixed `CartesiaHttpTTSService` pushing two `ErrorFrame`s on a non-200 response — one with the API's error text and a second, less informative "Unknown error" frame from the outer exception handler. It now pushes a single frame that includes the HTTP status code and returns cleanly.

View File

@@ -0,0 +1 @@
- Fixed Cartesia tag helpers (`SPELL`, `EMOTION_TAG`, `PAUSE_TAG`, `VOLUME_TAG`, `SPEED_TAG`) raising `TypeError` when called on an instance (e.g. `tts.SPELL("hi")`). They're now `@staticmethod` and callable from both the class and an instance.

1
changelog/4390.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `CartesiaTTSService` surfacing `flush_done` messages from Cartesia as `ErrorFrame`s. The latest API emits a `flush_done` per transcript when server-side buffering is disabled; Pipecat now consumes them silently since each turn already has its own `context_id`.

1
changelog/4393.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed an issue where `LocalSmartTurnAnalyzerV3` was imported unconditionally for user turn stop strategies. It is now only imported when `default_user_turn_stop_strategies()` is called. This improves startup time and removes the `transformers` "PyTorch/TensorFlow/Flax not found" warning when the default stop strategies are not used.

View File

@@ -0,0 +1 @@
- Broadened `tool_resources` to `app_resources` for easy access not just in tool handlers but in other places like custom `FrameProcessor`s. Three changes: a rename (`tool_resources``app_resources`), a new `app_resources` property on `PipelineTask`, and a new `pipeline_task` property on `FrameProcessor`. Tool handlers now read `params.app_resources`; custom processors read `self.pipeline_task.app_resources`. The previous `tool_resources` aliases (on `PipelineTask`, `FunctionCallParams`, and `FrameProcessorSetup`) keep working but are deprecated as of 1.2.0 and emit `DeprecationWarning`s.

View File

@@ -0,0 +1 @@
- Lowered the per-message log in `SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App messages can be high-frequency and were noisy at debug level; set the loguru level to `TRACE` to see them again.

1
changelog/4400.added.md Normal file
View File

@@ -0,0 +1 @@
- Added a `mip_opt_out` constructor argument to `DeepgramTTSService` and `DeepgramHttpTTSService` so callers can opt out of the Deepgram Model Improvement Program. When set, the value is forwarded to Deepgram as a query parameter on the speak request. Defaults to `None`, which preserves the existing behavior. See https://dpgr.am/deepgram-mip for pricing implications before enabling.

View File

@@ -0,0 +1 @@
- Changed the default model for `GrokRealtimeLLMService` to `grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is being removed.

1
changelog/4401.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was stored in `Settings` but never sent to xAI, so every session silently fell back to xAI's server-side default. The model is now passed via the `?model=` query parameter on the WebSocket URL as xAI's Voice Agent API requires.

1
changelog/4404.added.md Normal file
View File

@@ -0,0 +1 @@
- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that appends a developer-role message to the context whenever `LLMSetToolsFrame` changes the set of advertised standard tools. Helps the LLM stay coherent across mid-conversation tool changes, mitigating several flavors of tool-call-related hallucination: calling tools that have been removed, avoiding tools that have been re-added, and hallucinating output (made-up answers or tool-call-shaped non-tool-calls) when tools are unavailable.

View File

@@ -0,0 +1 @@
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives.

View File

@@ -0,0 +1 @@
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`.

View File

@@ -0,0 +1 @@
- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.

View File

@@ -0,0 +1 @@
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup.

1
changelog/4405.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer.

View File

@@ -0,0 +1 @@
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly.

1
changelog/4405.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal.

6
changelog/4407.added.md Normal file
View File

@@ -0,0 +1,6 @@
- Added first-class RTVI support for the UI Agent Protocol:
- Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server messages, plus `ui-command` and `ui-task` server-to-client messages, with paired `*Data` / `*Message` pydantic models.
- Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`, `Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching default handlers live in `@pipecat-ai/client-react`.
- Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`, and `ui-cancel-task` messages.
- Adds five UI pipeline frames, mirroring the `client-message` frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` / `RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` / `UITaskMessage` envelopes, while the processor pushes inbound `RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame` alongside `on_ui_message`.
- Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.

1
changelog/4414.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting across two assistant messages in the LLM context and not surfacing in `on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted at the end of a TTS context now carries a PTS just past the last word so it can't overtake clock-queued `TTSTextFrame`s in the transport's output, and `LLMAssistantAggregator` now triggers `on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting transcripts).

1
changelog/4415.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged words (e.g. `bookLook`) when using Flash models. Flash often splits sentences mid-stream into alignment chunks that begin with a real inter-word space, but the previous fix unconditionally stripped that space from every chunk. Leading spaces are now stripped only on the first alignment chunk of an utterance, so subsequent chunks correctly flush partial words across boundaries.

1
changelog/4416.added.md Normal file
View File

@@ -0,0 +1 @@
- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor now resolve credentials via the standard boto3 provider chain (EC2 instance profiles, EKS pod roles / IRSA, ECS task roles, SSO, `~/.aws/credentials`) when explicit credentials and `AWS_*` environment variables are absent. Services running with IAM roles no longer need to export static credentials.

1
changelog/4416.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` was set in the environment. The half-populated kwargs are no longer forwarded to aioboto3; partial env-var configurations now fall through to the boto3 credential chain like fully-unset configurations do.

View File

@@ -0,0 +1 @@
- Fixed a path traversal issue in the development runner's `/files/{filename:path}` download endpoint. Previously, when the runner was started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could escape the configured folder because `%2F`-encoded separators bypassed Starlette's path normalisation. The endpoint now resolves the joined path and rejects any filename that escapes the allowed base with a 403, and also returns 404 (instead of an implicit `null` 200) when `--folder` is unset.

View File

@@ -0,0 +1 @@
- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to `inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`, `InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing users can pin the prior model explicitly via the `model`/`tts_model` argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid model IDs.

1
changelog/4424.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing romanized/normalized text to the LLM context. With non-Latin input (e.g., Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao !` instead of `你好!`), which then degraded subsequent LLM turns. The services now consume `alignment` by default and only switch to `normalizedAlignment` / `normalized_alignment` when `pronunciation_dictionary_locators` is configured (where `alignment` has overlapping restarts that produce duplicated/garbled words, per #4316). Both fields are read with preferred-with-fallback semantics since each is nullable per the API schema.

1
changelog/4426.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can bias transcription for both file-based and realtime transcription.

View File

@@ -0,0 +1 @@
- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the `create_file_resampler()` / `create_stream_resampler()` factories). Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class will be removed in Pipecat 2.0 along with the default `resampy` and `numba` dependencies.

View File

@@ -0,0 +1 @@
- Changed the default model for `GrokLLMService` from `grok-3` to `grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.

1
changelog/4430.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum silence duration before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use.

View File

@@ -0,0 +1 @@
- `DeepgramFluxSTT` watchdog silence threshold is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at lower frequency.

1
changelog/4431.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed a deadlock in `TTSService` that could permanently stall pipeline processing when all three conditions occurred together: `pause_frame_processing=True`, an interruption arrived before any TTS audio was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`, `FunctionCallResultFrame`) was in the processing queue at that moment. The process task would block on `__process_event.wait()` indefinitely because `BotStoppedSpeakingFrame` never arrives (no audio was played) and the interruption handler did not resume processing. Affects services using `pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and ResembleAI.

View File

@@ -0,0 +1 @@
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the turn is complete (on `on_turn_context_completed`) rather than waiting until all audio has finished playing back. The `isFinal` message from ElevenLabs is now used to signal `TTSStoppedFrame` and clean up the audio context, improving turn transition timing.

1
changelog/4434.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed interruptions being delayed when a slow non-uninterruptible frame was processing and an uninterruptible frame was waiting in the queue. The bot would stall until the slow frame finished instead of cancelling it immediately on interruption.

1
changelog/4435.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `TTSService` dropping uninterruptible frames (e.g. `FunctionCallResultFrame`) from its internal serialization queue when an interruption occurs. Previously, the queue was recreated on every interruption, silently discarding any queued frames. The queue is now reset instead of recreated, preserving uninterruptible frames so they are always delivered downstream.

1
changelog/4440.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed a race condition in the Daily transport that caused `AttributeError: 'NoneType' object has no attribute 'send_app_message'` when tearing down a pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the same `DailyTransportClient` and both call `cleanup()`, which was releasing the underlying `CallClient` on the first call — leaving the second caller with a `None` client.

1
changelog/4441.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService` and `OpenAIRealtimeLLMService`. These services previously honored the flag by simply not cancelling in-flight function calls on interruption; the introduction of the new async-tool mechanism (which threads started/intermediate/final messages through the LLM context) broke that path because the realtime services didn't know how to interpret those messages. Note that new-style streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are not supported on these realtime services. Similar fixes for other impacted realtime services are forthcoming.

1
changelog/4443.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed two misspelled Gemini TTS voice names in `GeminiTTSService.AVAILABLE_VOICES`.

1
changelog/4446.change.md Normal file
View File

@@ -0,0 +1 @@
- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio encoding by default, which returns audio bytes without headers.

1
changelog/4447.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API.

1
changelog/4448.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini 2.x); the conversation now continues while the tool runs. On models that don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time warning explaining the limitation. (Note: an intermittent 1008 error can occasionally fire on Gemini 2.5 during long-running tool calls; we auto-reconnect.)

View File

@@ -0,0 +1 @@
- Moved `create_task`, `cancel_task`, the `task_manager` property, and `setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom `BaseObject` subclasses (turn strategies, controllers, etc.) now inherit these methods directly instead of reimplementing the task manager wiring. Owners propagate the task manager to their child `BaseObject`s via `await child.setup(task_manager)`.

View File

@@ -0,0 +1 @@
- Updated the default model for `CartesiaTTSService` and `CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.

View File

@@ -0,0 +1 @@
- Added NVIDIA Magpie TTS services via AWS SageMaker: `NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream with full interruption support via `InterruptibleTTSService`).

1
changelog/4464.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint. Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is VAD-aware, and automatically reconnects on error.

1
changelog/4465.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses (observed with `gpt-realtime-2`). A single response can now contain more than one audio item, and the first item's `audio.done` may arrive after the second item's deltas have started. Deltas still arrive strictly in playback order, so we continue to forward them as received (matching OpenAI's reference implementation). The fix removes spurious warnings, ensures truncation always targets the latest audio item, and emits a single bracketing `TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is now pushed on `response.done`).

1
changelog/4470.added.md Normal file
View File

@@ -0,0 +1 @@
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`, for use with reasoning-capable Realtime models such as `gpt-realtime-2`.

View File

@@ -0,0 +1 @@
- Changed the default model for `OpenAIRealtimeLLMService` from `gpt-realtime-1.5` to `gpt-realtime-2`.

View File

@@ -132,6 +132,10 @@ NOVITA_API_KEY=...
# NVIDIA
NVIDIA_API_KEY=...
# For a full example of how to deploy to SageMaker, see:
# https://github.com/pipecat-ai/pipecat-examples/tree/main/nvidia_sagemaker_example/deployment/aws-sagemaker-nvidia
SAGEMAKER_ASR_ENDPOINT_NAME=...
SAGEMAKER_MAGPIE_ENDPOINT_NAME=...
# OpenAI
OPENAI_API_KEY=...

View File

@@ -0,0 +1,232 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Manual validation harness for the ``add_tool_change_messages`` feature.
When tools change mid-conversation, LLMs can produce a few different
flavors of tool-call-related hallucination:
- **Forward hallucination** — calling a tool that has been removed.
- **Negative hallucination** — refusing to call a tool that has been
re-added (because recent context is full of "I can't" responses).
- **Hallucinated output when tools are unavailable** — making up an
answer rather than declining gracefully, or producing JSON that
*looks* like a tool call but is actually just an assistant text
response.
The ``add_tool_change_messages`` feature mitigates these by appending a
developer-role message to the conversation whenever ``LLMSetToolsFrame``
changes the set of advertised tools, so the LLM stays in sync with what's
actually available.
This harness exercises all of those flavors by flipping the advertised
tool set on a turn counter:
Phase 0 (turns 14): weather tool ACTIVE — confirm baseline.
Phase 1 (turns 58): tool REMOVED — keep asking for weather.
Phase 2 (turn 9+): tool RE-ADDED — does the LLM call it again?
Set ``ADD_TOOL_CHANGE_MESSAGES=0`` to disable the mitigation and see the
unmitigated behavior. The default is ON so a fresh run shows the feature
working.
Defaults to Llama 3.1 8B Instruct via a locally-running Ollama —
anecdotally one of the more hallucination-prone of the easily accessible
models. Pull the model once with ``ollama pull llama3.1:8b`` and make
sure ``ollama serve`` is running. Swap the LLM service to validate other
providers.
Run with::
uv run examples/features/features-add-tool-change-messages.py
ADD_TOOL_CHANGE_MESSAGES=0 uv run examples/features/features-add-tool-change-messages.py
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import NOT_GIVEN, LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# Default ON so a fresh run shows the feature working. Set to "0" to A/B
# against the unmitigated behavior.
ADD_TOOL_CHANGE_MESSAGES = os.environ.get("ADD_TOOL_CHANGE_MESSAGES", "1") == "1"
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
weather_tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(
f"Starting add_tool_change_messages demo bot "
f"(ADD_TOOL_CHANGE_MESSAGES={ADD_TOOL_CHANGE_MESSAGES})"
)
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OLLamaLLMService(
settings=OLLamaLLMService.Settings(
# Llama 3.1 8B Instruct is anecdotally one of the more
# hallucination-prone of the easily accessible models — exactly
# what we want for this validation harness. Pull it with
# ``ollama pull llama3.1:8b`` and make sure ``ollama serve``
# is running.
model="llama3.1:8b",
system_instruction=(
"You are a helpful assistant in a voice conversation. Your responses "
"will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. Respond briefly and naturally. "
"If the user asks for the current weather, use the `get_current_weather` "
"function if it's available. IMPORTANT: if you do not have access to the function, "
"say something along the lines of 'Sorry, I can't check the weather right now.'."
),
),
)
llm.register_function("get_current_weather", fetch_weather_from_api)
context = LLMContext(tools=weather_tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
add_tool_change_messages=ADD_TOOL_CHANGE_MESSAGES,
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Phase controller: roughly 4 turns per phase.
user_turn_count = 0
REMOVE_AT_TURN = 5 # tool gone for turn N onward
READD_AT_TURN = 9 # tool back for turn N onward
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message):
nonlocal user_turn_count
user_turn_count += 1
logger.info(f"=== User turn {user_turn_count} complete ===")
if user_turn_count == REMOVE_AT_TURN - 1:
logger.info(
"=== Phase 1: weather tool REMOVED. Keep asking about the weather "
"to exercise hallucination scenarios. ==="
)
await task.queue_frame(LLMSetToolsFrame(tools=NOT_GIVEN))
elif user_turn_count == READD_AT_TURN - 1:
logger.info(
"=== Phase 2: weather tool RE-ADDED. Ask for the weather again — "
"does the LLM call it, or keep refusing? (THIS IS THE TEST.) ==="
)
await task.queue_frame(LLMSetToolsFrame(tools=weather_tools))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info(
"=== Phase 0: weather tool ACTIVE. Ask for the weather a few times "
"to confirm it's working. ==="
)
context.add_message(
{
"role": "developer",
"content": (
"Please introduce yourself briefly to the user, then invite them "
"to ask about the weather."
),
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -4,23 +4,33 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example demonstrating ``PipelineTask(tool_resources=...)``.
"""Example demonstrating ``PipelineTask(app_resources=...)``.
``tool_resources`` is an application-defined bag of anything you want every
tool handler in a session to share by reference: database handles, HTTP
clients, feature flags, per-user state, observability clients, in-memory
caches whatever fits your app. Pipecat passes it through untouched as
``FunctionCallParams.tool_resources``.
``app_resources`` is an application-defined bag of anything your
application code may want to share across a session: database handles,
HTTP clients, feature flags, per-user state, observability clients,
in-memory caches whatever fits your app. Pipecat passes it through
untouched and exposes it as ``task.app_resources``, so any code with a
handle on the task can read or mutate it.
This example uses a small ``ToolCallLogger`` as a stand-in for that "shared
thing". A real app might just as easily pass a Postgres pool, a Redis
client, a Stripe SDK instance, or any combination thereof. The mechanics
shown here construct once, hand to the task, read it from each handler,
inspect it after the session are the same regardless of what you put in.
Two of the convenience aliases exercised below:
We bundle resources in a typed ``SessionResources`` dataclass and cast back
to it at the top of each handler. Pipecat doesn't care what type you pass
(a plain dict works too), but a typed container gives you autocomplete and
- Tool handlers read it from ``FunctionCallParams.app_resources``.
- Custom ``FrameProcessor`` subclasses read it from
``self.pipeline_task.app_resources``.
This example uses two small loggers as stand-ins for that "shared thing":
``ToolCallLogger`` (written from tool handlers) and
``TranscriptionLogger`` (written from a custom ``FrameProcessor`` that
sits in the pipeline). A real app might just as easily pass a Postgres
pool, a Redis client, a Stripe SDK instance, or any combination thereof.
The mechanics shown here construct once, hand to the task, read it
from each site, inspect it after the session are the same regardless
of what you put in.
We bundle resources in a typed ``AppResources`` dataclass and cast back
to it at each read site. Pipecat doesn't care what type you pass (a
plain dict works too), but a typed container gives you autocomplete and
refactor safety instead of dict-by-string-key lookups.
"""
@@ -28,7 +38,7 @@ import json
import os
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import UTC, datetime, timezone
from datetime import UTC, datetime
from typing import Any, cast
from dotenv import load_dotenv
@@ -37,7 +47,7 @@ from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.frames.frames import Frame, LLMRunFrame, TranscriptionFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -46,6 +56,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -86,30 +97,80 @@ class ToolCallLogger:
return json.dumps(self._calls, indent=2)
class TranscriptionLogger:
"""Records final user transcriptions — written from a custom FrameProcessor."""
def __init__(self):
"""Initialize the logger with an empty list of recorded transcriptions."""
self._entries: list[dict[str, Any]] = []
def log_transcription(self, text: str) -> None:
"""Record a transcription.
Args:
text: The transcribed user utterance.
"""
entry = {
"timestamp": datetime.now(UTC).isoformat(),
"text": text,
}
self._entries.append(entry)
logger.info(f"[TranscriptionLogger] {text!r}")
def dump(self) -> str:
"""Return all recorded transcriptions as a JSON string."""
return json.dumps(self._entries, indent=2)
@dataclass
class SessionResources:
"""Typed container for everything the tool handlers in this session share.
class AppResources:
"""Typed container for everything the app shares across this session.
Add fields here as the app grows (e.g. ``db: AsyncConnection``,
``http: httpx.AsyncClient``). Handlers ``cast()`` ``params.tool_resources``
to this type to get autocomplete and refactor safety.
``http: httpx.AsyncClient``). Read sites ``cast()`` to this type to
get autocomplete and refactor safety:
- In tools: ``cast(AppResources, params.app_resources)``.
- In custom processors: ``cast(AppResources, self.pipeline_task.app_resources)``.
"""
tool_call_logger: ToolCallLogger
transcription_logger: TranscriptionLogger
async def fetch_weather_from_api(params: FunctionCallParams):
resources = cast(SessionResources, params.tool_resources)
resources = cast(AppResources, params.app_resources)
resources.tool_call_logger.log_tool_call(params.function_name, params.arguments)
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
resources = cast(SessionResources, params.tool_resources)
resources = cast(AppResources, params.app_resources)
resources.tool_call_logger.log_tool_call(params.function_name, params.arguments)
await params.result_callback({"name": "The Golden Dragon"})
class TranscriptionLoggingProcessor(FrameProcessor):
"""Logs each final user transcription into the shared app resources.
Demonstrates the second read site for ``app_resources``: any custom
``FrameProcessor`` can reach the same bag every tool handler sees by
going through ``self.pipeline_task.app_resources``. ``pipeline_task``
is ``None`` until the task sets the processor up, so we guard against
that case.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Forward all frames; log final user transcriptions on the way through."""
await super().process_frame(frame, direction)
if isinstance(frame, TranscriptionFrame) and self.pipeline_task is not None:
resources = cast(AppResources, self.pipeline_task.app_resources)
resources.transcription_logger.log_transcription(frame.text)
await self.push_frame(frame, direction)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
@@ -203,6 +264,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
[
transport.input(),
stt,
TranscriptionLoggingProcessor(),
user_aggregator,
llm,
tts,
@@ -211,10 +273,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
]
)
# Keep a local handle so we can read collected state after the session
# Keep local handles so we can read collected state after the session
# ends; Pipecat never copies or clears the object.
tool_call_logger = ToolCallLogger()
resources = SessionResources(tool_call_logger=tool_call_logger)
transcription_logger = TranscriptionLogger()
resources = AppResources(
tool_call_logger=tool_call_logger,
transcription_logger=transcription_logger,
)
task = PipelineTask(
pipeline,
@@ -223,7 +289,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
tool_resources=resources,
app_resources=resources,
)
@transport.event_handler("on_client_connected")
@@ -246,6 +312,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# The session has ended; read whatever state the handlers built up.
logger.info(f"Tool calls logged during session:\n{tool_call_logger.dump()}")
logger.info(f"Transcriptions logged during session:\n{transcription_logger.dump()}")
async def bot(runner_args: RunnerArguments):

View File

@@ -0,0 +1,187 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Manual demonstration of the missing-handler (developer-error) recovery path.
When a tool is advertised to the LLM via ``tools``/``LLMContext`` but
the developer forgets to call ``llm.register_function(...)`` to wire up
its handler, the LLM happily emits a tool call and then... nothing
happens on the Pipecat side, leaving the conversation stuck.
Pipecat's recovery path (``LLMService._missing_function_call_handler``)
catches this case:
- Logs a ``logger.error`` distinguishing **developer error** (tool advertised
but no handler registered) from a hallucination (tool not advertised),
pointing at the missing ``register_function`` call.
- Returns a neutral terminal tool result
(``LLMService.MISSING_FUNCTION_CALL_MESSAGE_TEMPLATE``: "The function
`X` is not currently available.") so the call still terminates with a
normal tool result instead of leaving the conversation stuck.
This example is **deliberately broken**: the weather schema is in
``tools`` but ``register_function`` is *not* called. Ask the bot about
the weather and observe:
1. The LLM emits a tool call for ``get_current_weather``.
2. ``logger.error`` fires with "advertised … but has no registered handler
— did you forget to call register_function()?"
3. The terminal tool result is fed back to the LLM.
4. The LLM responds in voice based on that result (typically something
like "the weather function isn't available right now").
Uses the OpenAI LLM service with defaults. Swap to another provider to
validate this behavior elsewhere.
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
weather_tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Starting missing-handler demo bot (no handler is registered on purpose)")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAILLMService.Settings(
system_instruction=(
"You are a helpful assistant in a voice conversation. Your responses "
"will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. Respond briefly and naturally. "
"Always use the get_current_weather function to answer questions "
"about the current weather."
),
),
)
# *** DELIBERATELY OMITTED ***
# The whole point of this example is to demonstrate the missing-handler
# recovery path. Re-add this line to wire the tool up correctly:
#
# llm.register_function("get_current_weather", fetch_weather_from_api)
context = LLMContext(tools=weather_tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info(
"=== Ask for the weather. Watch for a logger.error about the missing "
"handler, and listen for the LLM's response based on the recovery "
"message. ==="
)
context.add_message(
{
"role": "developer",
"content": (
"Please introduce yourself briefly to the user, then invite "
"them to ask about the weather."
),
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,184 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the AWS Nova Sonic LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Nova Sonic via the
formal toolResult channel so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
llm = AWSNovaSonicLLMService(
secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
region=os.environ["AWS_REGION"],
session_token=os.getenv("AWS_SESSION_TOKEN"),
settings=AWSNovaSonicLLMService.Settings(
voice="tiffany",
system_instruction=system_instruction,
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -46,11 +46,6 @@ async def fetch_weather_from_api(params: FunctionCallParams):
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
# Simulate a long network delay.
# You can continue chatting while waiting for this to complete.
# With Nova 2 Sonic (the default model), the assistant will respond
# appropriately once the function call is complete.
await asyncio.sleep(5)
await params.result_callback(
{
"conditions": "nice",
@@ -150,9 +145,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Register function for function calls
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function(
"get_current_weather", fetch_weather_from_api, cancel_on_interruption=False
)
llm.register_function("get_current_weather", fetch_weather_from_api)
# Set up context and context management.
context = LLMContext(tools=tools)

View File

@@ -0,0 +1,195 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Azure Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Azure Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.azure.realtime.llm import AzureRealtimeLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioTranscription,
SessionProperties,
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = AzureRealtimeLLMService(
api_key=os.environ["AZURE_REALTIME_API_KEY"],
base_url=os.environ["AZURE_REALTIME_BASE_URL"],
settings=AzureRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(model="whisper-1"),
)
),
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -4,15 +4,25 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Gemini Live LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Gemini Live as a
FunctionResponse so the model can integrate it naturally into its next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -31,33 +41,55 @@ load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have three tools available to you:
1. get_current_weather: Use this tool to get the current weather in a specific location.
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
3. google_search: Use this tool to search the web for information.
"""
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
@@ -77,42 +109,6 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
search_tool = {"google_search": {}}
# KNOWN ISSUE: If using GeminiVertexLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiLiveLLMService(
api_key=os.environ["GOOGLE_API_KEY"],
settings=GeminiLiveLLMService.Settings(
@@ -121,13 +117,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
tools=tools,
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
# You can provide the system instructions and tools in the context rather
# than as arguments to GeminiLiveLLMService, but note that doing so will
# trigger a (fast) reconnection when the GeminiLiveLLMService first
# receives the context (i.e. when we send the LLMRunFrame below).
context = LLMContext()
# Server-side VAD is enabled by default; no local VAD is added.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
@@ -154,7 +149,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
@@ -166,7 +160,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -6,10 +6,13 @@
import os
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -23,6 +26,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -30,6 +34,32 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have three tools available to you:
1. get_current_weather: Use this tool to get the current weather in a specific location.
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
3. google_search: Use this tool to search the web for information.
"""
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
@@ -51,23 +81,55 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
search_tool = {"google_search": {}}
# KNOWN ISSUE: If using GeminiVertexLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiLiveLLMService(
api_key=os.environ["GOOGLE_API_KEY"],
settings=GeminiLiveLLMService.Settings(
system_instruction=system_instruction,
voice="Aoede", # Puck, Charon, Kore, Fenrir, Aoede
# system_instruction="Talk like a pirate."
),
# inference_on_context_initialization=False,
tools=tools,
)
context = LLMContext(
[
{
"role": "user",
"content": "Say hello. Then ask if I want to hear a joke.",
},
],
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
context = LLMContext()
# Server-side VAD is enabled by default; no local VAD is added.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
@@ -94,6 +156,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -0,0 +1,179 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Grok Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Grok Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.xai.realtime.events import SessionProperties
from pipecat.services.xai.realtime.llm import GrokRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
# Note: Grok has built-in server-side VAD, so we don't need local VAD.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = GrokRealtimeLLMService(
api_key=os.environ["XAI_API_KEY"],
settings=GrokRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
voice="Ara",
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,198 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the OpenAI Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to OpenAI Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioNoiseReduction,
InputAudioTranscription,
SemanticTurnDetection,
SessionProperties,
)
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = OpenAIRealtimeLLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
turn_detection=SemanticTurnDetection(),
noise_reduction=InputAudioNoiseReduction(type="near_field"),
)
),
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -232,6 +232,20 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
# [LLMUpdateSettingsFrame(settings=SessionProperties(tools=new_tools).model_dump())]
# )
# Reasoning effort can be changed at runtime too. Only
# reasoning-capable Realtime models (e.g. gpt-realtime-2) support this.
# await task.queue_frames(
# [
# LLMUpdateSettingsFrame(
# delta=OpenAIRealtimeLLMService.Settings(
# session_properties=SessionProperties(
# reasoning=Reasoning(effort="xhigh"),
# ),
# )
# )
# ]
# )
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")

View File

@@ -0,0 +1,186 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Ultravox Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
Ultravox's API freezes the conversation between ``client_tool_invocation``
and the matching ``client_tool_result``, so the service ships a placeholder
``client_tool_result`` immediately when an async-registered function is
invoked (to unfreeze the conversation). When the real tool finishes, the
actual result is injected as user-side text so the model picks it up.
"""
import asyncio
import datetime
import os
import random
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
system_prompt = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = UltravoxRealtimeLLMService(
params=OneShotInputParams(
api_key=os.environ["ULTRAVOX_API_KEY"],
system_prompt=system_prompt,
temperature=0.3,
max_duration=datetime.timedelta(minutes=3),
),
one_shot_selected_tools=ToolsSchema(standard_tools=[weather_function]),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext([])
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy()],
),
vad_analyzer=SileroVADAnalyzer(),
),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -10,7 +10,7 @@ Demonstrates LLM-based turn completion detection to suppress bot responses when
the user was cut off mid-thought. The LLM outputs one of three markers:
- ✓ (complete): User finished their thought, respond normally
- ○ (incomplete short): User was cut off, wait ~5s then prompt
- ◐ (incomplete long): User needs time to think, wait ~15s then prompt
- ◐ (incomplete long): User needs time to think, wait ~10s then prompt
When incomplete is detected, the bot's response is suppressed. After the timeout
expires, the LLM is automatically prompted to re-engage the user.
@@ -41,6 +41,7 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_turn_strategies import FilterIncompleteUserTurnStrategies
load_dotenv(override=True)
@@ -83,23 +84,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
context = LLMContext()
# `FilterIncompleteUserTurnStrategies` pairs the default detector
# chain with `LLMTurnCompletionUserTurnStopStrategy`: detectors
# trigger LLM inference but the public `on_user_turn_stopped` event
# fires only when the LLM confirms ✓. The LLM marks each response
# with one of:
# ✓ = complete (respond normally)
# ○ = incomplete short (wait 5s, then prompt)
# ◐ = incomplete long (wait 10s, then prompt)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
# Enable turn completion filtering - the LLM will output:
# ✓ = complete (respond normally)
# ○ = incomplete short (wait 5s, then prompt)
# ◐ = incomplete long (wait 15s, then prompt)
filter_incomplete_user_turns=True,
# Optional: customize turn completion behavior
# turn_completion_config=TurnCompletionConfig(
# incomplete_short_timeout=5.0,
# incomplete_long_timeout=15.0,
# incomplete_short_prompt="Custom prompt...",
# incomplete_long_prompt="Custom prompt...",
# instructions="Custom turn completion instructions...",
# ),
user_turn_strategies=FilterIncompleteUserTurnStrategies(
# Optional: customize turn completion behavior
# config=UserTurnCompletionConfig(
# incomplete_short_timeout=5.0,
# incomplete_long_timeout=10.0,
# incomplete_short_prompt="Custom prompt...",
# incomplete_long_prompt="Custom prompt...",
# instructions="Custom turn completion instructions...",
# ),
),
),
)

View File

@@ -0,0 +1,174 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
# For a full example of how to deploy to SageMaker, see:
# https://github.com/pipecat-ai/pipecat-examples/tree/main/nvidia_sagemaker_example/deployment/aws-sagemaker-nvidia
import datetime
import io
import os
import wave
import aiofiles
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.nvidia.llm import NvidiaLLMService
from pipecat.services.nvidia.sagemaker.stt import NvidiaSageMakerSTTService
from pipecat.services.nvidia.sagemaker.tts import NvidiaSageMakerTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def save_audio_file(audio: bytes, filename: str, sample_rate: int, num_channels: int):
"""Save audio data to a WAV file."""
if len(audio) > 0:
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())
logger.info(f"Audio saved to {filename}")
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = NvidiaSageMakerSTTService(
endpoint_name=os.environ["SAGEMAKER_ASR_ENDPOINT_NAME"],
region=os.getenv("AWS_REGION", "us-west-2"),
)
llm = NvidiaLLMService(
api_key=os.environ["NVIDIA_API_KEY"],
settings=NvidiaLLMService.Settings(
model="meta/llama-3.3-70b-instruct",
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
tts = NvidiaSageMakerTTSService(
endpoint_name=os.environ["SAGEMAKER_MAGPIE_ENDPOINT_NAME"],
region=os.getenv("AWS_REGION", "us-west-2"),
)
audiobuffer = AudioBufferProcessor()
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
audiobuffer, # Audio buffer for recording
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Start recording audio
await audiobuffer.start_recording()
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
# Handler for merged audio
@audiobuffer.event_handler("on_audio_data")
async def on_audio_data(buffer, audio, sample_rate, num_channels):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"recordings/merged_{timestamp}.wav"
os.makedirs("recordings", exist_ok=True)
await save_audio_file(audio, filename, sample_rate, num_channels)
# Handler for separate tracks
@audiobuffer.event_handler("on_track_audio_data")
async def on_track_audio_data(buffer, user_audio, bot_audio, sample_rate, num_channels):
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
os.makedirs("recordings", exist_ok=True)
user_filename = f"recordings/user_{timestamp}.wav"
await save_audio_file(user_audio, user_filename, sample_rate, 1)
bot_filename = f"recordings/bot_{timestamp}.wav"
await save_audio_file(bot_audio, bot_filename, sample_rate, 1)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -6,116 +6,54 @@
"exclude": ["**/*_pb2.py", "**/__pycache__"],
"ignore": [
"tests",
"src/pipecat/adapters/services/anthropic_adapter.py",
"src/pipecat/adapters/services/aws_nova_sonic_adapter.py",
"src/pipecat/adapters/services/bedrock_adapter.py",
"src/pipecat/adapters/services/gemini_adapter.py",
"src/pipecat/adapters/services/grok_realtime_adapter.py",
"src/pipecat/adapters/services/inworld_realtime_adapter.py",
"src/pipecat/adapters/services/open_ai_adapter.py",
"src/pipecat/adapters/services/open_ai_realtime_adapter.py",
"src/pipecat/adapters/services/open_ai_responses_adapter.py",
"src/pipecat/adapters/services/perplexity_adapter.py",
"src/pipecat/audio/dtmf/utils.py",
"src/pipecat/audio/filters/aic_filter.py",
"src/pipecat/audio/filters/krisp_viva_filter.py",
"src/pipecat/audio/filters/rnnoise_filter.py",
"src/pipecat/audio/turn/smart_turn/local_smart_turn_v2.py",
"src/pipecat/audio/turn/smart_turn/local_smart_turn_v3.py",
"src/pipecat/audio/vad/silero.py",
"src/pipecat/processors/aggregators/llm_context.py",
"src/pipecat/processors/aggregators/llm_response_universal.py",
"src/pipecat/processors/frame_processor.py",
"src/pipecat/processors/frameworks/langchain.py",
"src/pipecat/processors/frameworks/rtvi/observer.py",
"src/pipecat/processors/frameworks/rtvi/processor.py",
"src/pipecat/processors/frameworks/strands_agents.py",
"src/pipecat/services/anthropic/llm.py",
"src/pipecat/services/assemblyai/stt.py",
"src/pipecat/services/aws/agent_core.py",
"src/pipecat/services/aws/llm.py",
"src/pipecat/services/aws/nova_sonic/llm.py",
"src/pipecat/services/aws/sagemaker/bidi_client.py",
"src/pipecat/services/aws/stt.py",
"src/pipecat/services/aws/tts.py",
"src/pipecat/services/aws/utils.py",
"src/pipecat/services/azure/stt.py",
"src/pipecat/services/azure/tts.py",
"src/pipecat/services/cartesia/stt.py",
"src/pipecat/services/cartesia/tts.py",
"src/pipecat/services/deepgram/flux/base.py",
"src/pipecat/services/deepgram/flux/sagemaker/stt.py",
"src/pipecat/services/deepgram/flux/stt.py",
"src/pipecat/services/deepgram/sagemaker/stt.py",
"src/pipecat/services/deepgram/sagemaker/tts.py",
"src/pipecat/services/deepgram/tts.py",
"src/pipecat/services/elevenlabs/stt.py",
"src/pipecat/services/elevenlabs/tts.py",
"src/pipecat/services/fish/tts.py",
"src/pipecat/services/gladia/stt.py",
"src/pipecat/services/google/gemini_live/llm.py",
"src/pipecat/services/google/gemini_live/vertex/llm.py",
"src/pipecat/services/google/image.py",
"src/pipecat/services/google/llm.py",
"src/pipecat/services/google/stt.py",
"src/pipecat/services/google/tts.py",
"src/pipecat/services/gradium/stt.py",
"src/pipecat/services/groq/tts.py",
"src/pipecat/services/heygen/api_interactive_avatar.py",
"src/pipecat/services/heygen/base_api.py",
"src/pipecat/services/heygen/client.py",
"src/pipecat/services/heygen/video.py",
"src/pipecat/services/hume/tts.py",
"src/pipecat/services/inworld/realtime/llm.py",
"src/pipecat/services/inworld/tts.py",
"src/pipecat/services/kokoro/tts.py",
"src/pipecat/services/llm_service.py",
"src/pipecat/services/lmnt/tts.py",
"src/pipecat/services/mem0/memory.py",
"src/pipecat/services/mistral/stt.py",
"src/pipecat/services/mistral/tts.py",
"src/pipecat/services/moondream/vision.py",
"src/pipecat/services/neuphonic/tts.py",
"src/pipecat/services/nvidia/stt.py",
"src/pipecat/services/nvidia/tts.py",
"src/pipecat/services/openai/base_llm.py",
"src/pipecat/services/openai/image.py",
"src/pipecat/services/openai/llm.py",
"src/pipecat/services/openai/realtime/llm.py",
"src/pipecat/services/openai/responses/llm.py",
"src/pipecat/services/openai/stt.py",
"src/pipecat/services/openai/tts.py",
"src/pipecat/services/openrouter/llm.py",
"src/pipecat/services/piper/tts.py",
"src/pipecat/services/resembleai/tts.py",
"src/pipecat/services/rime/tts.py",
"src/pipecat/services/sambanova/llm.py",
"src/pipecat/services/sarvam/stt.py",
"src/pipecat/services/sarvam/tts.py",
"src/pipecat/services/simli/video.py",
"src/pipecat/services/smallest/tts.py",
"src/pipecat/services/soniox/stt.py",
"src/pipecat/services/speechmatics/stt.py",
"src/pipecat/services/stt_service.py",
"src/pipecat/services/tavus/video.py",
"src/pipecat/services/tts_service.py",
"src/pipecat/services/ultravox/llm.py",
"src/pipecat/services/websocket_service.py",
"src/pipecat/services/whisper/stt.py",
"src/pipecat/services/xai/realtime/llm.py",
"src/pipecat/services/xtts/tts.py",
"src/pipecat/transports/base_output.py",
"src/pipecat/transports/daily/transport.py",
"src/pipecat/transports/heygen/transport.py",
"src/pipecat/transports/lemonslice/transport.py",
"src/pipecat/transports/livekit/transport.py",
"src/pipecat/transports/smallwebrtc/connection.py",
"src/pipecat/transports/smallwebrtc/request_handler.py",
"src/pipecat/transports/smallwebrtc/transport.py",
"src/pipecat/transports/tavus/transport.py",
"src/pipecat/transports/websocket/client.py",
"src/pipecat/transports/websocket/server.py",
"src/pipecat/transports/whatsapp/client.py"
"src/pipecat/transports/websocket/server.py"
],
"reportMissingImports": false
}

View File

@@ -223,12 +223,11 @@ TESTS_REALTIME = [
# ("realtime/realtime-azure.py", EVAL_WEATHER),
("realtime/realtime-openai-text.py", EVAL_WEATHER),
("realtime/realtime-openai-live-video.py", EVAL_VISION_CAMERA),
("realtime/realtime-gemini-live.py", EVAL_SIMPLE_MATH),
("realtime/realtime-gemini-live.py", EVAL_WEATHER),
("realtime/realtime-gemini-live-local-vad.py", EVAL_SIMPLE_MATH),
("realtime/realtime-gemini-live-function-calling.py", EVAL_WEATHER),
("realtime/realtime-gemini-live-video.py", EVAL_VISION_CAMERA),
("realtime/realtime-gemini-live-google-search.py", EVAL_ONLINE_SEARCH),
("realtime/realtime-gemini-live-vertex-function-calling.py", EVAL_WEATHER),
("realtime/realtime-gemini-live-vertex.py", EVAL_WEATHER),
("realtime/realtime-aws-nova-sonic.py", EVAL_SIMPLE_MATH),
("realtime/realtime-ultravox.py", EVAL_ORDER),
("realtime/realtime-grok.py", EVAL_WEATHER),

View File

@@ -9,7 +9,7 @@
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict, TypeGuard, TypeVar
from typing import Any, TypedDict, TypeGuard, TypeVar, cast
from anthropic import NOT_GIVEN, NotGiven
from anthropic.types.message_param import MessageParam
@@ -121,16 +121,20 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
messages = self._from_universal_context_messages(self.get_messages(context)).messages
# Sanitize messages for logging
messages_for_logging = []
messages_for_logging: list[dict[str, Any]] = []
for message in messages:
msg = copy.deepcopy(message)
if "content" in msg:
if isinstance(msg["content"], list):
for item in msg["content"]:
if item["type"] == "image":
item["source"]["data"] = "..."
if item["type"] == "thinking" and item.get("signature"):
item["signature"] = "..."
msg: dict[str, Any] = copy.deepcopy(dict(message))
content = msg.get("content")
if isinstance(content, list):
for item in content:
if not isinstance(item, dict):
continue
if item.get("type") == "image":
source = item.get("source")
if isinstance(source, dict):
source["data"] = "..."
if item.get("type") == "thinking" and item.get("signature"):
item["signature"] = "..."
messages_for_logging.append(msg)
return messages_for_logging
@@ -185,8 +189,13 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
]
if isinstance(next_message["content"], str):
next_message["content"] = [{"type": "text", "text": next_message["content"]}]
# Concatenate the content
current_message["content"].extend(next_message["content"])
# Concatenate the content. MessageParam types content as
# `str | Iterable[...]`, but this codebase assumes it's
# either a str or a list. The str case is handled above, so
# we assume that both are lists here.
cast(list[Any], current_message["content"]).extend(
cast(list[Any], next_message["content"])
)
# Remove the next message from the list
messages.pop(i + 1)
else:
@@ -239,7 +248,7 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
}
# Fall back to assuming that the message is already in Anthropic format
return copy.deepcopy(message.message)
return cast(MessageParam, copy.deepcopy(message.message))
def _from_standard_message(self, message: LLMStandardMessage) -> MessageParam:
"""Convert standard universal context message to Anthropic format.
@@ -280,20 +289,26 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
]
}
"""
message = copy.deepcopy(message)
if message["role"] == "tool":
return {
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": message["tool_call_id"],
"content": message["content"],
},
],
}
if message.get("tool_calls"):
tc = message["tool_calls"]
# ChatCompletionMessageParam (input) and MessageParam (output) are
# different TypedDicts — work with the message as a plain dict for the
# transformations below and cast back to MessageParam at return sites.
msg = cast(dict[str, Any], copy.deepcopy(message))
if msg["role"] == "tool":
return cast(
MessageParam,
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": msg["tool_call_id"],
"content": msg["content"],
},
],
},
)
if msg.get("tool_calls"):
tc = msg["tool_calls"]
ret = {"role": "assistant", "content": []}
for tool_call in tc:
function = tool_call["function"]
@@ -305,8 +320,8 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
"input": arguments,
}
ret["content"].append(new_tool_use)
return ret
content = message.get("content")
return cast(MessageParam, ret)
content = msg.get("content")
if isinstance(content, str):
# fix empty text
if content == "":
@@ -354,7 +369,7 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
image_item = content.pop(img_idx)
content.insert(first_txt_idx, image_item)
return message
return cast(MessageParam, msg)
def _with_cache_control_markers(self, messages: list[MessageParam]) -> list[MessageParam]:
"""Add cache control markers to messages for prompt caching.
@@ -369,7 +384,16 @@ class AnthropicLLMAdapter(BaseLLMAdapter[AnthropicLLMInvocationParams]):
def add_cache_control_marker(message: MessageParam):
if isinstance(message["content"], str):
message["content"] = [{"type": "text", "text": message["content"]}]
message["content"][-1]["cache_control"] = {"type": "ephemeral"}
# Assumptions on the next line:
# - content is a list (str case handled above; this codebase only
# ever constructs content as a str or a list)
# - the list is non-empty (guaranteed by the empty-content
# replacement in `_from_universal_context_messages`)
# - the last item is a dict. The standard-message path enforces
# this via TypedDicts (which are dicts at runtime); the
# LLMSpecificMessage passthrough doesn't, but in practice
# callers use dicts.
cast(list[Any], message["content"])[-1]["cache_control"] = {"type": "ephemeral"}
try:
# Add cache control markers to the most recent two user messages.

View File

@@ -8,9 +8,9 @@
import copy
import json
from dataclasses import dataclass
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
@@ -110,7 +110,10 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Returns:
List of messages in a format ready for logging about AWS Nova Sonic.
"""
return self._from_universal_context_messages(self.get_messages(context)).messages
return [
asdict(m)
for m in self._from_universal_context_messages(self.get_messages(context)).messages
]
@dataclass
class ConvertedMessages:
@@ -123,18 +126,27 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
self, universal_context_messages: list[LLMContextMessage]
) -> ConvertedMessages:
system_instruction = None
messages = []
messages: list[AWSNovaSonicConversationHistoryMessage] = []
# Bail if there are no messages
if not universal_context_messages:
return self.ConvertedMessages()
return self.ConvertedMessages(messages=[])
universal_context_messages = copy.deepcopy(universal_context_messages)
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below (the role-extraction and conversion
# logic only applies to standard message dicts). If/when this
# adapter grows a per-provider passthrough like the Anthropic
# adapter has, LLMSpecific items can flow through.
ucm: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
# If we have a "system" message as our first message,
# pull that out into "instruction"
if universal_context_messages[0].get("role") == "system":
system = universal_context_messages.pop(0)
if ucm and ucm[0].get("role") == "system":
system = ucm.pop(0)
content = system.get("content")
if isinstance(content, str):
system_instruction = content
@@ -145,19 +157,21 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
# Convert any remaining "system"/"developer" messages to "user",
# as Nova Sonic only supports "user" and "assistant" in history.
for msg in universal_context_messages:
for msg in ucm:
if msg.get("role") in ("system", "developer"):
msg["role"] = "user"
# Process remaining messages to fill out conversation history.
for universal_context_message in universal_context_messages:
for universal_context_message in ucm:
message = self._from_universal_context_message(universal_context_message)
if message:
messages.append(message)
return self.ConvertedMessages(messages=messages, system_instruction=system_instruction)
def _from_universal_context_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
def _from_universal_context_message(
self, message: dict[str, Any]
) -> AWSNovaSonicConversationHistoryMessage | None:
"""Convert standard message format to Nova Sonic format.
Args:
@@ -167,17 +181,18 @@ class AWSNovaSonicLLMAdapter(BaseLLMAdapter[AWSNovaSonicLLMInvocationParams]):
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
if role == "user" or role == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if isinstance(content, list):
text_parts = []
for c in content:
if c.get("type") == "text":
content += " " + c.get("text")
text_parts.append(c.get("text"))
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
content = " ".join(t for t in text_parts if t)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history

View File

@@ -10,7 +10,7 @@ import base64
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
@@ -68,16 +68,19 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
system_instruction,
discard_context_system=True,
)
return {
"system": [{"text": effective_system}] if effective_system else None,
"messages": converted.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
# To avoid refactoring in AWSBedrockLLMService, we just pass through tool_choice.
# Eventually (when we don't have to maintain the non-LLMContext code path) we should do
# the conversion to Bedrock's expected format here rather than in AWSBedrockLLMService.
"tool_choice": context.tool_choice,
}
return cast(
AWSBedrockLLMInvocationParams,
{
"system": [{"text": effective_system}] if effective_system else None,
"messages": converted.messages,
# NOTE: LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools) or [],
# To avoid refactoring in AWSBedrockLLMService, we just pass through tool_choice.
# Eventually (when we don't have to maintain the non-LLMContext code path) we should do
# the conversion to Bedrock's expected format here rather than in AWSBedrockLLMService.
"tool_choice": context.tool_choice,
},
)
def get_messages_for_logging(self, context) -> list[dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about AWS Bedrock.
@@ -213,35 +216,36 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
]
}
"""
message = copy.deepcopy(message)
if message["role"] == "tool":
# ChatCompletionMessageParam (input) and the dict shape Bedrock expects
# are different — work with the deepcopied message as a plain dict for
# the transformations below.
msg = cast(dict[str, Any], copy.deepcopy(message))
if msg["role"] == "tool":
# Try to parse the content as JSON if it looks like JSON
try:
if message["content"].strip().startswith("{") and message[
"content"
].strip().endswith("}"):
content_json = json.loads(message["content"])
if msg["content"].strip().startswith("{") and msg["content"].strip().endswith("}"):
content_json = json.loads(msg["content"])
tool_result_content = [{"json": content_json}]
else:
tool_result_content = [{"text": message["content"]}]
tool_result_content = [{"text": msg["content"]}]
except (json.JSONDecodeError, ValueError, AttributeError):
tool_result_content = [{"text": message["content"]}]
tool_result_content = [{"text": msg["content"]}]
return {
"role": "user",
"content": [
{
"toolResult": {
"toolUseId": message["tool_call_id"],
"toolUseId": msg["tool_call_id"],
"content": tool_result_content,
},
},
],
}
if message.get("tool_calls"):
tc = message["tool_calls"]
ret = {"role": "assistant", "content": []}
if msg.get("tool_calls"):
tc = msg["tool_calls"]
ret: dict[str, Any] = {"role": "assistant", "content": []}
for tool_call in tc:
function = tool_call["function"]
arguments = json.loads(function["arguments"])
@@ -256,12 +260,12 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
return ret
# Handle text content
content = message.get("content")
content = msg.get("content")
if isinstance(content, str):
if content == "":
return {"role": message["role"], "content": [{"text": "(empty)"}]}
return {"role": msg["role"], "content": [{"text": "(empty)"}]}
else:
return {"role": message["role"], "content": [{"text": content}]}
return {"role": msg["role"], "content": [{"text": content}]}
elif isinstance(content, list):
new_content = []
for item in content:
@@ -300,9 +304,9 @@ class AWSBedrockLLMAdapter(BaseLLMAdapter[AWSBedrockLLMInvocationParams]):
# Move image before the first text
image_item = new_content.pop(img_idx)
new_content.insert(first_txt_idx, image_item)
return {"role": message["role"], "content": new_content}
return {"role": msg["role"], "content": new_content}
return message
return msg
@staticmethod
def _to_bedrock_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -9,7 +9,7 @@
import base64
import json
from dataclasses import dataclass, field
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
from openai import NotGiven
@@ -139,6 +139,36 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
return formatted_standard_tools + custom_gemini_tools
@staticmethod
def to_function_response_dict(content: Any) -> dict[str, Any]:
"""Convert a tool-result content value to Gemini's FunctionResponse.response shape.
Gemini's ``FunctionResponse.response`` field requires a dict, so
non-dict values (e.g. plain strings, JSON-encoded scalars, or
sentinel strings like ``"COMPLETED"`` used when a function returned
no value) are wrapped as ``{"value": <value>}``. JSON strings that
decode to a dict are passed through as-is.
Args:
content: The tool-result content. Typically the JSON-encoded
return value of a function, but can also be a plain string
(e.g. ``"COMPLETED"``) or already-parsed dict.
Returns:
A dict suitable for ``FunctionResponse.response``.
"""
if isinstance(content, dict):
return content
if not isinstance(content, str):
return {"value": content}
try:
decoded = json.loads(content)
except (json.JSONDecodeError, ValueError):
return {"value": content}
if isinstance(decoded, dict):
return decoded
return {"value": decoded}
def get_messages_for_logging(self, context: LLMContext) -> list[dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about Gemini.
@@ -154,9 +184,12 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
messages = self._from_universal_context_messages(self.get_messages(context)).messages
# Sanitize messages for logging
messages_for_logging = []
messages_for_logging: list[dict[str, Any]] = []
for message in messages:
obj = message.to_json_dict()
# `to_json_dict()` returns `dict[str, object]`; treat as a plain
# dict for the value indexing/mutation below. The broad `except`
# below is the safety net if any item isn't shaped as expected.
obj: dict[str, Any] = cast(dict[str, Any], message.to_json_dict())
try:
if "parts" in obj:
for part in obj["parts"]:
@@ -274,7 +307,8 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
# Check if we only have function-related messages (no regular text)
effective_system = extracted_system or system_instruction
has_regular_messages = any(
len(msg.parts) == 1
msg.parts is not None
and len(msg.parts) == 1
and getattr(msg.parts[0], "text", None)
and not getattr(msg.parts[0], "function_call", None)
and not getattr(msg.parts[0], "function_response", None)
@@ -346,8 +380,11 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
parts=[Part(function_call=FunctionCall(name="search", args={"query": "test"}))]
)
"""
role = message["role"]
content = message.get("content", [])
# ChatCompletionMessageParam (a union of TypedDicts) doesn't allow
# the dict-style key access used below; treat it as a plain dict.
msg = cast(dict[str, Any], message)
role = msg["role"]
content = msg.get("content", [])
# Convert non-initial system/developer messages to user role,
# as Gemini doesn't support these as input messages.
@@ -359,8 +396,8 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
parts = []
tool_call_id_to_name_mapping = {}
if message.get("tool_calls"):
for tc in message["tool_calls"]:
if msg.get("tool_calls"):
for tc in msg["tool_calls"]:
id = tc["id"]
name = tc["function"]["name"]
tool_call_id_to_name_mapping[id] = name
@@ -375,19 +412,10 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
)
elif role == "tool":
role = "user"
try:
response = json.loads(message["content"])
if isinstance(response, dict):
response_dict = response
else:
response_dict = {"value": response}
except Exception as e:
# Response might not be JSON-deserializable.
# This occurs with a UserImageFrame, for example, where we get a plain "COMPLETED" string.
response_dict = {"value": message["content"]}
response_dict = self.to_function_response_dict(msg["content"])
# Get function name from mapping using tool_call_id, or fallback
tool_call_id = message.get("tool_call_id")
tool_call_id = msg.get("tool_call_id")
function_name = "tool_call_result" # Default fallback
if tool_call_id and tool_call_id in params.tool_call_id_to_name_mapping:
function_name = params.tool_call_id_to_name_mapping[tool_call_id]
@@ -491,7 +519,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
def is_tool_call_message(msg: Content) -> bool:
"""Check if message contains only function_call parts."""
return (
return bool(
msg.role == "model"
and msg.parts
and all(getattr(part, "function_call", None) for part in msg.parts)
@@ -499,6 +527,8 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
def message_has_thought_signature(msg: Content) -> bool:
"""Check if any part in the message has a thought_signature."""
if msg.parts is None:
return False
return any(getattr(part, "thought_signature", None) for part in msg.parts)
merged_messages = []
@@ -564,6 +594,8 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
logger.debug(f"Thought signatures to apply: {len(thought_signature_dicts)}")
for ts in thought_signature_dicts:
bookmark = ts.get("bookmark")
if bookmark is None:
continue
if bookmark.get("function_call"):
logger.trace(f" - To function call: {bookmark['function_call']}")
elif bookmark.get("text"):
@@ -665,6 +697,8 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
if (
hasattr(part, "inline_data")
and part.inline_data
and part.inline_data.data is not None
and bookmark_inline_data.data is not None
# Comparing length should be good enough for matching inline data,
# especially since we're already matching thought signatures in
# strict message order. Comparing actual data is expensive.

View File

@@ -13,7 +13,7 @@ Grok's Voice Agent API.
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
@@ -85,7 +85,10 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages with sensitive data redacted.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
@dataclass
class ConvertedMessages:
@@ -111,11 +114,20 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
messages = copy.deepcopy(universal_context_messages)
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below. Other adapters (e.g. Anthropic)
# dispatch LLMSpecific items through a per-provider passthrough.
# The pack-into-single-text-message strategy here doesn't compose
# with opaque per-provider payloads.
messages: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
system_instruction = None
# Extract system message as session instructions
if messages[0].get("role") == "system":
if messages and messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
@@ -133,7 +145,9 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
# Single user message can be sent normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[self._from_universal_context_message(messages[0])],
messages=[
self._from_universal_context_message(cast(LLMContextMessage, messages[0]))
],
system_instruction=system_instruction,
)
@@ -181,26 +195,29 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
ConversationItem formatted for Grok Realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
# NOTE: ``LLMSpecificMessage`` is not yet handled here — see the
# corresponding note in `_from_universal_context_messages`.
msg = cast(dict[str, Any], message)
if msg.get("role") == "user":
content = msg.get("content")
if isinstance(content, list):
text_content = ""
text_parts = []
for c in content:
if c.get("type") == "text":
text_content += " " + c.get("text")
text_parts.append(c.get("text"))
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
f"Unhandled content type in context message: {c.get('type')} - {msg}"
)
content = text_content.strip()
content = " ".join(t for t in text_parts if t).strip()
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tc = msg["tool_calls"][0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
@@ -208,7 +225,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
raise ValueError(f"Unhandled message type in _from_universal_context_message: {msg}")
@staticmethod
def _to_grok_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -13,7 +13,7 @@ Inworld's Realtime API.
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
@@ -85,7 +85,10 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages with sensitive data redacted.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
@dataclass
class ConvertedMessages:
@@ -111,11 +114,20 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
messages = copy.deepcopy(universal_context_messages)
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` —
# those are filtered out below. Other adapters (e.g. Anthropic)
# dispatch LLMSpecific items through a per-provider passthrough.
# The pack-into-single-text-message strategy here doesn't compose
# with opaque per-provider payloads.
messages: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
system_instruction = None
# Extract system message as session instructions
if messages[0].get("role") == "system":
if messages and messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
@@ -133,7 +145,9 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
# Single user message can be sent normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[self._from_universal_context_message(messages[0])],
messages=[
self._from_universal_context_message(cast(LLMContextMessage, messages[0]))
],
system_instruction=system_instruction,
)
@@ -181,26 +195,29 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
ConversationItem formatted for Inworld Realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
# NOTE: ``LLMSpecificMessage`` is not yet handled here — see the
# corresponding note in `_from_universal_context_messages`.
msg = cast(dict[str, Any], message)
if msg.get("role") == "user":
content = msg.get("content")
if isinstance(content, list):
text_content = ""
text_parts = []
for c in content:
if c.get("type") == "text":
text_content += " " + c.get("text")
text_parts.append(c.get("text"))
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
f"Unhandled content type in context message: {c.get('type')} - {msg}"
)
content = text_content.strip()
content = " ".join(t for t in text_parts if t).strip()
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tc = msg["tool_calls"][0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
@@ -208,7 +225,7 @@ class InworldRealtimeLLMAdapter(BaseLLMAdapter):
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
raise ValueError(f"Unhandled message type in _from_universal_context_message: {msg}")
@staticmethod
def _to_inworld_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -127,12 +127,15 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
)
if system_instruction:
# Detect initial system message for warning purposes (don't extract)
initial_content = (
messages[0].get("content", "")
if messages and messages[0].get("role") == "system"
else None
)
# Detect initial system message for warning purposes (don't extract).
# ChatCompletionMessageParam.content is `str | Iterable[...]`; we
# only forward it for warning purposes, so coerce non-strings to
# None — the resolver handles None.
initial_content: str | None = None
if messages and messages[0].get("role") == "system":
raw_content = messages[0].get("content", "")
if isinstance(raw_content, str):
initial_content = raw_content
self._resolve_system_instruction(
initial_content,
system_instruction,
@@ -140,12 +143,15 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
)
messages = [{"role": "system", "content": system_instruction}] + messages
return {
"messages": messages,
# NOTE; LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools),
"tool_choice": _openai_from_llm_context_tool_choice(context.tool_choice),
}
return cast(
OpenAILLMInvocationParams,
{
"messages": messages,
# NOTE; LLMContext's tools are guaranteed to be a ToolsSchema (or NOT_GIVEN)
"tools": self.from_standard_tools(context.tools),
"tool_choice": _openai_from_llm_context_tool_choice(context.tool_choice),
},
)
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> list[ChatCompletionToolParam]:
"""Convert function schemas to OpenAI's function-calling format.
@@ -158,13 +164,19 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
with ChatCompletion API.
"""
functions_schema = tools_schema.standard_tools
formatted_standard_tools = [
ChatCompletionToolParam(type="function", function=func.to_default_dict())
# `function=...` expects a `FunctionDefinition` TypedDict; the dict
# produced by `to_default_dict()` is structurally compatible. Cast at
# the boundary.
formatted_standard_tools: list[ChatCompletionToolParam] = [
ChatCompletionToolParam(type="function", function=cast(Any, func.to_default_dict()))
for func in functions_schema
]
custom_openai_tools = []
custom_openai_tools: list[ChatCompletionToolParam] = []
if tools_schema.custom_tools:
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
custom_openai_tools = cast(
list[ChatCompletionToolParam],
tools_schema.custom_tools.get(AdapterType.OPENAI, []),
)
return formatted_standard_tools + custom_openai_tools
def get_messages_for_logging(self, context: LLMContext) -> list[dict[str, Any]]:
@@ -178,7 +190,10 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
Returns:
List of messages in a format ready for logging about OpenAI.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
def _from_universal_context_messages(
self,

View File

@@ -9,7 +9,7 @@
import copy
import json
from dataclasses import dataclass
from typing import Any, TypedDict
from typing import Any, TypedDict, cast
from loguru import logger
@@ -81,7 +81,7 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages in a format ready for logging about OpenAI Realtime.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(list[dict[str, Any]], self.get_messages(context, truncate_large_values=True))
@dataclass
class ConvertedMessages:
@@ -101,12 +101,24 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
messages = copy.deepcopy(universal_context_messages)
# NOTE: This adapter does not yet handle ``LLMSpecificMessage`` — those
# are filtered out below. Other adapters (e.g. Anthropic) dispatch
# LLMSpecific items through a per-provider passthrough. For OpenAI
# Realtime, the strategy here packs a multi-message history into a
# single text message (see comment further down), which doesn't
# compose with opaque per-provider payloads. If/when this adapter
# adopts the per-message strategy, LLMSpecific items can flow
# through `_from_universal_context_message` like in other adapters.
messages: list[dict[str, Any]] = [
cast(dict[str, Any], m)
for m in copy.deepcopy(universal_context_messages)
if isinstance(m, dict)
]
system_instruction = None
# If we have a "system" message as our first message,
# pull that out into session "instructions"
if messages[0].get("role") == "system":
if messages and messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
@@ -124,7 +136,9 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
# If we have just a single "user" item, we can just send it normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[self._from_universal_context_message(messages[0])],
messages=[
self._from_universal_context_message(cast(LLMContextMessage, messages[0]))
],
system_instruction=system_instruction,
)
@@ -142,18 +156,18 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
return self.ConvertedMessages(
messages=[
{
"role": "user",
"type": "message",
"content": [
{
"type": "input_text",
"text": "\n\n".join(
events.ConversationItem(
role="user",
type="message",
content=[
events.ItemContent(
type="input_text",
text="\n\n".join(
[intro_text, json.dumps(messages, indent=2), trailing_text]
),
}
)
],
}
)
],
system_instruction=system_instruction,
)
@@ -161,31 +175,34 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
def _from_universal_context_message(
self, message: LLMContextMessage
) -> events.ConversationItem:
if message.get("role") == "user":
content = message.get("content")
if isinstance(message.get("content"), list):
# NOTE: ``LLMSpecificMessage`` is not yet handled here — see the
# corresponding note in `_from_universal_context_messages`.
msg = cast(dict[str, Any], message)
if msg.get("role") == "user":
content = msg.get("content")
if isinstance(content, list):
content = ""
for c in message.get("content"):
for c in msg.get("content", []):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
f"Unhandled content type in context message: {c.get('type')} - {msg}"
)
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
if msg.get("role") == "assistant" and msg.get("tool_calls"):
tc = msg["tool_calls"][0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
raise ValueError(f"Unhandled message type in _from_universal_context_message: {msg}")
@staticmethod
def _to_openai_realtime_function_format(function: FunctionSchema) -> dict[str, Any]:

View File

@@ -6,7 +6,7 @@
"""OpenAI Responses API adapter for Pipecat."""
from typing import Any, TypedDict
from typing import Any, Required, TypedDict, cast
from openai._types import NotGiven as OpenAINotGiven
from openai.types.responses import FunctionToolParam, ResponseInputItemParam, ToolParam
@@ -23,8 +23,10 @@ from pipecat.processors.aggregators.llm_context import (
class OpenAIResponsesLLMInvocationParams(TypedDict, total=False):
"""Context-based parameters for invoking OpenAI Responses API."""
input: list[ResponseInputItemParam]
tools: list[ToolParam] | OpenAINotGiven
# `input` and `tools` are always populated by `get_llm_invocation_params`;
# `instructions` is only set when a system instruction is present.
input: Required[list[ResponseInputItemParam]]
tools: Required[list[ToolParam] | OpenAINotGiven]
instructions: str
@@ -64,8 +66,11 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
if system_instruction and messages:
first_msg = messages[0] if not isinstance(messages[0], LLMSpecificMessage) else None
if first_msg and first_msg.get("role") == "system":
# `content` is `str | Iterable[...]`; we only forward it for
# warning purposes. Coerce non-strings to None.
first_content = first_msg.get("content", "")
self._resolve_system_instruction(
first_msg.get("content", ""),
first_content if isinstance(first_content, str) else None,
system_instruction,
discard_context_system=False,
)
@@ -143,7 +148,10 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
Returns:
List of messages in a format ready for logging.
"""
return self.get_messages(context, truncate_large_values=True)
return cast(
list[dict[str, Any]],
self.get_messages(context, truncate_large_values=True),
)
def _convert_messages_to_input(
self, messages: list[LLMContextMessage]
@@ -169,13 +177,15 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "developer", "content": content})
result.append(
cast(ResponseInputItemParam, {"role": "developer", "content": content})
)
elif role == "user":
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "user", "content": content})
result.append(cast(ResponseInputItemParam, {"role": "user", "content": content}))
elif role == "assistant":
tool_calls = message.get("tool_calls")
@@ -194,7 +204,9 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
content = message.get("content", "")
if isinstance(content, list):
content = self._convert_multimodal_content(content)
result.append({"role": "assistant", "content": content})
result.append(
cast(ResponseInputItemParam, {"role": "assistant", "content": content})
)
elif role == "tool":
content = message.get("content", "")

View File

@@ -28,6 +28,7 @@ the messages are sent to Perplexity's API.
"""
import copy
from typing import Any, cast
from openai.types.chat import ChatCompletionMessageParam
@@ -116,7 +117,11 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
if not messages:
return messages
messages = copy.deepcopy(messages)
# ChatCompletionMessageParam is a union of TypedDicts; the
# transformations below mutate by key/index in ways those TypedDicts
# don't permit. Work against a plain-dict view for the duration of
# the transformation and cast back at the return site.
msgs: list[dict[str, Any]] = cast(list[dict[str, Any]], copy.deepcopy(messages))
# Note: "developer" → "user" conversion is handled by the parent adapter
# via the convert_developer_to_user parameter.
@@ -125,10 +130,10 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
# Perplexity allows system messages at the start, but rejects them
# after any non-system message.
in_initial_system_block = True
for i in range(len(messages)):
if messages[i].get("role") == "system":
for i in range(len(msgs)):
if msgs[i].get("role") == "system":
if not in_initial_system_block:
messages[i]["role"] = "user"
msgs[i]["role"] = "user"
else:
in_initial_system_block = False
@@ -137,9 +142,9 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
# messages that violate Perplexity's strict alternation requirement.
# Skip consecutive system messages at the start — Perplexity allows those.
i = 0
while i < len(messages) - 1:
current = messages[i]
next_msg = messages[i + 1]
while i < len(msgs) - 1:
current = msgs[i]
next_msg = msgs[i + 1]
if current["role"] == next_msg["role"] == "system":
# Perplexity allows multiple initial system messages, don't merge
i += 1
@@ -154,7 +159,7 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
next_msg.get("content"), list
):
current["content"].extend(next_msg["content"])
messages.pop(i + 1)
msgs.pop(i + 1)
else:
i += 1
@@ -162,7 +167,7 @@ class PerplexityLLMAdapter(OpenAILLMAdapter):
# Perplexity requires the last message to be "user" or "tool".
# OpenAI appears to silently ignore trailing assistant messages
# server-side, so removing them preserves equivalent behavior.
while messages and messages[-1].get("role") == "assistant":
messages.pop()
while msgs and msgs[-1].get("role") == "assistant":
msgs.pop()
return messages
return cast(list[ChatCompletionMessageParam], msgs)

View File

@@ -14,7 +14,7 @@ in-memory after first load to improve performance on subsequent accesses.
import asyncio
import io
import wave
from importlib.resources import files
from importlib.resources import as_file, files
import aiofiles
@@ -52,10 +52,12 @@ async def load_dtmf_audio(button: KeypadEntry, *, sample_rate: int = 8000) -> by
__DTMF_RESAMPLER__ = create_file_resampler()
dtmf_file_name = __DTMF_FILE_NAME.get(button, f"dtmf-{button.value}.wav")
dtmf_file_path = files("pipecat.audio.dtmf").joinpath(dtmf_file_name)
async with aiofiles.open(dtmf_file_path, "rb") as f:
data = await f.read()
# `as_file` materialises the resource as a real filesystem `Path`,
# which aiofiles can open. (For installed packages this is just the
# bundled file; for zipped distributions it would extract to a temp.)
with as_file(files("pipecat.audio.dtmf").joinpath(dtmf_file_name)) as dtmf_file_path:
async with aiofiles.open(dtmf_file_path, "rb") as f:
data = await f.read()
with io.BytesIO(data) as buffer:
with wave.open(buffer, "rb") as wf:

View File

@@ -60,7 +60,12 @@ class RNNoiseFilter(BaseAudioFilter):
self._sample_rate = sample_rate
try:
# RNNoise always requires 48kHz
# The module-level import sets `RNNoise` to `None` if pyrnnoise
# isn't installed; raise instead of calling `None(...)` so the
# except clause handles it cleanly.
if RNNoise is None:
raise ImportError("pyrnnoise is not installed")
# RNNoise always requires 48kHz.
self._rnnoise = RNNoise(sample_rate=48000)
self._rnnoise_ready = True
except Exception as e:
@@ -107,7 +112,7 @@ class RNNoiseFilter(BaseAudioFilter):
Returns:
Noise-suppressed audio data as bytes.
"""
if not self._rnnoise_ready or not self._filtering:
if not self._rnnoise_ready or not self._filtering or self._rnnoise is None:
return audio
# Resample input if needed

View File

@@ -10,6 +10,8 @@ This module provides an audio resampler that uses the resampy library
for high-quality audio sample rate conversion.
"""
import warnings
import numpy as np
import resampy
@@ -21,6 +23,11 @@ class ResampyResampler(BaseAudioResampler):
This resampler uses the resampy library's Kaiser windowing filter
for high-quality audio resampling with good performance characteristics.
.. deprecated:: 1.2.0
ResampyResampler is deprecated and will be removed in Pipecat 2.0.
Use SOXRAudioResampler, create_file_resampler(), or create_stream_resampler()
instead.
"""
def __init__(self, **kwargs):
@@ -29,7 +36,15 @@ class ResampyResampler(BaseAudioResampler):
Args:
**kwargs: Additional keyword arguments (currently unused).
"""
pass
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"ResampyResampler is deprecated and will be removed in Pipecat 2.0. "
"Use SOXRAudioResampler, create_file_resampler(), or "
"create_stream_resampler() instead.",
DeprecationWarning,
stacklevel=2,
)
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
"""Resample audio data using resampy library.

View File

@@ -339,6 +339,40 @@ class LLMTextFrame(TextFrame):
self.includes_inter_frame_spaces = True
@dataclass
class LLMMarkerFrame(DataFrame):
"""Sideband marker emitted by an LLM service.
A marker is short, structured assistant output that should be
persisted in the conversation context but should not flow through
the standard text path (TTS, transcript). The assistant aggregator
writes the marker to the context so the LLM can self-condition on
prior markers on subsequent turns.
The primary use today is the ``filter_incomplete_user_turns``
protocol, where ``UserTurnCompletionLLMServiceMixin`` emits the
turn-completion markers ✓ / ○ / ◐ on every response. The frame is
intentionally generic so other components — STT services with
built-in turn signals, end-of-turn classifiers, custom annotations,
etc. — can use the same mechanism to inject sideband signals into
the assistant context.
Parameters:
marker: The marker payload (typically a short string such as a
single character).
append_to_context_immediately: If True, the marker is written
to the context as its own standalone assistant message as
soon as it's received. If False, the marker is appended to
the running assistant aggregation and flushed to the
context together with the following text as a single
message (e.g. for the ✓ case the context message ends up
as "✓ <response>").
"""
marker: str
append_to_context_immediately: bool = True
@dataclass
class AggregatedTextFrame(TextFrame):
"""Text frame representing an aggregation of TextFrames.
@@ -661,6 +695,11 @@ class FunctionCallResultProperties:
is_final: Whether this is the final result for the function call. When
``False`` the result is treated as an intermediate update. Defaults to ``True``.
Only meaningful for async function calls (``cancel_on_interruption=False``).
Note: realtime LLM services do not support streamed intermediate
results; they deliver only the final result to the provider. An
intermediate result reported to a realtime service is dropped
and an error is raised. Use a non-realtime LLM service if your
tool needs to stream intermediate results.
"""
run_llm: bool | None = None
@@ -970,6 +1009,24 @@ class UserSpeakingFrame(SystemFrame):
pass
@dataclass
class UserTurnInferenceCompletedFrame(SystemFrame):
"""Frame indicating that the user turn is semantically complete.
Emitted by any component that can judge conversational turn
completeness — for example an LLM with turn-completion markers, an
STT service with built-in turn detection, or a dedicated
end-of-turn classifier. Stop strategies that gate the
user-turn-stop event on an external completeness signal (e.g.
``LLMTurnCompletionUserTurnStopStrategy``) consume this frame to
finalize the turn. Producers should emit this frame only when they
judge the turn complete; an absence of this frame means the turn is
not yet considered complete.
"""
pass
@dataclass
class VADUserStartedSpeakingFrame(SystemFrame):
"""Frame emitted when VAD definitively detects user started speaking.

View File

@@ -14,6 +14,7 @@ including heartbeats, idle detection, and observer integration.
import asyncio
import importlib.util
import os
import warnings
from collections.abc import AsyncIterable, Iterable
from pathlib import Path
from typing import Any, TypeVar
@@ -193,6 +194,7 @@ class PipelineTask(BasePipelineTask):
*,
params: PipelineParams | None = None,
additional_span_attributes: dict | None = None,
app_resources: Any = None,
cancel_on_idle_timeout: bool = True,
cancel_timeout_secs: float = CANCEL_TIMEOUT_SECS,
check_dangling_tasks: bool = True,
@@ -216,6 +218,14 @@ class PipelineTask(BasePipelineTask):
params: Configuration parameters for the pipeline.
additional_span_attributes: Optional dictionary of attributes to propagate as
OpenTelemetry conversation span attributes.
app_resources: Optional application-defined bag of anything your
application code may want to share across this session (DB
handles, HTTP clients, etc.), passed by reference. Pipecat
passes it through untouched and exposes it on the task itself
as ``task.app_resources`` and passes it to tool handlers as
``FunctionCallParams.app_resources``. The framework never
copies or clears this object; the caller retains their handle
and can read any mutations after the task finishes.
cancel_on_idle_timeout: Whether the pipeline task should be cancelled if
the idle timeout is reached.
cancel_timeout_secs: Timeout (in seconds) to wait for cancellation to happen
@@ -235,13 +245,24 @@ class PipelineTask(BasePipelineTask):
rtvi_observer_params: The RTVI observer parameter to use if RTVI is enabled.
rtvi_processor: The RTVI processor to add if RTVI is enabled.
task_manager: Optional task manager for handling asyncio tasks.
tool_resources: Optional application-defined bag of resources (DB handles,
clients, state, etc.) passed by reference to every tool handler via
``FunctionCallParams.tool_resources``. The framework never copies or
clears this object; the caller retains their handle and can read any
mutations after the task finishes.
tool_resources: Deprecated alias for ``app_resources``.
.. deprecated:: 1.2.0
Use ``app_resources`` instead. ``tool_resources`` will be
removed in a future version.
"""
super().__init__()
if tool_resources is not None:
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"`PipelineTask(tool_resources=...)` is deprecated since 1.2.0, "
"use `app_resources` instead.",
DeprecationWarning,
stacklevel=2,
)
if app_resources is None:
app_resources = tool_resources
self._params = params or PipelineParams()
self._additional_span_attributes = additional_span_attributes or {}
self._cancel_on_idle_timeout = cancel_on_idle_timeout
@@ -252,7 +273,7 @@ class PipelineTask(BasePipelineTask):
self._enable_tracing = enable_tracing and is_tracing_available()
self._enable_turn_tracking = enable_turn_tracking
self._idle_timeout_secs = idle_timeout_secs
self._tool_resources = tool_resources
self._app_resources = app_resources
observers = observers or []
self._turn_tracking_observer: TurnTrackingObserver | None = None
self._user_bot_latency_observer: UserBotLatencyObserver | None = None
@@ -282,7 +303,7 @@ class PipelineTask(BasePipelineTask):
# This task maneger will handle all the asyncio tasks created by this
# PipelineTask and its frame processors.
self._task_manager = task_manager or TaskManager()
self._pipeline_task_manager = task_manager or TaskManager()
# This queue is the queue used to push frames to the pipeline.
self._push_queue = asyncio.Queue()
@@ -365,7 +386,7 @@ class PipelineTask(BasePipelineTask):
# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
# then just acts as a proxy.
self._observer = TaskObserver(observers=observers, task_manager=self._task_manager)
self._observer = TaskObserver(observers=observers)
# These events can be used to check which frames make it to the source
# or sink processors. Instead of calling the event handlers for every
@@ -391,6 +412,21 @@ class PipelineTask(BasePipelineTask):
"""
return self._params
@property
def app_resources(self) -> Any:
"""Get the application-defined resources passed to this task.
This is the same object passed to the constructor as
``app_resources``. Tool handlers can also access it via
``FunctionCallParams.app_resources``. The framework returns the
original reference; mutations are visible to all callers.
Returns:
The application-defined resources, or ``None`` if none were
passed.
"""
return self._app_resources
@property
def pipeline(self) -> BasePipeline:
"""Get the full pipeline managed by this pipeline task.
@@ -618,32 +654,24 @@ class PipelineTask(BasePipelineTask):
async def _create_tasks(self):
"""Create and start all pipeline processing tasks."""
self._process_push_task = self._task_manager.create_task(
self._process_push_queue(), f"{self}::_process_push_queue"
)
self._process_push_task = self.create_task(self._process_push_queue())
return self._process_push_task
def _maybe_start_heartbeat_tasks(self):
"""Start heartbeat tasks if heartbeats are enabled and not already running."""
if self._params.enable_heartbeats and self._heartbeat_push_task is None:
self._heartbeat_push_task = self._task_manager.create_task(
self._heartbeat_push_handler(), f"{self}::_heartbeat_push_handler"
)
self._heartbeat_monitor_task = self._task_manager.create_task(
self._heartbeat_monitor_handler(), f"{self}::_heartbeat_monitor_handler"
)
self._heartbeat_push_task = self.create_task(self._heartbeat_push_handler())
self._heartbeat_monitor_task = self.create_task(self._heartbeat_monitor_handler())
def _maybe_start_idle_task(self):
"""Start idle monitoring task if idle timeout is configured."""
if self._idle_timeout_secs:
self._idle_monitor_task = self._task_manager.create_task(
self._idle_monitor_handler(), f"{self}::_idle_monitor_handler"
)
self._idle_monitor_task = self.create_task(self._idle_monitor_handler())
async def _cancel_tasks(self):
"""Cancel all running pipeline tasks."""
if self._process_push_task:
await self._task_manager.cancel_task(self._process_push_task)
await self.cancel_task(self._process_push_task)
self._process_push_task = None
await self._maybe_cancel_heartbeat_tasks()
@@ -655,17 +683,17 @@ class PipelineTask(BasePipelineTask):
return
if self._heartbeat_push_task:
await self._task_manager.cancel_task(self._heartbeat_push_task)
await self.cancel_task(self._heartbeat_push_task)
self._heartbeat_push_task = None
if self._heartbeat_monitor_task:
await self._task_manager.cancel_task(self._heartbeat_monitor_task)
await self.cancel_task(self._heartbeat_monitor_task)
self._heartbeat_monitor_task = None
async def _maybe_cancel_idle_task(self):
"""Cancel idle monitoring task if it is running."""
if self._idle_monitor_task:
await self._task_manager.cancel_task(self._idle_monitor_task)
await self.cancel_task(self._idle_monitor_task)
self._idle_monitor_task = None
def _initial_metrics_frame(self) -> MetricsFrame:
@@ -723,14 +751,22 @@ class PipelineTask(BasePipelineTask):
async def _setup(self, params: PipelineTaskParams):
"""Set up the pipeline task and all processors."""
await super().setup(self._pipeline_task_manager)
mgr_params = TaskManagerParams(loop=params.loop)
self._task_manager.setup(mgr_params)
self.task_manager.setup(mgr_params)
setup = FrameProcessorSetup(
clock=self._clock,
task_manager=self._task_manager,
task_manager=self.task_manager,
observer=self._observer,
tool_resources=self._tool_resources,
pipeline_task=self,
# Populate the deprecated `tool_resources` field for backwards
# compatibility with custom FrameProcessor subclasses whose
# ``setup()`` overrides still read it. Reading the field emits a
# DeprecationWarning; new code should read
# ``setup.pipeline_task.app_resources`` instead.
tool_resources=self._app_resources,
)
await self._pipeline.setup(setup)
@@ -738,6 +774,7 @@ class PipelineTask(BasePipelineTask):
await self._load_setup_files()
# Start task observer.
await self._observer.setup(self.task_manager)
await self._observer.start()
async def _cleanup(self, cleanup_pipeline: bool):
@@ -977,7 +1014,7 @@ class PipelineTask(BasePipelineTask):
def _print_dangling_tasks(self):
"""Log any dangling tasks that haven't been properly cleaned up."""
tasks = [t.get_name() for t in self._task_manager.current_tasks()]
tasks = [t.get_name() for t in self.task_manager.current_tasks()]
if tasks:
logger.warning(f"{self} dangling tasks detected: {tasks}")

View File

@@ -17,7 +17,6 @@ from typing import Any
from attr import dataclass
from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@dataclass
@@ -62,19 +61,16 @@ class TaskObserver(BaseObserver):
self,
*,
observers: list[BaseObserver] | None = None,
task_manager: BaseTaskManager,
**kwargs,
):
"""Initialize the TaskObserver.
Args:
observers: List of observers to manage. Defaults to empty list.
task_manager: Task manager for creating and managing observer tasks.
**kwargs: Additional arguments passed to the base observer.
"""
super().__init__(**kwargs)
self._observers = observers or []
self._task_manager = task_manager
self._proxies: dict[BaseObserver, Proxy] | None = (
None # Becomes a dict after start() is called
)
@@ -106,7 +102,7 @@ class TaskObserver(BaseObserver):
# Remove the proxy so it doesn't get called anymore.
del self._proxies[observer]
# Cancel the proxy task right away.
await self._task_manager.cancel_task(proxy.task)
await self.cancel_task(proxy.task)
# Remove the observer from the list.
if observer in self._observers:
@@ -122,7 +118,7 @@ class TaskObserver(BaseObserver):
return
for proxy in self._proxies.values():
await self._task_manager.cancel_task(proxy.task)
await self.cancel_task(proxy.task)
async def cleanup(self):
"""Cleanup all proxy observers."""
@@ -157,9 +153,8 @@ class TaskObserver(BaseObserver):
def _create_proxy(self, observer: BaseObserver) -> Proxy:
"""Create a proxy for a single observer."""
queue = asyncio.Queue()
task = self._task_manager.create_task(
self._proxy_task_handler(queue, observer),
f"TaskObserver::{observer}::_proxy_task_handler",
task = self.create_task(
self._proxy_task_handler(queue, observer), f"{observer}::_proxy_task_handler"
)
proxy = Proxy(queue=queue, task=task, observer=observer)
return proxy

View File

@@ -0,0 +1,286 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Helpers for the async-tool message protocol used in LLM contexts.
When a function is registered with ``cancel_on_interruption=False``, the
``LLMUserContextAggregator`` / ``LLMAssistantContextAggregator`` pair appends
async-tool messages to the conversation context as the underlying task
progresses:
- A ``started`` message (``role="tool"``) is appended immediately when the
tool starts running.
- An ``intermediate`` message (``role="developer"``) is appended each time an
intermediate result is reported via
``result_callback(..., FunctionCallResultProperties(is_final=False))``.
- A ``final`` message (``role="developer"``) is appended when the task
finishes.
This module is the single source of truth for the on-the-wire payload shape:
- The aggregator uses the ``build_*_message`` functions when injecting messages.
- Realtime LLM services use ``parse_message`` to detect async-tool messages
while iterating the context, then read ``payload.result`` and deliver it via
their formal tool-result channel.
Internally, ``AsyncToolMessagePayload`` is the canonical structured form;
the on-the-wire JSON string is always derived from it (never stored) so the
two representations can't drift.
Consumers are expected to import the module rather than its individual
functions, e.g.::
from pipecat.processors.aggregators import async_tool_messages
...
async_tool_messages.build_started_message(tool_call_id)
async_tool_messages.parse_message(msg)
"""
import json
from dataclasses import dataclass
from typing import Any, Literal
from pipecat.processors.aggregators.llm_context import LLMStandardMessage
AsyncToolMessageKind = Literal["started", "intermediate", "final"]
# --- Payload shape (private; canonical source of truth) ---------------------
# The ``type`` field that identifies an async-tool message payload. Both the
# builders and the parser use this constant; do not duplicate the literal.
_PAYLOAD_TYPE = "async_tool"
# Status value for started / intermediate messages (task still running).
_STATUS_RUNNING = "running"
# Status value for the final message (task complete).
_STATUS_FINISHED = "finished"
# Description shipped on the started message. The text is intentionally
# self-explanatory so a model reading the context can tell what's about to
# happen even without out-of-band knowledge of the protocol.
_STARTED_DESCRIPTION = (
"An asynchronous task associated with this tool_call_id has started "
"running. Expect results to arrive later as developer messages that look "
"roughly like this one (with 'type=async_tool' and a matching tool_call_id) "
"but with a 'result' field. Note that there *may* be more than one result "
"(i.e., a stream of results), but there doesn't have to be (there may be "
"only one). The last result will come in a message with 'status=finished'."
)
# Description shipped on each intermediate-result message.
_INTERMEDIATE_DESCRIPTION = (
"This is an intermediate result for the asynchronous task associated with "
"this tool_call_id. The task is still running. More intermediate results "
"may follow, or the next result may be the final one with "
"'status=finished'."
)
# Description shipped on the final-result message.
_FINAL_DESCRIPTION = (
"This is the final result for the asynchronous task associated with this "
"tool_call_id. The task has completed. No further results will arrive for "
"this tool_call_id."
)
@dataclass(frozen=True)
class AsyncToolMessagePayload:
"""The structured contents of an async-tool message in an LLM context.
Parameters:
kind: Which of the three async-tool message stages this is.
tool_call_id: The id of the tool invocation this payload relates to.
status: ``"running"`` for started/intermediate, ``"finished"`` for
the final message.
description: Human-readable description from the payload. May be empty.
result: For ``intermediate`` and ``final`` messages, the JSON-encoded
result string (or the literal ``"COMPLETED"`` if the function
returned no value). ``None`` for ``started`` messages.
"""
kind: AsyncToolMessageKind
tool_call_id: str
status: Literal["running", "finished"]
description: str
result: str | None
# --- Internal: payload ↔ on-the-wire forms -----------------------------------
def _payload_to_json(payload: AsyncToolMessagePayload) -> str:
"""Serialize a payload to its on-the-wire JSON string form.
Fields that don't apply to the payload's kind are omitted (notably
``result`` is left out of ``started`` payloads, since the task hasn't
produced a result yet).
"""
obj: dict[str, Any] = {
"type": _PAYLOAD_TYPE,
"status": payload.status,
"tool_call_id": payload.tool_call_id,
"description": payload.description,
}
if payload.result is not None:
obj["result"] = payload.result
return json.dumps(obj)
def _payload_to_message(payload: AsyncToolMessagePayload) -> LLMStandardMessage:
"""Wrap a payload in the LLM context message shape that matches its kind.
- ``started``: ``role="tool"`` plus ``tool_call_id`` at the top level
(so the message can sit alongside other regular tool-result messages).
- ``intermediate`` / ``final``: ``role="developer"``; ``tool_call_id``
lives only inside the JSON payload.
"""
content = _payload_to_json(payload)
if payload.kind == "started":
return {
"role": "tool",
"content": content,
"tool_call_id": payload.tool_call_id,
}
return {
"role": "developer",
"content": content,
}
# --- Builders ----------------------------------------------------------------
def build_started_message(tool_call_id: str) -> LLMStandardMessage:
"""Build a ``started`` async-tool message for an LLM context.
Append the returned message to the LLM context immediately when an async
function call (registered with ``cancel_on_interruption=False``) starts
running. The message lets the model know a task is in flight and that its
results will arrive later in subsequent ``developer``-role messages.
Args:
tool_call_id: The id of the tool invocation this message is for.
Returns:
A message ready to pass to ``LLMContext.add_message``.
"""
return _payload_to_message(
AsyncToolMessagePayload(
kind="started",
tool_call_id=tool_call_id,
status=_STATUS_RUNNING,
description=_STARTED_DESCRIPTION,
result=None,
)
)
def build_intermediate_result_message(tool_call_id: str, result: str) -> LLMStandardMessage:
"""Build an intermediate-result async-tool message for an LLM context.
Append the returned message to the LLM context each time the running async
function reports a non-final result via
``result_callback(..., FunctionCallResultProperties(is_final=False))``.
Args:
tool_call_id: The id of the tool invocation the result is for.
result: The JSON-encoded result string (caller is responsible for
encoding the function's actual return value, typically via
``json.dumps``).
Returns:
A message ready to pass to ``LLMContext.add_message``.
"""
return _payload_to_message(
AsyncToolMessagePayload(
kind="intermediate",
tool_call_id=tool_call_id,
status=_STATUS_RUNNING,
description=_INTERMEDIATE_DESCRIPTION,
result=result,
)
)
def build_final_result_message(tool_call_id: str, result: str) -> LLMStandardMessage:
"""Build a final-result async-tool message for an LLM context.
Append the returned message to the LLM context when the async function
finishes. After this message no further async-tool messages will arrive
for this ``tool_call_id``.
Args:
tool_call_id: The id of the tool invocation the result is for.
result: The JSON-encoded result string, or the literal ``"COMPLETED"``
sentinel when the function returned ``None`` (matching the same
convention used for synchronous tool calls).
Returns:
A message ready to pass to ``LLMContext.add_message``.
"""
return _payload_to_message(
AsyncToolMessagePayload(
kind="final",
tool_call_id=tool_call_id,
status=_STATUS_FINISHED,
description=_FINAL_DESCRIPTION,
result=result,
)
)
# --- Parsing -----------------------------------------------------------------
def parse_message(message: LLMStandardMessage) -> AsyncToolMessagePayload | None:
"""Decode an async-tool message payload, or return None if not async-tool.
Args:
message: A standard message from the LLM context. Callers iterating
over ``LLMContext.get_messages()`` should filter out
``LLMSpecificMessage`` entries first; only ``LLMStandardMessage``
values can carry async-tool payloads.
Returns:
An ``AsyncToolMessagePayload`` if the message is a recognized
async-tool payload, otherwise ``None``.
"""
role = message.get("role")
if role not in ("tool", "developer"):
return None
content = message.get("content")
if not isinstance(content, str):
return None
try:
payload = json.loads(content)
except (json.JSONDecodeError, ValueError):
return None
if not isinstance(payload, dict) or payload.get("type") != _PAYLOAD_TYPE:
return None
tool_call_id = payload.get("tool_call_id")
status = payload.get("status")
if not isinstance(tool_call_id, str) or status not in (_STATUS_RUNNING, _STATUS_FINISHED):
return None
description = payload.get("description", "")
if not isinstance(description, str):
description = ""
result = payload.get("result")
if result is not None and not isinstance(result, str):
result = None
if result is None:
kind: AsyncToolMessageKind = "started"
elif status == _STATUS_FINISHED:
kind = "final"
else:
kind = "intermediate"
return AsyncToolMessagePayload(
kind=kind,
tool_call_id=tool_call_id,
status=status,
description=description,
result=result,
)

View File

@@ -21,7 +21,7 @@ import io
import wave
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, TypeAlias, TypeGuard, TypeVar
from typing import Any, TypeAlias, TypeGuard, TypeVar, cast
from loguru import logger
from openai._types import NOT_GIVEN as OPEN_AI_NOT_GIVEN
@@ -129,13 +129,13 @@ class LLMContext:
url: The URL of the image.
text: Optional text to include with the image.
"""
content = []
content: list[dict[str, Any]] = []
if text:
content.append({"type": "text", "text": text})
content.append({"type": "image_url", "image_url": {"url": url}})
return {"role": role, "content": content}
return cast(LLMContextMessage, {"role": role, "content": content})
@staticmethod
async def create_image_message(
@@ -187,7 +187,7 @@ class LLMContext:
audio_frames: List of audio frame objects to include.
text: Optional text to include with the audio.
"""
content = [{"type": "text", "text": text}]
content: list[dict[str, Any]] = [{"type": "text", "text": text}]
def encode_audio():
sample_rate = audio_frames[0].sample_rate
@@ -214,7 +214,7 @@ class LLMContext:
}
)
return {"role": role, "content": content}
return cast(LLMContextMessage, {"role": role, "content": content})
@property
def messages(self) -> list[LLMContextMessage]:
@@ -295,7 +295,10 @@ class LLMContext:
result.append(msg_copy)
continue
msg = copy.deepcopy(message)
# The standard message variant is a union of TypedDicts; the
# mutations below operate on plain dicts at runtime. Treat as
# such for the duration of the redaction loop.
msg: dict[str, Any] = cast(dict[str, Any], copy.deepcopy(message))
content = msg.get("content")
if isinstance(content, list):
for item in content:

View File

@@ -44,6 +44,7 @@ from pipecat.frames.frames import (
LLMContextSummaryRequestFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMarkerFrame,
LLMMessagesAppendFrame,
LLMMessagesTransformFrame,
LLMMessagesUpdateFrame,
@@ -53,7 +54,6 @@ from pipecat.frames.frames import (
LLMThoughtEndFrame,
LLMThoughtStartFrame,
LLMThoughtTextFrame,
LLMUpdateSettingsFrame,
StartFrame,
TextFrame,
TranscriptionFrame,
@@ -67,25 +67,29 @@ from pipecat.frames.frames import (
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators import async_tool_messages
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextMessage,
LLMSpecificMessage,
NotGiven,
is_given,
)
from pipecat.processors.aggregators.llm_context_summarizer import (
LLMContextSummarizer,
SummaryAppliedEvent,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.settings import LLMSettings
from pipecat.turns.user_idle_controller import UserIdleController
from pipecat.turns.user_mute import BaseUserMuteStrategy
from pipecat.turns.user_start import BaseUserTurnStartStrategy, UserTurnStartedParams
from pipecat.turns.user_stop import BaseUserTurnStopStrategy, UserTurnStoppedParams
from pipecat.turns.user_turn_completion_mixin import UserTurnCompletionConfig
from pipecat.turns.user_turn_controller import UserTurnController
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from pipecat.turns.user_turn_strategies import (
FilterIncompleteUserTurnStrategies,
UserTurnStrategies,
)
from pipecat.utils.context.llm_context_summarization import (
LLMAutoContextSummarizationConfig,
LLMContextSummarizationConfig,
@@ -99,6 +103,21 @@ class LLMUserAggregatorParams:
"""Parameters for configuring LLM user aggregation behavior.
Parameters:
add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the
aggregator computes the diff against the currently advertised tools
and appends a developer-role message to the context describing
additions/removals. Helps the LLM stay coherent across
mid-conversation tool changes, mitigating several flavors of
tool-call-related hallucination: calling tools that have been
removed, avoiding tools that have been re-added, and hallucinating
output (made-up answers or tool-call-shaped non-tool-calls) when
tools are unavailable. Only standard tools are diffed; custom
(LLM-specific) tools are ignored. When using
``LLMContextAggregatorPair``, prefer setting this via its
``add_tool_change_messages`` argument instead. Defaults to False.
audio_idle_timeout: Timeout in seconds to force speech stop when
no audio frames are received while in SPEAKING state (e.g. user mutes
mic mid-speech). Set to 0 to disable. Defaults to 1.0.
user_turn_strategies: User turn start and stop strategies.
user_mute_strategies: List of user mute strategies.
user_turn_stop_timeout: Time in seconds to wait before considering the
@@ -108,27 +127,64 @@ class LLMUserAggregatorParams:
has been idle (not speaking) for this duration. Set to 0 to disable
idle detection.
vad_analyzer: Voice Activity Detection analyzer instance.
audio_idle_timeout: Timeout in seconds to force speech stop when
no audio frames are received while in SPEAKING state (e.g. user mutes
mic mid-speech). Set to 0 to disable. Defaults to 1.0.
filter_incomplete_user_turns: Whether to filter out incomplete user turns.
When enabled, the LLM outputs a turn completion marker at the start of
each response: ✓ (complete), ○ (incomplete short), or ◐ (incomplete long).
Incomplete responses are suppressed and timeouts trigger re-prompting.
user_turn_completion_config: Configuration for turn completion behavior including
custom instructions, timeouts, and prompts. Only used when
filter_incomplete_user_turns is True.
filter_incomplete_user_turns: [DEPRECATED] Use
``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
instead. When enabled, the LLM outputs a turn-completion
marker at the start of each response: ✓ (complete), ○
(incomplete short), or ◐ (incomplete long). Incomplete
responses are suppressed and timeouts trigger re-prompting.
.. deprecated:: 1.2.0
Use ``user_turn_strategies=FilterIncompleteUserTurnStrategies()``
instead. Will be removed in version 2.0.0.
user_turn_completion_config: [DEPRECATED] Configuration for turn
completion behavior including custom instructions, timeouts, and
prompts. Only used when filter_incomplete_user_turns is True
(deprecated path) — for the new strategy-based API, pass the config
directly to ``FilterIncompleteUserTurnStrategies(config=...)``.
.. deprecated:: 1.2.0
Pass the config directly to
``FilterIncompleteUserTurnStrategies(config=...)`` instead.
Will be removed in version 2.0.0.
"""
add_tool_change_messages: bool = False
audio_idle_timeout: float = 1.0
user_turn_strategies: UserTurnStrategies | None = None
user_mute_strategies: list[BaseUserMuteStrategy] = field(default_factory=list)
user_turn_stop_timeout: float = 5.0
user_idle_timeout: float = 0
vad_analyzer: VADAnalyzer | None = None
audio_idle_timeout: float = 1.0
filter_incomplete_user_turns: bool = False
user_turn_completion_config: UserTurnCompletionConfig | None = None
def __post_init__(self):
if self.filter_incomplete_user_turns:
warnings.warn(
"LLMUserAggregatorParams.filter_incomplete_user_turns is deprecated. "
"Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.",
DeprecationWarning,
stacklevel=2,
)
if self.user_turn_completion_config:
warnings.warn(
"LLMUserAggregatorParams.user_turn_completion_config is deprecated. "
"Use user_turn_strategies=FilterIncompleteUserTurnStrategies() instead.",
DeprecationWarning,
stacklevel=2,
)
if self.user_turn_completion_config is not None:
warnings.warn(
"LLMUserAggregatorParams.user_turn_completion_config is deprecated. "
"Pass the config directly to "
"FilterIncompleteUserTurnStrategies(config=...) instead.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class LLMAssistantAggregatorParams:
@@ -143,14 +199,32 @@ class LLMAssistantAggregatorParams:
summarization. Controls trigger thresholds, message preservation, and
summarization prompts. If None, uses default
``LLMAutoContextSummarizationConfig`` values.
add_tool_change_messages: When True, on each ``LLMSetToolsFrame`` the
aggregator computes the diff against the currently advertised tools
and appends a developer-role message to the context describing
additions/removals. Helps the LLM stay coherent across
mid-conversation tool changes, mitigating several flavors of
tool-call-related hallucination: calling tools that have been
removed, avoiding tools that have been re-added, and hallucinating
output (made-up answers or tool-call-shaped non-tool-calls) when
tools are unavailable. Only standard tools are diffed; custom
(LLM-specific) tools are ignored. When using
``LLMContextAggregatorPair``, prefer setting this via its
``add_tool_change_messages`` argument instead. Defaults to False.
"""
enable_auto_context_summarization: bool = False
auto_context_summarization_config: LLMAutoContextSummarizationConfig | None = None
add_tool_change_messages: bool = False
# ---------------------------------------------------------------------------
# Deprecated field names — kept for backward compatibility.
# Use enable_auto_context_summarization and auto_context_summarization_config instead.
#
# .. deprecated:: 1.2.0
# Use ``enable_auto_context_summarization`` and
# ``auto_context_summarization_config`` instead. Will be removed in
# version 2.0.0.
# ---------------------------------------------------------------------------
enable_context_summarization: bool | None = None
context_summarization_config: LLMContextSummarizationConfig | None = None
@@ -248,20 +322,87 @@ class LLMContextAggregator(FrameProcessor):
common functionality for context-based conversation management.
"""
def __init__(self, *, context: LLMContext, role: str, **kwargs):
# Developer-role messages appended to the context when tools are added/
# removed via ``LLMSetToolsFrame`` (only when ``add_tool_change_messages``
# is enabled on the aggregator's params). ``{function_names}`` is
# substituted with a sorted, comma-separated, backtick-wrapped list.
TOOL_ACTIVATION_MESSAGE_TEMPLATE = (
"The following function(s) have just been added and may now be called: "
"{function_names}. Any previously available functions remain available."
)
TOOL_DEACTIVATION_MESSAGE_TEMPLATE = (
"The following function(s) have just been removed and should not be called: "
"{function_names}. Any previously available functions remain available. "
"The removed function(s) may become available again later, in which case "
"you will be informed."
)
def __init__(
self,
*,
context: LLMContext,
role: str,
add_tool_change_messages: bool = False,
**kwargs,
):
"""Initialize the context response aggregator.
Args:
context: The LLM context to use for conversation storage.
role: The role this aggregator represents (e.g. "user", "assistant").
add_tool_change_messages: See the field of the same name on the
aggregator-specific params dataclasses. Subclasses propagate
this from their ``params``.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(**kwargs)
self._context = context
self._role = role
self._add_tool_change_messages = add_tool_change_messages
self._aggregation: list[TextPartForConcatenation] = []
def _maybe_add_tool_change_messages(self, new_tools: ToolsSchema | NotGiven) -> None:
"""Append a developer message describing tool add/remove deltas.
No-op unless ``add_tool_change_messages`` was enabled on the aggregator,
and no-op when the diff against the currently advertised tools is empty.
Custom (LLM-specific) tools are ignored — only standard tools are diffed.
Both aggregators call this on every ``LLMSetToolsFrame`` they handle.
Whichever aggregator handles the frame first computes a real diff
against the shared context and adds the announcement; by the time
the other aggregator sees it (if at all), the context already
reflects the new tools, so its diff is empty and no duplicate
message is added. This is order-independent: it works whether the
frame flows downstream (user aggregator first) or upstream
(assistant aggregator first, and consumed without being forwarded).
"""
if not self._add_tool_change_messages:
return
def _names(tools: ToolsSchema | NotGiven) -> set[str]:
if not is_given(tools):
return set()
return {s.name for s in tools.standard_tools}
old_names = _names(self._context.tools)
new_names = _names(new_tools)
added = new_names - old_names
removed = old_names - new_names
if not added and not removed:
return
parts: list[str] = []
if added:
names = ", ".join(f"`{n}`" for n in sorted(added))
parts.append(self.TOOL_ACTIVATION_MESSAGE_TEMPLATE.format(function_names=names))
if removed:
names = ", ".join(f"`{n}`" for n in sorted(removed))
parts.append(self.TOOL_DEACTIVATION_MESSAGE_TEMPLATE.format(function_names=names))
self._context.add_message({"role": "developer", "content": " ".join(parts)})
@property
def messages(self) -> list[LLMContextMessage]:
"""Get messages from the LLM context.
@@ -434,20 +575,46 @@ class LLMUserAggregator(LLMContextAggregator):
params: Configuration parameters for aggregation behavior.
**kwargs: Additional arguments.
"""
super().__init__(context=context, role="user", **kwargs)
self._params = params or LLMUserAggregatorParams()
params = params or LLMUserAggregatorParams()
super().__init__(
context=context,
role="user",
add_tool_change_messages=params.add_tool_change_messages,
**kwargs,
)
self._params = params
self._register_event_handler("on_user_turn_started")
self._register_event_handler("on_user_turn_stopped")
self._register_event_handler("on_user_turn_stop_timeout")
self._register_event_handler("on_user_turn_idle")
self._register_event_handler("on_user_turn_inference_triggered")
self._register_event_handler("on_user_mute_started")
self._register_event_handler("on_user_mute_stopped")
user_turn_strategies = self._params.user_turn_strategies or UserTurnStrategies()
# Deprecated path: translate filter_incomplete_user_turns into
# the equivalent FilterIncompleteUserTurnStrategies wiring. The
# DeprecationWarning is emitted in LLMUserAggregatorParams.__post_init__.
if self._params.filter_incomplete_user_turns:
user_turn_strategies = FilterIncompleteUserTurnStrategies(
start=user_turn_strategies.start,
stop=user_turn_strategies.stop,
config=self._params.user_turn_completion_config,
)
self._params.user_turn_strategies = user_turn_strategies
self._user_is_muted = False
self._user_turn_start_timestamp = ""
# Full transcript across the user turn. Each
# `_on_user_turn_inference_triggered` push captures only the
# new segment since the previous push (push_aggregation resets
# `_aggregation` after writing to context); we accumulate those
# segments here so the eventual `on_user_turn_stopped` event
# surfaces the full turn transcript even when several
# inferences fire before finalization.
self._full_user_turn_aggregation: str | None = None
self._user_turn_controller = UserTurnController(
user_turn_strategies=user_turn_strategies,
@@ -458,6 +625,9 @@ class LLMUserAggregator(LLMContextAggregator):
self._user_turn_controller.add_event_handler(
"on_user_turn_started", self._on_user_turn_started
)
self._user_turn_controller.add_event_handler(
"on_user_turn_inference_triggered", self._on_user_turn_inference_triggered
)
self._user_turn_controller.add_event_handler(
"on_user_turn_stopped", self._on_user_turn_stopped
)
@@ -536,6 +706,7 @@ class LLMUserAggregator(LLMContextAggregator):
elif isinstance(frame, LLMMessagesTransformFrame):
await self._handle_llm_messages_transform(frame)
elif isinstance(frame, LLMSetToolsFrame):
self._maybe_add_tool_change_messages(frame.tools)
self.set_tools(frame.tools)
# Push the LLMSetToolsFrame as well, since speech-to-speech LLM
# services (like OpenAI Realtime) may need to know about tool
@@ -575,21 +746,6 @@ class LLMUserAggregator(LLMContextAggregator):
for s in self._params.user_mute_strategies:
await s.setup(self.task_manager)
# Enable incomplete turn filtering on the LLM if configured
if self._params.filter_incomplete_user_turns:
# Get config or use defaults
config = self._params.user_turn_completion_config or UserTurnCompletionConfig()
# Enable the feature on the LLM with config
await self.push_frame(
LLMUpdateSettingsFrame(
delta=LLMSettings(
filter_incomplete_user_turns=True,
user_turn_completion_config=config,
)
)
)
async def _stop(self, frame: EndFrame):
await self._maybe_emit_user_turn_stopped(on_session_end=True)
await self._cleanup()
@@ -729,6 +885,7 @@ class LLMUserAggregator(LLMContextAggregator):
logger.debug(f"{self}: User started speaking (strategy: {strategy})")
self._user_turn_start_timestamp = time_now_iso8601()
self._full_user_turn_aggregation = None
if params.enable_user_speaking_frames:
await self.broadcast_frame(UserStartedSpeakingFrame)
@@ -740,6 +897,30 @@ class LLMUserAggregator(LLMContextAggregator):
await self._call_event_handler("on_user_turn_started", strategy)
async def _on_user_turn_inference_triggered(
self,
controller: UserTurnController,
strategy: BaseUserTurnStopStrategy,
):
logger.debug(f"{self}: User turn inference triggered (strategy: {strategy})")
# Push aggregation now: this writes the user message segment to
# the context and emits LLMContextFrame, which kicks LLM
# inference. Concatenate the segment into
# `_full_user_turn_aggregation` so multiple inferences in the
# same turn don't lose earlier segments from the eventual
# `on_user_turn_stopped` event.
segment = await self.push_aggregation()
if segment:
if self._full_user_turn_aggregation:
self._full_user_turn_aggregation = (
f"{self._full_user_turn_aggregation} {segment}".strip()
)
else:
self._full_user_turn_aggregation = segment
await self._call_event_handler("on_user_turn_inference_triggered", strategy)
async def _on_user_turn_stopped(
self,
controller: UserTurnController,
@@ -774,15 +955,29 @@ class LLMUserAggregator(LLMContextAggregator):
):
"""Maybe emit user turn stopped event.
Earlier inference triggers in the same turn have already pushed
their segments to the context and accumulated them into
``self._full_user_turn_aggregation``. Any aggregation that
arrived after the last inference trigger is flushed here so
end-of-turn content is never lost from the public event.
Args:
strategy: The strategy that triggered the turn stop.
on_session_end: If True, only emit if there's unemitted content
(avoids duplicate events when session ends).
"""
aggregation = await self.push_aggregation()
if not on_session_end or aggregation:
segment = await self.push_aggregation()
full_aggregation = self._full_user_turn_aggregation
self._full_user_turn_aggregation = None
if segment and full_aggregation:
content = f"{full_aggregation} {segment}".strip()
else:
content = full_aggregation or segment
if not on_session_end or content:
message = UserTurnStoppedMessage(
content=aggregation, timestamp=self._user_turn_start_timestamp
content=content, timestamp=self._user_turn_start_timestamp
)
await self._call_event_handler("on_user_turn_stopped", strategy, message)
self._user_turn_start_timestamp = ""
@@ -843,8 +1038,14 @@ class LLMAssistantAggregator(LLMContextAggregator):
params: Configuration parameters for aggregation behavior.
**kwargs: Additional arguments.
"""
super().__init__(context=context, role="assistant", **kwargs)
self._params = params or LLMAssistantAggregatorParams()
params = params or LLMAssistantAggregatorParams()
super().__init__(
context=context,
role="assistant",
add_tool_change_messages=params.add_tool_change_messages,
**kwargs,
)
self._params = params
self._function_calls_in_progress: dict[str, FunctionCallInProgressFrame | None] = {}
self._function_calls_image_results: dict[str, UserImageRawFrame] = {}
@@ -927,13 +1128,15 @@ class LLMAssistantAggregator(LLMContextAggregator):
await self._handle_end_or_cancel(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, LLMAssistantPushAggregationFrame):
await self.push_aggregation()
await self._handle_push_aggregation()
elif isinstance(frame, LLMFullResponseStartFrame):
await self._handle_llm_start(frame)
elif isinstance(frame, LLMFullResponseEndFrame):
await self._handle_llm_end(frame)
elif isinstance(frame, TextFrame):
await self._handle_text(frame)
elif isinstance(frame, LLMMarkerFrame):
await self._handle_marker_frame(frame)
elif isinstance(frame, LLMThoughtStartFrame):
await self._handle_thought_start(frame)
elif isinstance(frame, LLMThoughtTextFrame):
@@ -949,6 +1152,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
elif isinstance(frame, LLMMessagesTransformFrame):
await self._handle_llm_messages_transform(frame)
elif isinstance(frame, LLMSetToolsFrame):
self._maybe_add_tool_change_messages(frame.tools)
self.set_tools(frame.tools)
elif isinstance(frame, LLMSetToolChoiceFrame):
self.set_tool_choice(frame.tool_choice)
@@ -1075,23 +1279,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
is_async = not frame.cancel_on_interruption
if is_async:
self._context.add_message(
{
"role": "tool",
"content": json.dumps(
{
"type": "async_tool",
"status": "running",
"tool_call_id": frame.tool_call_id,
"description": "An asynchronous task associated with this tool_call_id has started running. "
+ "Expect results to arrive later as developer messages that look roughly like this one (with 'type=async_tool' and a matching tool_call_id) but with a 'result' field. "
+ "Note that there *may* be more than one result (i.e., a stream of results), but there doesn't have to be (there may be only one). "
+ "The last result will come in a message with 'status=finished'.",
}
),
"tool_call_id": frame.tool_call_id,
}
)
self._context.add_message(async_tool_messages.build_started_message(frame.tool_call_id))
else:
self._context.add_message(
{
@@ -1204,19 +1392,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
result = json.dumps(frame.result, ensure_ascii=False)
self._context.add_message(
{
"role": "developer",
"content": json.dumps(
{
"type": "async_tool",
"tool_call_id": frame.tool_call_id,
"status": "running",
"description": "This is an intermediate result for the asynchronous task associated with this tool_call_id. "
+ "The task is still running. More intermediate results may follow, or the next result may be the final one with 'status=finished'.",
"result": result,
}
),
}
async_tool_messages.build_intermediate_result_message(frame.tool_call_id, result)
)
async def _handle_function_call_finished(
@@ -1237,19 +1413,7 @@ class LLMAssistantAggregator(LLMContextAggregator):
# notified of the completed result instead of updating the IN_PROGRESS
# tool message.
self._context.add_message(
{
"role": "developer",
"content": json.dumps(
{
"type": "async_tool",
"tool_call_id": frame.tool_call_id,
"status": "finished",
"description": "This is the final result for the asynchronous task associated with this tool_call_id. "
+ "The task has completed. No further results will arrive for this tool_call_id.",
"result": result,
}
),
}
async_tool_messages.build_final_result_message(frame.tool_call_id, result)
)
else:
self._update_function_call_result(frame.function_name, frame.tool_call_id, result)
@@ -1309,6 +1473,17 @@ class LLMAssistantAggregator(LLMContextAggregator):
async def _handle_llm_end(self, _: LLMFullResponseEndFrame):
await self._trigger_assistant_turn_stopped()
async def _handle_push_aggregation(self):
# LLMAssistantPushAggregationFrame is emitted by TTSService at the end
# of a TTSSpeakFrame-driven utterance (no surrounding LLM response
# cycle), so no LLMFullResponseStartFrame ever set the turn-start
# timestamp. Open a turn now so on_assistant_turn_stopped fires for the
# greeting text the same way it did before LLMAssistantPushAggregationFrame
# was introduced.
if not self._assistant_turn_start_timestamp:
await self._trigger_assistant_turn_started()
await self._trigger_assistant_turn_stopped()
async def _handle_text(self, frame: TextFrame):
# Skip TextFrame types not intended to build the assistant context
if isinstance(frame, (TranscriptionFrame, TranslationFrame, InterimTranscriptionFrame)):
@@ -1327,6 +1502,31 @@ class LLMAssistantAggregator(LLMContextAggregator):
)
)
async def _handle_marker_frame(self, frame: LLMMarkerFrame):
if frame.append_to_context_immediately:
# Stand-alone marker: write it to the context now as its
# own assistant message. Used when the marker is the entire
# assistant turn — e.g. the ○ / ◐ incomplete-turn signals,
# where the spoken response is suppressed and the marker
# is the only artifact.
self._context.add_message({"role": "assistant", "content": frame.marker})
await self.push_context_frame()
timestamp_frame = LLMContextAssistantTimestampFrame(timestamp=time_now_iso8601())
await self.push_frame(timestamp_frame)
return
# Marker is part of an in-progress assistant response. Append
# it to the running aggregation so `push_aggregation` writes
# marker + text as a single context message — e.g. the ✓
# complete-turn signal that prefixes the spoken response,
# producing "✓ <response>" in context. Markers are stripped
# from the transcript via
# `_maybe_strip_turn_completion_markers` so consumers see
# clean text.
self._aggregation.append(
TextPartForConcatenation(frame.marker, includes_inter_part_spaces=False)
)
async def _handle_thought_start(self, frame: LLMThoughtStartFrame):
await self._reset_thought_aggregation()
self._thought_append_to_context = frame.append_to_context
@@ -1478,6 +1678,7 @@ class LLMContextAggregatorPair:
*,
user_params: LLMUserAggregatorParams | None = None,
assistant_params: LLMAssistantAggregatorParams | None = None,
add_tool_change_messages: bool | None = None,
):
"""Initialize the LLM context aggregator pair.
@@ -1485,9 +1686,22 @@ class LLMContextAggregatorPair:
context: The context to be managed by the aggregators.
user_params: Parameters for the user context aggregator.
assistant_params: Parameters for the assistant context aggregator.
add_tool_change_messages: When provided, sets the field of the
same name on both ``user_params`` and ``assistant_params``,
overriding any value already set on either. This is the
preferred way to enable tool-change announcements: it ensures
both aggregators participate, which makes the feature robust
regardless of which aggregator handles a given
``LLMSetToolsFrame``. The shared context guarantees the
announcement is added exactly once (the second aggregator's
diff is empty by the time it sees the frame). Leave as
``None`` to respect per-params settings.
"""
user_params = user_params or LLMUserAggregatorParams()
assistant_params = assistant_params or LLMAssistantAggregatorParams()
if add_tool_change_messages is not None:
user_params.add_tool_change_messages = add_tool_change_messages
assistant_params.add_tool_change_messages = add_tool_change_messages
self._user = LLMUserAggregator(context, params=user_params)
self._assistant = LLMAssistantAggregator(context, params=assistant_params)

Some files were not shown because too many files have changed in this diff Show More