Compare commits

...

167 Commits

Author SHA1 Message Date
Paul Kompfner
3fee91ddec Drop redundant changelog entry for OpenAI Realtime example
The OpenAI Realtime story didn't add any service-level code — just a
new example. The original 4480.added.md entry already describes the
feature as "a realtime service like Gemini Live," which generalizes
to OpenAI Realtime.
2026-05-18 12:06:48 -04:00
Paul Kompfner
638294c1cc Add realtime-openai-local-vad example
Mirrors the Gemini Live local-VAD example for OpenAI Realtime, showing
that `wait_for_transcript_to_end_user_turn=False` composes cleanly
with `turn_detection=False`. The OpenAI Realtime service already wires
`UserStoppedSpeakingFrame` to `input_audio_buffer.commit` +
`response.create` when `turn_detection=False`, so the example is the
only new code needed.
2026-05-18 11:50:16 -04:00
Paul Kompfner
ea96b7aec7 Rename transcript-gather to post-turn transcript wait
Switch the vocabulary for the timer-driven phase that runs when
`wait_for_transcript_to_end_user_turn=False`. "Transcript gather" was
too vague to be self-documenting; "post-turn transcript wait" names
when it happens (after the user turn ends) and what it's for (waiting
for late-arriving transcripts).

Renames the internal property to `_wait_for_post_turn_transcripts`
and the supporting state/method names to match
(`_post_turn_transcript_wait_task`, `_complete_post_turn_transcript_wait`,
etc.). Updates docstrings, comments, log messages, the example
inline doc, and the test prose to use the new vocabulary consistently.
2026-05-18 10:51:14 -04:00
Paul Kompfner
666c619113 Size transcript-gather timer to STT-reported P99 TTFS
The aggregator's transcript-gather timer (used when
`wait_for_transcript_to_end_user_turn=False`) was hardcoded to
`DEFAULT_TTFS_P99`. Capture `STTMetadataFrame.ttfs_p99_latency` as
it flows through the user aggregator and prefer that value, just
like the stop strategies already do. Falls back to
`DEFAULT_TTFS_P99` when no STT service has reported a value.
2026-05-18 10:29:19 -04:00
Paul Kompfner
797d09a1d5 Align vocabulary around wait_for_transcript_to_end_user_turn=False
Reframe comments, docstrings, identifiers, changelog, and example
around a single explanation of the option: (1) turn strategies do not
consider user transcripts, letting the user turn end sooner, and (2)
the aggregator gathers user transcripts on its own after the turn
ends via a simple timer, then emits `on_user_turn_message_finalized`
with the new user context message.

The mechanism is generic, so internal aggregator vocabulary stays
generic ("transcript-gather", "after the user turn ends"); the
public-facing param docstring is the one place that explains the
"local turn detection drives a realtime service" use case. The stop
strategies' `wait_for_transcript` flag is pointed at as something
that's "usually flipped indirectly" by the aggregator param rather
than something to pair with it.

Renames internal state to match: `_expect_delayed_transcripts` →
`_aggregator_gathers_transcripts`, `_pending_finalization_*` →
`_transcript_gather_*`, `_finalize_delayed_user_message` →
`_finalize_user_message`, etc.
2026-05-18 10:18:22 -04:00
Paul Kompfner
ee1538d18e test: cover fallback path and align with vocabulary refactor
Adds two tests for the strategy's transcripts-without-VAD fallback
path — one in default mode (both events fire with the aggregated
content) and one in delayed-transcript mode (only
``on_user_turn_message_finalized`` fires; no end-of-turn event is
emitted since no turn ever started in the controller).

Updates existing tests for the vocabulary refactor: assertions now
expect ``content=None`` (not ``""``) for the end-of-turn event in
delayed-transcript mode; comments and docstrings use the
standardized terms (end of turn, user message finalization,
pending-finalization timer, plural "transcripts").
2026-05-18 09:55:42 -04:00
Paul Kompfner
8330c3487d Refactor delayed-transcript machinery; standardize vocabulary
Splits ``_maybe_emit_user_turn_stopped`` into three focused methods —
``_flush_user_message_to_context`` (push aggregation, return content +
timestamp), ``_finalize_user_turn`` (default-mode flow, emits both
events), and ``_finalize_delayed_user_message`` (delayed-mode flow,
emits only ``on_user_turn_message_finalized``). Fixes a side-issue
where ``on_user_turn_stopped`` could fire from non-end-of-turn paths
in delayed-transcript mode; that event now has a single origin (the
end-of-turn handler).

Standardizes vocabulary across docstrings and comments:

- "Default mode" / "Delayed-transcript mode" (with
  ``_expect_delayed_transcripts == False/True``)
- "End of turn" (not "audible stop" or "audible end of turn")
- "User message finalization" (the moment user-text is flushed to
  context + ``on_user_turn_message_finalized`` fires)
- "Pending finalization" (the in-between state in delayed mode)
- Transcripts (plural — the aggregator combines multiple per turn)

The timer that triggers user message finalization is no longer
described as a "backstop" — it's the sole trigger for finalization
in delayed-transcript mode, not a fallback. Renamed accordingly:
``_pending_finalization_task``, ``_pending_finalization_handler``,
``_run_pending_finalization``, ``_discard_pending_finalization``.

Adds a separate message class for the two events:
``UserTurnStoppedMessage.content`` is now ``str | None`` (``None``
at end-of-turn in delayed-transcript mode), and a new
``UserMessageFinalizedMessage`` carries the always-populated
``content`` for the finalization event.
2026-05-18 09:55:11 -04:00
Paul Kompfner
4479a3a6af docs: tighten wait_for_transcript_to_end_user_turn docstring + test docstring
Reframes the strategy mutations as part of configuring the flag
(not an "also" aside), and the ordering invariant in the test
docstring as flush-timing (not arrival-timing).
2026-05-15 15:16:39 -04:00
Paul Kompfner
8631518388 test: cover wait_for_transcript_to_end_user_turn=False aggregator behavior
Adds five tests for the delayed-transcript flow on
`LLMUserAggregator`:

- basic flow: `on_user_turn_stopped` fires fast with empty content;
  `on_user_turn_message_finalized` fires later with the populated
  transcript; user message lands in context.
- backstop with no transcript: backstop timer still finalizes the
  turn; message_finalized fires with empty content; no user message
  added to context.
- next-turn precondition violation: a new VAD start fires while the
  previous turn is still pending; the previous turn is force-flushed
  before the new turn begins.
- context-order with assistant response: paired aggregators with a
  late user transcript arriving before the assistant content streams;
  verifies the user message lands in context before the assistant
  message (the conversational-order invariant the design relies on).
- strategy mutation: explicit start/stop strategies are mutated by
  the bundle — `TranscriptionUserTurnStartStrategy` is dropped from
  start, `wait_for_transcript=False` is flipped on the stop strategy
  that had it explicitly set to True.

Tests patch `DEFAULT_TTFS_P99` to keep the backstop fast.
2026-05-15 14:08:50 -04:00
Paul Kompfner
47e2f7a037 realtime + local turn detection: drop the user-transcript wait
Add the configuration surface to drive a realtime service like Gemini
Live from local turn detection without paying user-transcript latency.
Cascaded pipelines wait for a transcript before ending the user's turn
because the downstream LLM needs the user's words recorded in context
— but that wait is pure latency in pipelines using local turn
detection to drive a realtime service, which consumes user audio
directly.

Set `wait_for_transcript_to_end_user_turn=False` on
`LLMUserAggregatorParams` to turn this on. With that single flag the
aggregator:

- drops `TranscriptionUserTurnStartStrategy` from the start strategies
  (so late-arriving realtime transcripts don't trigger new turns),
- sets `wait_for_transcript=False` on any stop strategy that supports
  it (so the turn ends on the audible end of the turn, without
  waiting for a transcript),
- fires `on_user_turn_stopped` on the audible end of the turn with
  empty `content` (since the transcript hasn't arrived), and
- defers the context flush until the transcript arrives or a backstop
  timer fires.

A new `on_user_turn_message_finalized` event fires when the user's
message has been written to context. In the default mode it
coincides with `on_user_turn_stopped`; in the delayed-transcript mode
it fires later. Consumers that want the populated transcript should
subscribe to `on_user_turn_message_finalized` — it's the event that
always carries the user message, regardless of mode.

Strategy mutations are logged: loudly when the user passed their own
strategies (we're overwriting parts of their config), quietly
otherwise. The strategy-level `wait_for_transcript` parameter on
`TurnAnalyzerUserTurnStopStrategy` and `SpeechTimeoutUserTurnStopStrategy`
remains exposed for advanced cases.

The example `realtime-gemini-live-local-vad.py` demonstrates the full
pattern.
2026-05-15 13:49:16 -04:00
Paul Kompfner
6d21507e95 user turn stop strategies: don't always wait for transcripts
Until now, both TurnAnalyzerUserTurnStopStrategy and
SpeechTimeoutUserTurnStopStrategy waited for at least one transcript
before ending the user turn. That's the right behavior for cascaded
pipelines, where the downstream LLM can't respond until the user's
words are recorded in its context — but it's pure latency in pipelines
using local turn detection to drive a realtime service like Gemini
Live.

Add a `require_transcript: bool | None = None` parameter to both
strategies. When None (default), it infers from whether an
STTMetadataFrame has been seen — a proxy for "does the downstream LLM
need the transcript in context?". Explicit True/False overrides the
heuristic.

When a transcript isn't required, the strategies also skip the
STT-waiting timeout in the VAD-stopped handler, so the user turn ends
as soon as the analyzer (or speech timer) concludes the turn is
complete.
2026-05-13 15:45:51 -04:00
Mark Backman
5fef239b68 Merge pull request #4450 from pipecat-ai/mb/gpt-realtime-whisper
Default OpenAI Realtime transcription to gpt-realtime-whisper
2026-05-13 09:48:33 -04:00
Filipi da Silva Fuchter
9148e307cc Merge pull request #4464 from pipecat-ai/filipi/nvidia_sagemaker
NVidia sagemaker - TTS and STT services
2026-05-13 07:53:26 -03:00
Filipi da Silva Fuchter
703d23b658 Update examples/voice/voice-nvidia-sagemaker.py
Co-authored-by: Mark Backman <mark@daily.co>
2026-05-13 06:36:57 -04:00
Filipi da Silva Fuchter
227ba288da Update examples/voice/voice-nvidia-sagemaker.py
Co-authored-by: Mark Backman <mark@daily.co>
2026-05-13 06:36:45 -04:00
Mark Backman
3e8c5c08f4 Clarify realtime settings update condition 2026-05-12 17:48:53 -04:00
Mark Backman
644030584f Centralize OpenAI audio constants 2026-05-12 17:48:53 -04:00
filipi87
0740021ff4 Removing changelog for sanitize_text_for_tts 2026-05-12 18:29:35 -03:00
filipi87
68f265fa62 Fixing ruff format. 2026-05-12 18:28:14 -03:00
filipi87
b9f052079d Removing sanitize_text_for_tts 2026-05-12 18:22:15 -03:00
filipi87
130bb7371c Removing sanitize_text_for_tts 2026-05-12 18:21:47 -03:00
filipi87
5d61763987 Refactoring how we are reconnecting the STT. 2026-05-12 18:20:19 -03:00
filipi87
7984556692 Fixing typecheck. 2026-05-12 18:00:07 -03:00
filipi87
bea9e4b3ba New example voice-nvidia-sagemaker.py 2026-05-12 17:44:11 -03:00
Mark Backman
19df443500 Merge pull request #4471 from pipecat-ai/mb/fix-gstreamer-pyright-import 2026-05-12 16:34:48 -04:00
Mark Backman
07f241143b Merge pull request #4469 from pipecat-ai/mb/remove-vad-analyzer-runner-utils-docstring 2026-05-12 16:34:27 -04:00
Mark Backman
2fdb9bbf42 Merge pull request #4462 from pipecat-ai/mb/cartesia-sonic-3.5 2026-05-12 16:34:04 -04:00
filipi87
0146947b68 Addressing the comments left in the PR review. 2026-05-12 17:12:19 -03:00
Mark Backman
e2bfa6352f Add changelog for #4450 2026-05-12 15:20:57 -04:00
Mark Backman
abd28e2ac1 Update OpenAI realtime transcription default 2026-05-12 15:20:57 -04:00
kompfner
88deebbf5f Merge pull request #4472 from pipecat-ai/pk/default-gpt-realtime-2
Switch OpenAIRealtimeLLMService default model to gpt-realtime-2
2026-05-12 15:17:12 -04:00
filipi87
c2bdc1aada Fixing metrics and adding extra guard after sanitization. 2026-05-12 16:11:01 -03:00
Paul Kompfner
fc0589e8f1 Switch OpenAIRealtimeLLMService default model to gpt-realtime-2 2026-05-12 14:57:59 -04:00
kompfner
67f8d34e9f Merge pull request #4470 from pipecat-ai/pk/gpt-realtime-2-reasoning-effort
Add reasoning support to OpenAIRealtimeLLMService for gpt-realtime-2
2026-05-12 14:43:39 -04:00
kompfner
d3b8710720 Merge pull request #4465 from pipecat-ai/pk/gpt-realtime-2
Handle gpt-realtime-2 multi-output-item audio responses
2026-05-12 14:30:15 -04:00
Mark Backman
86e2aa85d3 Fix GStreamer pipeline source pyright import 2026-05-12 14:16:36 -04:00
Paul Kompfner
b89500256d Drop debug logging added while investigating multi-output-item audio 2026-05-12 14:05:16 -04:00
Paul Kompfner
a52bdef32b Add reasoning support to OpenAIRealtimeLLMService for gpt-realtime-2 2026-05-12 13:55:19 -04:00
Mark Backman
afd9fc5fdf Remove vad_analyzer from create_transport docstring example 2026-05-12 13:50:17 -04:00
filipi87
7f98dba925 Changelog files for the new nvidia features. 2026-05-12 14:43:12 -03:00
filipi87
6a27ed35b1 Fixing the Bidi client to accept None. 2026-05-12 12:19:30 -03:00
filipi87
a34864d643 Fixed ruff, pyright, and test_service_init failures 2026-05-12 11:39:52 -03:00
Paul Kompfner
007fa3a3a8 Handle gpt-realtime-2 multi-output-item audio responses
A single Realtime API response can now contain more than one audio item
(observed with gpt-realtime-2), and the first item's audio.done can
arrive after deltas from the second have started arriving. Deltas still
arrive strictly in playback order across items, so we keep forwarding
them as received — matching OpenAI's reference implementation.

Adjusted OpenAIRealtimeLLMService so a multi-item response is treated as
one continuous TTS turn:

- _handle_evt_audio_delta: on item switch, advance the tracked item in
  place (reset total_size) without emitting another TTSStartedFrame.
  Truncation now always targets the latest item.
- _handle_evt_audio_done: debug-trace only; no longer pushes
  TTSStoppedFrame.
- _handle_evt_response_done: pushes a single TTSStoppedFrame per turn,
  bookending the audio with the Started pushed on the first delta.

Added tests covering single-item, overlapping multi-item, non-overlapping
multi-item, and interrupt-during-multi-item (last-item-wins truncation).
2026-05-12 10:34:50 -04:00
filipi87
5dd7413c00 Nvidia Sagemaker Nemotron ASR STT service 2026-05-12 11:16:00 -03:00
filipi87
8e0a338d96 Nvidia Sagemaker Magpie TTS service 2026-05-12 11:15:42 -03:00
Mark Backman
d65aee9181 Add changelog for #4462 2026-05-11 17:34:00 -04:00
Mark Backman
1755016679 Update default Cartesia TTS model to sonic-3.5 2026-05-11 17:33:40 -04:00
Mark Backman
b7f6298601 Merge pull request #4461 from pipecat-ai/mb/security-vuln-2025-05-11
Update uv.lock for urllib3 and langchain-core
2026-05-11 15:58:05 -04:00
Mark Backman
396873ac7e Merge pull request #4460 from pipecat-ai/mb/codex-skills
Add Codex skills and AGENTS.md
2026-05-11 15:57:49 -04:00
Mark Backman
5b33964a1b Update uv.lock for urllib3 and langchain-core 2026-05-11 15:51:01 -04:00
Mark Backman
8b37cd1d3a Add agent-neutral repository instructions 2026-05-11 15:43:43 -04:00
Mark Backman
7a2b667fa1 Add Codex skill symlinks 2026-05-11 15:27:49 -04:00
Mark Backman
ee8c607315 Merge pull request #4452 from pipecat-ai/mb/cleanup-frontmatter
Add cleanup skill frontmatter
2026-05-11 09:33:44 -04:00
Aleix Conchillo Flaqué
71578e7151 Merge pull request #4449 from pipecat-ai/aleix/base-object-task-manager
Move create_task and cancel_task from FrameProcessor to BaseObject
2026-05-10 20:36:54 -07:00
Aleix Conchillo Flaqué
77058b01c4 Add changelog for #4449 2026-05-10 20:34:52 -07:00
Aleix Conchillo Flaqué
4f85e7c089 Fix pyright cr_code access on Coroutine in BaseObject.create_task
`collections.abc.Coroutine` doesn't expose `cr_code`/`co_name`; only
native coroutine objects do. Use `getattr` chains so pyright is happy
and any non-native awaitable falls back to a generic task name instead
of crashing.
2026-05-10 20:34:52 -07:00
Aleix Conchillo Flaqué
15531c8112 Wire TaskObserver via setup() instead of constructor
TaskObserver previously took a TaskManager in __init__ and reached into
it directly. Since BaseObject now provides task_manager / create_task /
cancel_task, drop the constructor argument and call
`observer.setup(task_manager)` from PipelineTask._setup() before
starting it.
2026-05-10 20:34:52 -07:00
Mark Backman
b9e8f13105 Add cleanup skill frontmatter 2026-05-09 12:30:20 -07:00
Aleix Conchillo Flaqué
784667bad2 Use inherited create_task/cancel_task in PipelineTask
PipelineTask owns its TaskManager but is itself a BaseObject, so it
inherits create_task/cancel_task. Replace the explicit
self._task_manager.create_task(coro, f"{self}::name") call sites with
self.create_task(coro, "name") for consistency with other BaseObject
subclasses.
2026-05-08 15:03:44 -07:00
Aleix Conchillo Flaqué
33db71ec32 Call super().setup() in PipelineTask to honor BaseObject contract
PipelineTask owns its TaskManager (still constructed in __init__ since
TaskObserver needs it eagerly). Adding the explicit
`await super().setup(self._task_manager)` in `_setup()` formalizes the
BaseObject lifecycle so any future wiring added to BaseObject.setup is
picked up automatically.
2026-05-08 15:03:44 -07:00
Aleix Conchillo Flaqué
dc035df0aa Use inherited create_task/cancel_task in PipelineTask
PipelineTask owns its TaskManager but is itself a BaseObject, so it
inherits create_task/cancel_task. Replace the explicit
self._task_manager.create_task(coro, f"{self}::name") call sites with
self.create_task(coro, "name") for consistency with other BaseObject
subclasses.
2026-05-08 15:03:44 -07:00
Aleix Conchillo Flaqué
df1b071a13 Move create_task and cancel_task from FrameProcessor to BaseObject
Lift the task manager wiring (`_task_manager`, `task_manager` property,
`create_task`, `cancel_task`, and `setup(task_manager)`) up to
`BaseObject`. Owners propagate the task manager to their child
`BaseObject`s via `await child.setup(task_manager)`, matching the
existing convention.

Removes duplicated `_task_manager` / `task_manager` property / setup
implementations from `FrameProcessor`, `FrameProcessorMetrics`,
`UserIdleController`, `UserTurnController`,
`BaseUserTurnStartStrategy`, and `BaseUserTurnStopStrategy`.
2026-05-08 15:03:44 -07:00
kompfner
95bcebe774 Merge pull request #4448 from pipecat-ai/pk/gemini-live-async-tool-support
feat: support cancel_on_interruption=False on Gemini Live (Gemini 2.x)
2026-05-08 16:57:32 -04:00
Paul Kompfner
5509377344 fix(gemini-live-vertex): disable NON_BLOCKING tools
GeminiLiveVertexLLMService overrides _supports_non_blocking_tools to
return False — Vertex AI's Gemini Live endpoint doesn't yet accept the
NON_BLOCKING behavior field on function declarations or the scheduling
field on FunctionResponse, and sending either breaks tool calling.

Effect: function declarations sent to Vertex no longer carry
NON_BLOCKING; FunctionResponses no longer carry scheduling: WHEN_IDLE.
Users registering a function with cancel_on_interruption=False against
Vertex get the same one-time logger.error + push_error the base class
surfaces on Gemini 3.x.
2026-05-08 16:54:15 -04:00
Paul Kompfner
e21180b962 refactor(gemini-live): use inherited LLMService._function_is_async
The same registry-lookup helper was hoisted to LLMService in #4447, so
drop the local duplicate. Behavior unchanged.
2026-05-08 16:42:54 -04:00
Paul Kompfner
53922819ed refactor: explicit kind=='final' check in async-tool routing (Gemini Live)
Mirrors the same change applied to AWSNovaSonicLLMService and
OpenAIRealtimeLLMService in #4441 / GrokRealtimeLLMService in #4447:
replaces the implicit "final happens last" pattern in
_process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block, plus a trailing defensive
`continue` so async-tool messages with an unrecognized kind don't fall
through to the regular tool-result handling block.
2026-05-08 16:42:54 -04:00
Paul Kompfner
6faeffb884 chore: add changelog entry for cancel_on_interruption=False on Gemini Live 2026-05-08 16:42:54 -04:00
Paul Kompfner
9086a46900 feat(gemini-live): support cancel_on_interruption=False on supported models
Honors cancel_on_interruption=False on Gemini Live for models that support
Gemini's NON_BLOCKING tool mechanism (Gemini 2.x at the time of writing).
Function declarations registered via register_function(...,
cancel_on_interruption=False) are sent with behavior: NON_BLOCKING so the
conversation continues while the tool runs; the matching FunctionResponse
carries scheduling: WHEN_IDLE so the result lands at a graceful pause
rather than mid-sentence. Synchronous (default) tools stay BLOCKING —
applying NON_BLOCKING uniformly produced filler responses like "let me
look that up for you" on regular calls, since the model knew it would
have an opportunity to keep talking while waiting.

A new _supports_non_blocking_tools property gates the flow. On models
that don't support it (currently Gemini 3.x), the service falls back to
plain blocking behavior and surfaces a one-time error + ErrorFrame the
moment async-tool messages first appear in the context, explaining that
the flag's intent is not achievable.

Caveat (Gemini 2.5): an intermittent server-side 1008 "Operation is not
implemented" error can fire when realtime input arrives during a pending
tool call. We auto-reconnect, but the user may need to repeat what they
were saying. The proposed mitigation
(https://discuss.ai.google.dev/t/gemini-live-api-websocket-error-1008-operation-is-not-implemented-or-supported-or-enabled/114644/56)
of gating realtime input during pending tool calls is fundamentally
incompatible with NON_BLOCKING tool calling, so we don't apply it.
2026-05-08 16:42:54 -04:00
Paul Kompfner
1a4a6f4edf refactor(gemini-live): bring tool-result handling in line with the canonical realtime pattern
Lays groundwork for cancel_on_interruption=False support on Gemini Live by
restructuring _process_completed_function_calls to match the shape used by
AWSNovaSonicLLMService and OpenAIRealtimeLLMService in #4441: a single-pass
forward iteration over raw context messages that detects async-tool
messages via async_tool_messages.parse_message and routes them — started
skipped silently, intermediate logged-as-error and surfaced via push_error,
final delivered via the formal FunctionResponse channel.

Replaces the prior two-pass structure that went through the adapter for
sync results — the service now uses a lightweight self._tool_call_id_to_name
map (populated when the model issues tool calls) for the name lookup the
adapter used to provide. Extracts a new GeminiLLMAdapter.to_function_response_dict
static method for the dict-coercion logic that wraps non-dict tool returns
as {value: <result>} for Gemini's FunctionResponse.response field; the
adapter's existing inline copy in _from_standard_message uses it too.

Example consolidation:

- Folds realtime-gemini-live-function-calling.py into the base
  realtime-gemini-live.py example so the base exercises function calling
  out of the box (matching realtime-openai.py and realtime-aws-nova-sonic.py).
- Renames realtime-gemini-live-vertex-function-calling.py to
  realtime-gemini-live-vertex.py, mirroring the consolidation.
- Adds realtime-gemini-live-async-tool.py.
- Updates scripts/evals/run-release-evals.py for the renames.

This commit alone doesn't make cancel_on_interruption=False fully work on
Gemini Live — additional investigation is pending. This is foundational
work to be built on.
2026-05-08 16:42:54 -04:00
kompfner
ff80cde44e Merge pull request #4447 from pipecat-ai/pk/realtime-async-tool-support-followup
fix: extend cancel_on_interruption=False regression fix to remaining realtime services
2026-05-08 16:40:32 -04:00
Paul Kompfner
fb74f7714c refactor(ultravox): name async-tool result strings after the kinds they serve
Renames _ASYNC_TOOL_PLACEHOLDER_RESULT to _ASYNC_TOOL_STARTED_RESULT to
match the kind names from async_tool_messages, and lifts the inline
"[Async tool result for tool_call_id=...] {result}" into a sibling
_ASYNC_TOOL_FINAL_RESULT_TEMPLATE constant for the same reason.
2026-05-08 16:35:14 -04:00
Paul Kompfner
4864eddbc7 feat(ultravox): support cancel_on_interruption=False via placeholder + final-as-text
Replaces the prior "log a warning and skip" approach with actual handling
of async-tool messages on Ultravox.

The catch with Ultravox is that its API freezes the conversation between
client_tool_invocation and the matching client_tool_result — there's no
"keep talking while the tool runs" channel like NON_BLOCKING on Gemini
or function_call_output-without-blocking on OpenAI Realtime. So:

- When the model invokes an async-registered function (cancel_on_inter
  ruption=False), the service immediately ships a placeholder
  client_tool_result that tells the model "the actual result isn't
  ready yet; a follow-up will arrive shortly; keep the conversation
  going". This unfreezes the conversation. The placeholder is sent
  from _handle_tool_invocation, since the started async-tool message
  doesn't reach the context-frame path until later.
- When the real tool finishes, the final async-tool message lands in
  the context. _handle_context now forward-iterates and routes
  async-tool messages: started is a no-op (placeholder already sent),
  intermediate is logged-as-error and dropped (matching the other
  realtime services), and final is injected as user-side text via
  user_text_message with bracketed framing — the only mechanism
  Ultravox offers for adding non-tool input mid-conversation.

Hoists the registry-lookup helper to LLMService as
_function_is_async(name) so future services can use the same pattern
without re-implementing it.

Adds an async-tool example file for Ultravox modeled on the existing
ones for the other realtime services.
2026-05-08 16:20:40 -04:00
kompfner
d831930bd0 Merge pull request #4441 from pipecat-ai/pk/realtime-async-tool-support
fix: restore cancel_on_interruption=False support in AWS Nova Sonic and OpenAI Realtime
2026-05-08 15:53:20 -04:00
Paul Kompfner
2c65713c99 refactor: explicit kind=='final' check in async-tool routing (Grok)
Mirrors the same change applied to AWSNovaSonicLLMService and
OpenAIRealtimeLLMService in #4441: replaces the implicit "final happens
last" pattern in _process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block, plus a trailing defensive
`continue` so async-tool messages with an unrecognized kind don't fall
through to the regular tool-result handling block.
2026-05-08 15:45:05 -04:00
Paul Kompfner
b14a03d01f fix: extend cancel_on_interruption=False regression fix to remaining realtime services
Applies the same async-tool message routing introduced for AWSNovaSonicLLMService
and OpenAIRealtimeLLMService to additional realtime LLM services where the
flag's intent ("keep talking while the tool runs") is achievable:

- GrokRealtimeLLMService (xAI Realtime — also benefits the deprecated Grok
  alias since it re-exports the xAI module)
- AzureRealtimeLLMService picks up the fix transitively by inheriting from
  OpenAIRealtimeLLMService — no code change needed.

GrokRealtimeLLMService's _process_completed_function_calls now matches
the canonical pattern: skip LLMSpecificMessage, detect async-tool messages
via parse_message and route them — started skipped silently, intermediate
logged as an error and surfaced via push_error, final delivered through
the same channel as a synchronous result.

UltravoxRealtimeLLMService instead gets a one-time warning when async-tool
messages appear in the context. The Ultravox API freezes the conversation
during tool execution
(https://docs.ultravox.ai/tools/async-tools#custom-tool-timeouts), so the
flag's "keep talking while the tool runs" intent isn't achievable there —
applying the same code pattern would mislead users into expecting a UX
Ultravox can't deliver. Surfacing a clear warning is the right behavior
until Ultravox grows true async tool support.

Adds async-tool example files for Grok and Azure modeled on the existing
Nova Sonic / OpenAI Realtime ones (10s simulated network delay, weather
tool registered with cancel_on_interruption=False).

Two services remain excluded:

- GeminiLiveLLMService — the async-tool path needs deeper investigation.
- InworldRealtimeLLMService — appears to have a pre-existing problem
  with even simple synchronous tool calling on its Realtime API (the
  request reaches the server fine, but response generation fails with a
  generic server_error).
2026-05-08 15:43:53 -04:00
Paul Kompfner
ad0f0a1294 refactor: explicit kind=='final' check in async-tool routing
Replaces the implicit "final happens last" pattern in
_process_completed_function_calls with an explicit
`if async_payload.kind == "final":` block in both AWSNovaSonicLLMService
and OpenAIRealtimeLLMService. Adds a trailing defensive `continue` so
async-tool messages with an unrecognized kind don't fall through to the
regular tool-result handling block — clearer at the call site, and safer
against future additions to AsyncToolMessageKind.
2026-05-08 15:43:37 -04:00
Paul Kompfner
72d0fb418a fix: restore cancel_on_interruption=False support in AWS Nova Sonic and OpenAI Realtime
Before the new async-tool mechanism landed, AWSNovaSonicLLMService and
OpenAIRealtimeLLMService honored cancel_on_interruption=False by simply
not cancelling in-flight function calls on interruption — the eventual
result then flowed through the same channel as any synchronous tool
result. The new mechanism (which appends started/intermediate/final
messages to the LLM context as the underlying task progresses) broke
that path: the realtime services didn't know how to interpret those
messages, and the eventual result was never delivered to the provider.

Restore the flag's behavior by teaching both services to detect
async-tool messages in the context and route them appropriately:

- started → skipped silently. The provider already issued the tool call
  and natively awaits a result; nothing to send for the started marker.
- final → delivered via the formal tool-result channel. Same path as a
  synchronous tool result, just delayed.

Streamed intermediate results (FunctionCallResultProperties(is_final=
False)) are not supported on these realtime services. An intermediate
result is logged as an error and surfaced via push_error, then dropped.
Use a non-realtime LLM service if a tool needs to stream intermediate
results. (Docstrings on register_function, register_direct_function, and
FunctionCallResultProperties.is_final updated to call this out.)

A new shared module pipecat.processors.aggregators.async_tool_messages
is the single source of truth for the on-the-wire payload shape: the
aggregator uses its build_*_message functions when injecting messages,
and the realtime services use parse_message when scanning the context.

Adds two example files exercising a network-delayed weather tool with
each service. The plain realtime-aws-nova-sonic.py example is also
reverted to a synchronous tool call now that the async variant lives in
its own file.

Similar fixes for other realtime services are forthcoming.
2026-05-08 09:33:06 -04:00
Aleix Conchillo Flaqué
94a94ee28c Merge pull request #4405 from pipecat-ai/aleix/user-turn-inference-event
Split user-turn-stop into inference-triggered and finalized events
2026-05-07 17:51:57 -07:00
Mark Backman
c46ede8335 Use Sphinx .. deprecated:: directive for deprecated aggregator params
Aligns deprecation docstrings on LLMUserAggregatorParams and
LLMAssistantAggregatorParams with CONTRIBUTING.md conventions:
present-tense parameter descriptions plus a `.. deprecated:: 1.2.0`
directive noting replacement and 2.0.0 removal. Also adds a runtime
DeprecationWarning for `user_turn_completion_config`, which previously
had no warning despite being deprecated.
2026-05-07 17:49:00 -07:00
Mark Backman
457a68ce64 Correct docstrings and comments regarding incomplete_long_timeout duration, 10 sec 2026-05-07 17:47:41 -07:00
Aleix Conchillo Flaqué
b78cecf7b2 Rename UserTurnCompletedFrame to UserTurnInferenceCompletedFrame
The old name overlapped semantically with `UserStoppedSpeakingFrame`:
both could be read as "the user's turn is done." They're at different
layers — `UserStoppedSpeakingFrame` is the acoustic stop signal,
while this frame is the post-judgment "inference about the turn is
now complete (turn is semantically final)" signal emitted by the LLM
mixin (on ✓), an end-of-turn classifier, or a custom producer.

The new name pairs naturally with the existing
`on_user_turn_inference_triggered` event vocabulary and removes the
ambiguity with `UserStoppedSpeakingFrame`.
2026-05-07 17:47:41 -07:00
Aleix Conchillo Flaqué
952dddca8b Replace llm_completion_user_turn_stop_strategies() with FilterIncompleteUserTurnStrategies
Wrap the detector chain with `deferred(...)` and append the LLM
completion gate via a `UserTurnStrategies` specialization rather than
a free-standing helper, mirroring the existing
`ExternalUserTurnStrategies` pattern. The class lives next to other
strategy containers in `pipecat.turns.user_turn_strategies`, so users
discover it where they're already configuring `user_turn_strategies`.

The deprecated `filter_incomplete_user_turns` flag now rewires
through `FilterIncompleteUserTurnStrategies` under the hood, keeping
the migration path identical to before. `deferred(...)` stays public
as the explicit escape hatch for non-default compositions.
2026-05-07 17:47:39 -07:00
Aleix Conchillo Flaqué
e3e90d38aa Preserve full user transcript across multiple inferences in one turn
When a stop-strategy chain splits inference-triggered from
finalization (e.g. `LLMTurnCompletionUserTurnStopStrategy` gating a
deferred detector), more than one inference can fire inside a single
user turn — each adds the new transcription segment to the context.
Previously each inference overwrote `_pending_user_turn_aggregation`,
so the eventual `on_user_turn_stopped` event surfaced only the
segment from the last inference, dropping anything the user said
before it.

Concatenate each segment into `_full_user_turn_aggregation` instead
of overwriting, and combine that running buffer with any post-final-
inference segment when emitting the public event.
2026-05-07 17:46:15 -07:00
Aleix Conchillo Flaqué
d1c8162b0c Route turn-completion markers through LLMMarkerFrame
Add an `LLMMarkerFrame(DataFrame)` for sideband LLM markers that need
to be persisted to context but should not flow through the standard
text path (TTS, transcript). The frame carries an
`append_to_context_immediately` flag so the assistant aggregator can
either commit the marker as a stand-alone message (○ / ◐) or merge it
with the upcoming aggregation as a prefix on the response (✓).

`UserTurnCompletionLLMServiceMixin` now emits `LLMMarkerFrame` instead
of pushing the marker as `LLMTextFrame(skip_tts=True)`, which fixes
the case where an incomplete-turn marker (○ / ◐) was aggregated by
the assistant aggregator but never committed to the context because
the assistant turn lifecycle didn't run to completion (no spoken
response, no `LLMFullResponseEndFrame`-driven `push_aggregation`).

The frame is intentionally generic so other components — STT services
with built-in turn signals, end-of-turn classifiers, custom
annotations — can use the same mechanism to inject sideband signals
into the assistant context.
2026-05-07 17:46:15 -07:00
Aleix Conchillo Flaqué
1fa0310ea8 Add changelog for #4405 2026-05-07 17:46:15 -07:00
Aleix Conchillo Flaqué
2281cd8359 Extract ExternalUserTurnCompletionStopStrategy as a reusable base
`LLMTurnCompletionUserTurnStopStrategy` previously bundled two
concerns: pushing `LLMUpdateSettingsFrame` on `StartFrame`, and
finalizing the turn on `UserTurnCompletedFrame`. The latter is
producer-agnostic — any component that emits `UserTurnCompletedFrame`
(STT with built-in turn detection, dedicated end-of-turn classifiers,
custom code) can drive finalization the same way.

Move the frame-handling half into a new
`ExternalUserTurnCompletionStopStrategy`. The LLM-specific subclass
now only adds the settings-frame push and inherits finalization. Mirrors
the existing `ExternalUserTurnStopStrategy` naming pattern.
2026-05-07 17:46:15 -07:00
Aleix Conchillo Flaqué
480eca42f5 Split user-turn-stop into inference-triggered and finalized events
Fixes a real bug: with `filter_incomplete_user_turns` enabled, the
smart-turn detector's tentative stop was firing `on_user_turn_stopped`
before the LLM had a chance to veto it. Observers, transcript
appenders and UI indicators received an early — and sometimes
duplicated — signal.

Decomposes the single stop concern into two events:
- `on_user_turn_inference_triggered` fires when a stop strategy has
  enough signal to start LLM inference. The aggregator pushes the
  context here, kicking off the LLM call.
- `on_user_turn_stopped` fires only when the user turn is semantically
  final. Built-in strategies fire both events at the same call site,
  preserving today's behavior for the common case.

Adds `LLMTurnCompletionUserTurnStopStrategy`, which gates
finalization on a `UserTurnCompletedFrame` (a fieldless system frame
emitted by any component judging turn completeness — currently the
`UserTurnCompletionLLMServiceMixin` on `✓`).

Adds `deferred(strategy)` / `DeferredUserTurnStopStrategy`, a thin
wrapper that forwards an inner strategy's events except
`on_user_turn_stopped`. Use this to install a stop strategy as an
inference trigger only, leaving finalization to a peer (e.g. the LLM
completion strategy).

Adds `llm_completion_user_turn_stop_strategies()` for the common
case:

    UserTurnStrategies(
        stop=llm_completion_user_turn_stop_strategies(),
    )

Deprecates `LLMUserAggregatorParams.filter_incomplete_user_turns`.
The aggregator emits a `DeprecationWarning`, wraps existing stop
strategies with `deferred(...)`, and appends
`LLMTurnCompletionUserTurnStopStrategy` automatically.
2026-05-07 17:46:09 -07:00
Mark Backman
1073510574 Merge pull request #4407 from pipecat-ai/mb/ui-agent-wire-format
feat(rtvi): add UI Agent Protocol as first-class RTVI message types
2026-05-07 20:03:41 -04:00
Mark Backman
47c05f3f30 Simplify changelog entry 2026-05-07 16:58:08 -07:00
Mark Backman
24904b89f5 Merge pull request #4443 from Anrahya/fix-gemini-tts-voice-names
fix: correct Gemini TTS voice names
2026-05-07 19:41:30 -04:00
orphis
c78977e4c7 chore: remove Gemini TTS voice name test 2026-05-08 05:03:15 +05:30
Mark Backman
f78b5f9240 Merge pull request #4446 from inworld-ai/ian/inworld-pcm
[inworld] default to using PCM encoding
2026-05-07 19:25:57 -04:00
Ian Lee
406f8b730b [inworld] default to using PCM encoding
* server returns audio bytes without headers
2026-05-07 16:05:34 -07:00
Mark Backman
7a2cec2e45 Merge pull request #4426 from marcelodiaz558/feature/elevenlabs_stt_keyterms
Add ElevenLabs STT keyterms support
2026-05-07 18:44:09 -04:00
Marcelo Díaz
edfcd6948b Add ElevenLabs STT keyterms support 2026-05-07 21:00:26 +00:00
kompfner
991ee9e0e6 Merge pull request #4404 from pipecat-ai/pk/mitigate-calls-to-missing-tools
Mitigate tool-call-related hallucination
2026-05-07 15:05:13 -04:00
Mark Backman
a696729343 Merge pull request #4439 from pipecat-ai/mb/fix-deprecation-video-out-bitrate 2026-05-07 14:42:26 -04:00
orphis
ba705e9501 chore: add changelog for Gemini TTS voice fix 2026-05-08 00:11:19 +05:30
orphis
98c370457b fix: correct Gemini TTS voice names 2026-05-08 00:09:56 +05:30
Filipi da Silva Fuchter
6189e920e1 Merge pull request #4433 from pipecat-ai/filipi/refactoring_elevenlabs
Refactoring ElevenLabs to send close_context as soon as the turn context is complete.
2026-05-07 13:10:36 -03:00
Filipi da Silva Fuchter
73625a273a Merge pull request #4440 from pipecat-ai/filipi/daily_send_message_issue
Fixing a race condition when cleaning up the daily transport.
2026-05-07 13:09:53 -03:00
filipi87
f91a55c97c Changelog entry for the fix. 2026-05-07 11:32:48 -03:00
filipi87
5f256e241c Fixing a race condition when cleaning up the daily transport. 2026-05-07 11:29:57 -03:00
Mark Backman
954f63dc7b Document deprecation docstring convention in CLAUDE.md.
Adds an explicit Code Style bullet for the `.. deprecated::` Sphinx
directive (forbidding inline `[DEPRECATED]` tags) and extends the
Docstring Example with a Pydantic params class showing the directive
inside a `Parameters:` block — the context CONTRIBUTING.md's existing
example didn't cover.
2026-05-07 10:03:43 -04:00
Mark Backman
6cc66a3df1 Update video_out_bitrate deprecation to use sphinx directive.
Replaces the inline `[DEPRECATED]` tag with a `.. deprecated:: 1.1.0`
directive per CONTRIBUTING.md docstring conventions, so the deprecation
shows up properly in the rendered docs.
2026-05-07 09:57:21 -04:00
filipi87
a445399337 Fixing a bug in the ElevenLabs TTS refactor where alignment state was reset too early mid-turn. 2026-05-07 10:10:54 -03:00
filipi87
5ed2057599 Merge branch 'main' into filipi/refactoring_elevenlabs 2026-05-07 09:32:53 -03:00
Filipi da Silva Fuchter
cacde00e26 Merge pull request #4435 from pipecat-ai/filipi/uninterruptible_frame
Refactoring TTSService to preserve uninterruptible frames.
2026-05-07 08:46:42 -03:00
Filipi da Silva Fuchter
b1b598f65e Merge pull request #4434 from pipecat-ai/filipi/fix_interruption_regression
Fix interruption blocked by slow non-uninterruptible frame in queue
2026-05-07 08:46:10 -03:00
filipi87
c48ee93892 Adding changelog entry for the fix. 2026-05-06 16:30:22 -03:00
filipi87
cf22dac171 Refactoring TTSService to preserve uninterruptible frames. 2026-05-06 16:26:45 -03:00
filipi87
36f6e22aee Adding changelog for the interruption fix. 2026-05-06 15:39:27 -03:00
filipi87
921a7a46cb Fix interruption blocked by slow non-uninterruptible frame in queue
When a non-uninterruptible frame was being processed slowly and an
uninterruptible frame was waiting in the queue, _start_interruption
skipped task cancellation. This caused interruptions to stall until
the slow frame finished, even though it had no reason to block them.

The fix: only skip cancellation when the *current* frame is
uninterruptible. Uninterruptible frames already in the queue are
preserved regardless, because __create_process_task calls
__reset_process_queue internally, which always retains them.

Fixes: https://github.com/pipecat-ai/pipecat/issues/4412
2026-05-06 15:35:43 -03:00
filipi87
fda18a9afa Adding changelog for the elevenlabs improvement. 2026-05-06 14:58:18 -03:00
filipi87
d146a7f8e0 Refactoring ElevenLabs to send close_context as soon as the turn context is complete. 2026-05-06 14:55:49 -03:00
Filipi da Silva Fuchter
90f0f7cd27 Merge pull request #4431 from pipecat-ai/filipi/tts_deadlock
Fixing TTSService deadlock.
2026-05-06 14:52:04 -03:00
Mark Backman
37376b3506 Merge pull request #4429 from pipecat-ai/mb/update-grok-default-llm-model
fix(xai): update default Grok model to grok-4.20-non-reasoning
2026-05-06 13:41:05 -04:00
Mark Backman
729418c2b7 Merge pull request #4428 from pipecat-ai/mb/deprecate-resampy
chore(audio): deprecate ResampyResampler
2026-05-06 13:40:51 -04:00
filipi87
4512038a17 Creating a changelog entry for the fix. 2026-05-06 13:36:20 -03:00
filipi87
a23baf9de6 Fixing TTSService deadlock. 2026-05-06 13:32:26 -03:00
Mark Backman
d18fe7c39c feat(rtvi): type UI accessibility snapshots 2026-05-06 11:29:19 -04:00
Mark Backman
41124dc494 refactor(rtvi): clarify UI message names 2026-05-06 11:08:25 -04:00
Filipi da Silva Fuchter
95db08646c Merge pull request #4430 from pipecat-ai/filipi/flux_audio
Implementing dynamic watchdog timeout for Deepgram Flux STT
2026-05-06 11:40:06 -03:00
filipi87
03e5ebb266 Improving watchdog_min_timeout description. 2026-05-06 11:37:18 -03:00
filipi87
5daf267c11 Adding changelogs. 2026-05-06 11:26:14 -03:00
filipi87
1cb77b422a Created a watchdog_min_timeout to allow to change the default value. 2026-05-06 11:22:37 -03:00
filipi87
0c779b4c3d Implementing dynamic watchdog timeout for Deepgram Flux STT 2026-05-06 11:01:58 -03:00
Mark Backman
138991418a docs(changelog): add 4429 entry for Grok default model update 2026-05-06 09:51:01 -04:00
Mark Backman
94e136a6b7 fix(xai): update default Grok model to grok-4.20-non-reasoning
grok-3 is being retired from the xAI API on May 15, 2026. Switch the
default to grok-4.20-non-reasoning, which xAI recommends for non-reasoning
workloads and is appropriate for real-time voice AI.
2026-05-06 09:48:39 -04:00
Mark Backman
9598e262b5 docs(changelog): add 4428 deprecation entry for ResampyResampler 2026-05-06 09:41:14 -04:00
Mark Backman
8c3521f2e4 chore(audio): deprecate ResampyResampler in favor of SOXR resamplers
Emits a DeprecationWarning on instantiation. ResampyResampler will be
removed in Pipecat 2.0 along with the default resampy and numba
dependencies.
2026-05-06 09:40:13 -04:00
Mark Backman
eda98fb13f Merge pull request #4424 from pipecat-ai/mb/revert-elevenlabs-tts-alignment
fix(elevenlabs): only use normalizedAlignment when pronunciation dict is set
2026-05-06 08:27:25 -04:00
Mark Backman
3722ee223c Merge pull request #4419 from pipecat-ai/mb/fix-changelog-entry-4416
Fix changelog filename for 4416
2026-05-05 14:50:24 -04:00
Mark Backman
2620e76dab docs(elevenlabs): clarify alignment leading-space handling 2026-05-05 14:49:41 -04:00
Mark Backman
2447db766e docs(changelog): add 4424 entry for elevenlabs alignment selection fix 2026-05-05 14:49:41 -04:00
Mark Backman
61a81ed87b fix(elevenlabs): use alignment by default, normalizedAlignment only with pronunciation dicts
PR #4344 unconditionally switched to normalizedAlignment to fix garbled
words with pronunciation dictionaries (#4316). But normalizedAlignment
returns the post-normalized form of what was spoken - including
romanization of non-Latin scripts (Chinese rendered as pinyin), which
ends up in the LLM context and degrades subsequent turns.

Gate the switch on pronunciation_dictionary_locators being configured.
Adds a _select_alignment helper with preferred-with-fallback (both
fields are nullable per the API schema), used by both the WebSocket
and HTTP services. Tests cover dictionary mode, default mode, fallback
when preferred is missing or null, and HTTP field-name variants.
2026-05-05 14:49:41 -04:00
Mark Backman
735cd09c7e Merge pull request #4422 from cshape/tts-2
feat(inworld): default to inworld-tts-2
2026-05-05 14:00:04 -04:00
Paul Kompfner
2616076bec Add deterministic dev-error demo example
``examples/function-calling/function-calling-missing-handler.py``
demonstrates the missing-handler path by deliberately advertising a
tool to the LLM without registering its handler — what happens when a
developer forgets to call ``register_function``. Exercises the new
``logger.error`` severity end-to-end without needing to coax the LLM
into hallucinating.
2026-05-05 13:08:00 -04:00
Paul Kompfner
40667e50fc Add changelog for #4404 2026-05-05 13:03:49 -04:00
Paul Kompfner
e06e0c0282 Mitigate tool-call-related hallucination
When tools change mid-conversation, LLMs can produce a few different
flavors of tool-call-related hallucination: calling tools that have
been removed, avoiding tools that have been re-added, or hallucinating
output (made-up answers or tool-call-shaped non-tool-calls) when tools
are unavailable.

This change introduces an opt-in ``add_tool_change_messages`` flag on
the LLM aggregators (preferred entry point: ``LLMContextAggregatorPair(
..., add_tool_change_messages=True)``) that appends a developer-role
message to the context whenever ``LLMSetToolsFrame`` changes the set
of advertised standard tools. Helps the LLM stay coherent across tool
changes by spelling out exactly what just became available or
unavailable. Both aggregators participate; whichever handles the
frame first wins, and the other (if any) sees an empty diff against
the shared context and stays silent — order-independent regardless of
whether the frame flows downstream or upstream.

Also tightens the existing missing-handler path (introduced in #4301):

- Reworded the terminal tool result to a neutral "The function
  ``X`` is not currently available." (overridable via
  ``LLMService.MISSING_FUNCTION_CALL_MESSAGE_TEMPLATE``). Previously
  read "Error: function 'X' is not registered."
- Logs at the call site now distinguish developer error (tool
  advertised but no handler registered → ``logger.error``) from
  hallucination (tool not advertised → ``logger.warning``).

Includes a manual validation harness
(``examples/features/features-add-tool-change-messages.py``) that
exercises the new ``add_tool_change_messages`` mitigation by flipping
tool availability on a turn counter so its effect can be observed
end-to-end with the flag on vs. off.
2026-05-05 13:02:43 -04:00
Cale Shapera
84eefba4df docs: add changelog fragment for tts-2 default flip 2026-05-05 09:20:16 -07:00
Cale Shapera
fe3af5d9f7 feat(inworld): default to inworld-tts-2
Flip the default Inworld TTS model from inworld-tts-1.5-max to
inworld-tts-2 across:
- InworldHttpTTSService (HTTP)
- InworldTTSService (WebSocket)
- InworldRealtimeLLMService (cascade Realtime)

inworld-tts-1.5-max and inworld-tts-1.5-mini remain valid options;
existing users can pin the prior model explicitly via the model
setting. Docstring examples updated to reference the new default.
2026-05-05 09:20:16 -07:00
Mark Backman
7729eecfe4 Fix changelog filename for 4416 2026-05-04 21:54:58 -04:00
Mark Backman
fa31a2fd63 Merge pull request #4416 from pipecat-ai/mb/pr-4333-aws-credentials-review
feat(aws): add shared credential resolver with boto3 chain fallback
2026-05-04 21:48:33 -04:00
Mark Backman
678d40e102 docs(changelog): add 4333 entries for AWS credential resolver expansion 2026-05-04 19:30:37 -04:00
Mark Backman
8becafee38 fix(aws): use shared credential resolver in Polly, Bedrock, AgentCore
Polly TTS, Bedrock LLM, and AgentCore previously did
`arg or os.getenv("AWS_...")` and handed the result straight to
aioboto3.  When only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY`
was set, aioboto3 received a half-populated kwarg and errored instead of
falling through to the boto3 credential provider chain (instance
profiles, IRSA, ECS task roles, SSO, etc.).

Route credential resolution through the shared `resolve_credentials()`
helper introduced for AWS Transcribe so all four services follow the
same `explicit → env → boto3 chain` fallback.  Add an
`AWSCredentials.to_boto_kwargs()` method to bridge the dataclass field
names (`access_key`, `secret_key`) to the aioboto3 kwargs
(`aws_access_key_id`, `aws_secret_access_key`).

No public API changes.  Behaviour is identical for fully-explicit and
fully-env-var configurations; partial env vars now correctly trigger
the chain instead of erroring.
2026-05-04 19:23:53 -04:00
Mark Backman
83190d38e9 Merge pull request #4414 from pipecat-ai/mb/fix-ttsspeakframe-assistant-turn-stopped 2026-05-04 18:12:33 -04:00
Mark Backman
7519c26ac5 Merge pull request #4417 from pipecat-ai/mb/resolve-runner-filepath 2026-05-04 18:09:34 -04:00
Mark Backman
b2b7e9ee6f Merge pull request #4415 from pipecat-ai/mb/fix-elevenlabs-leading-spaces-flash 2026-05-04 18:08:31 -04:00
Mark Backman
e864d5778a ci: install runner extra for the coverage job 2026-05-04 16:44:47 -04:00
Mark Backman
89f10dd9a1 test: drop webrtc-dependent test, remove webrtc extra from CI 2026-05-04 16:42:05 -04:00
Mark Backman
f67e3ef0b2 ci: install runner and webrtc extras for the test job 2026-05-04 16:29:58 -04:00
Mark Backman
5b087d6aeb docs: add changelog for #4417 2026-05-04 16:22:26 -04:00
Mark Backman
e780f759d0 fix: validate download path containment in runner
Resolve and contain the user-supplied filename before serving it from
the runner's /files endpoint. Also raise a 404 (instead of returning
None) when the downloads folder is unset, and use the resolved
basename for Content-Disposition.
2026-05-04 16:20:27 -04:00
Daniel Wirjo
35153de28e feat(aws): add shared credential resolver with boto3 chain fallback
AWS Transcribe STT previously only supported credentials via explicit
parameters or environment variables. Services running with IAM roles
(EKS pod roles, IRSA, ECS task roles, EC2 instance profiles) or SSO
couldn't use Transcribe without exporting static credentials.

Changes:
- Add resolve_credentials() to utils.py providing a standard fallback
  chain: explicit params → environment variables → boto3 credential
  provider chain (instance profiles, IRSA, pod roles, SSO, etc.)
- Add AWSCredentials dataclass for type-safe credential passing
- Update AWSTranscribeSTTService to use resolve_credentials() instead
  of manual os.getenv() calls
- The boto3 fallback is only attempted when both access key and secret
  key are unresolved, avoiding replacement of explicitly provided creds
- boto3 is imported lazily inside the function to avoid hard dependency
  for services that don't need the fallback chain
- Add 7 unit tests covering the credential resolution chain

The Bedrock LLM and Polly TTS services already support the full
credential chain via aioboto3.Session() and are not modified.

Related to #4197
2026-05-04 15:40:06 -04:00
Mark Backman
9886d72f5e Add changelog for PR #4415 2026-05-04 15:18:15 -04:00
Mark Backman
90e6b51acd Fix ElevenLabs alignment chunk spacing 2026-05-04 15:15:37 -04:00
Mark Backman
61acdba3ae docs: add changelog entry for #4414 2026-05-04 10:43:52 -04:00
Mark Backman
f1a3ee97de fix: surface TTSSpeakFrame greetings in on_assistant_turn_stopped
Two issues were causing TTSSpeakFrame(append_to_context=True) greetings to
silently lose their trailing words and never fire on_assistant_turn_stopped:

- LLMAssistantPushAggregationFrame was emitted without a PTS, so the
  transport routed it through the audio (sync) queue while word-level
  TTSTextFrames travel through the clock queue. The aggregation could reach
  the assistant aggregator before the final words, leaving them orphaned
  in the buffer. Stamp the frame with `_word_last_pts + 1` when there are
  word timestamps so it can't overtake them.

- The aggregator's LLMAssistantPushAggregationFrame handler called
  push_aggregation() directly, bypassing _trigger_assistant_turn_stopped.
  For TTS-only flows there is no LLMFullResponseStartFrame, so the turn
  start timestamp was never set and on_assistant_turn_stopped never fired.
  Open a turn (if needed) and trigger stopped from the handler.

Fixes #4264.
2026-05-04 10:41:22 -04:00
Mark Backman
b363b91d12 Merge pull request #4401 from pipecat-ai/mb/grok-realtime-model
fix(xai/realtime): pass model as query param on connect
2026-05-04 09:44:33 -04:00
Mark Backman
43abca0b06 feat(rtvi): add UI Agent Protocol as first-class RTVI message types
The UI Agent Protocol lets server-side AI agents observe and drive
a GUI app on the client side through structured RTVI messages.
Five new top-level RTVI types in kebab-case, in line with the rest
of the protocol:

  ui-event         client → server  (named event with payload)
  ui-command       server → client  (named command with payload)
  ui-snapshot      client → server  (accessibility tree of the page)
  ui-cancel-task   client → server  (cancel an in-flight task group)
  ui-task          server → client  (task lifecycle envelope)

Each ships paired ``*Data`` / ``*Message`` pydantic models in
``rtvi.models``, following the existing RTVI envelope convention
(``BotReady`` / ``BotReadyData``, ``Error`` / ``ErrorData``, etc.).
Built-in command payload models (``Toast``, ``Navigate``,
``ScrollTo``, ``Highlight``, ``Focus``, ``Click``, ``SetInputValue``,
``SelectText``) ship alongside; matching default React handlers
live in ``@pipecat-ai/client-react``.

Bumps the RTVI ``PROTOCOL_VERSION`` from ``1.2.0`` to ``1.3.0``.
Purely additive: only new top-level message types are introduced;
no existing wire shapes are changed. The major-version
compatibility check on ``client-ready`` still passes for older
1.x clients, so old clients continue to connect without warning;
they simply will not exercise the new types.

The ``RTVIProcessor`` registers a new ``on_ui_message`` event
handler that fires for inbound ``ui-event`` / ``ui-snapshot`` /
``ui-cancel-task`` with the parsed Message envelope, mirroring how
``on_client_message`` works for ``client-message``.

Five new pipeline frames let pipeline observers and processors see
UI traffic the same way they see other RTVI messages, mirroring
the frame-and-event pattern used by ``client-message``:

  RTVIUICommandFrame(command_name, payload)
    Pushed by downstream code (e.g. ``pipecat-ai-subagents``'s
    bridge) to send a UI command to the client. Wrapped by the
    observer into a ``UICommandMessage`` envelope.

  RTVIUITaskFrame(data: UITaskData)
    Same shape but for ``ui-task``; wrapped into ``UITaskMessage``.
    ``UITaskData`` is a discriminated union of the four lifecycle
    kinds (group_started / task_update / task_completed /
    group_completed).

  RTVIUIEventFrame(msg_id, event_name, payload)
  RTVIUISnapshotFrame(msg_id, tree)
  RTVIUICancelTaskFrame(msg_id, task_id, reason)
    Pushed by ``RTVIProcessor._handle_message`` whenever the
    matching inbound message arrives, alongside firing
    ``on_ui_message``. Pipeline observers and processors can match
    on the frame; subscribers like the subagents bridge keep using
    the event handler.

The data layer is the canonical authority for the wire format:
higher-level frameworks like ``pipecat-ai-subagents`` build the
agent abstractions on top, and single-LLM Pipecat apps can target
the same wire format directly via custom tools that emit these
typed messages.
2026-05-02 12:09:01 -04:00
Mark Backman
30efd11e15 Merge pull request #4397 from pipecat-ai/mb/smallwebrtc-trace-app-message 2026-05-01 20:47:04 -04:00
Mark Backman
440738f727 Update changelog for #4401: split fix and default-model change 2026-05-01 09:19:27 -04:00
Mark Backman
7da94436f5 Add changelog for #4401 2026-05-01 09:18:34 -04:00
Mark Backman
492c9702ee fix(xai/realtime): pass model as query param on connect
xAI's Voice Agent API selects the model via the ?model= query
parameter on the WebSocket URL; it cannot be changed later via
session.update. The Grok Realtime service was setting the model in
Settings but never including it in the connection URL, so every
session silently fell back to the deprecated default
grok-voice-fast-1.0.

Append the model from Settings to the WebSocket URL on connect, and
default to the recommended grok-voice-think-fast-1.0.
2026-05-01 09:16:52 -04:00
Mark Backman
13643b192b Add changelog for #4397 2026-04-30 21:41:28 -04:00
Mark Backman
6cab2ce3f7 chore(smallwebrtc): lower app message log to trace level
App messages can be high-frequency, so logging each one at debug is noisy.
2026-04-30 21:06:47 -04:00
159 changed files with 9599 additions and 896 deletions

1
.agents/skills/changelog Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/changelog

1
.agents/skills/cleanup Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/cleanup

1
.agents/skills/code-review Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/code-review

1
.agents/skills/docstring Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/docstring

View File

@@ -0,0 +1 @@
../../.claude/skills/pr-description

1
.agents/skills/pr-submit Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/pr-submit

1
.agents/skills/update-docs Symbolic link
View File

@@ -0,0 +1 @@
../../.claude/skills/update-docs

View File

@@ -1,3 +1,8 @@
---
name: cleanup
description: Review, refactor, document, and validate code changes in the current branch
---
# Code Cleanup Skill
The **Code Cleanup Skill** reviews, refactors, and documents code changes in your current branch, ensuring alignment with **Pipecat's architecture, coding standards, and example patterns**.

View File

@@ -42,6 +42,7 @@ jobs:
--extra langchain \
--extra livekit \
--extra piper \
--extra runner \
--extra sagemaker \
--extra tracing \
--extra websocket

View File

@@ -46,6 +46,7 @@ jobs:
--extra langchain \
--extra livekit \
--extra piper \
--extra runner \
--extra sagemaker \
--extra tracing \
--extra websocket

174
AGENTS.md Normal file
View File

@@ -0,0 +1,174 @@
# AGENTS.md
This file provides guidance to AI coding agents when working with code in this repository.
## Project Overview
Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture.
## Common Commands
```bash
# Setup development environment
uv sync --group dev --all-extras --no-extra gstreamer --no-extra local
# Install pre-commit hooks
uv run pre-commit install
# Run all tests
uv run pytest
# Run a single test file
uv run pytest tests/test_name.py
# Run a specific test
uv run pytest tests/test_name.py::test_function_name
# Preview changelog
uv run towncrier build --draft --version Unreleased
# Lint and format check
uv run ruff check
uv run ruff format --check
# Update dependencies (after editing pyproject.toml)
uv lock && uv sync
```
## Architecture
### Frame-Based Pipeline Processing
All data flows as **Frame** objects through a pipeline of **FrameProcessors**:
```
[Processor1] → [Processor2] → ... → [ProcessorN]
```
**Key components:**
- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors).
- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream.
- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together.
- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel.
- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`.
- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc.
- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`.
- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`.
- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law).
- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels.
- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks.
### Important Patterns
- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input
- **Turn Management**: Turn management is done through `LLMUserAggregator` and
`LLMAssistantAggregator`, created with `LLMContextAggregatorPair`
- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively.
- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers.
- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`.
- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast.
- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation.
- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service).
### Key Directories
| Directory | Purpose |
| -------------------------- | -------------------------------------------------- |
| `src/pipecat/frames/` | Frame definitions (100+ types) |
| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio |
| `src/pipecat/pipeline/` | Pipeline orchestration |
| `src/pipecat/services/` | AI service integrations (60+ providers) |
| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) |
| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols |
| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow |
| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF |
| `src/pipecat/turns/` | User turn management |
## Code Style
- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section.
- **Deprecations**: Use the `.. deprecated:: <version>` Sphinx directive in docstrings (never inline tags like `[DEPRECATED]`), and pair it with a runtime `warnings.warn(..., DeprecationWarning)` at the call site. See `CONTRIBUTING.md` for full conventions.
- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting.
- **Type hints**: Required for complex async code.
- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically:
- `@dataclass`: Frame types, context aggregator pairs, internal data containers
- `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params
### Docstring Example
```python
class MyService(LLMService):
"""Description of what the service does.
More detailed description.
Event handlers available:
- on_connected: Called when we are connected
Example::
@service.event_handler("on_connected")
async def on_connected(service, frame):
...
"""
def __init__(self, param1: str, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
**kwargs: Additional arguments passed to parent.
"""
super().__init__(**kwargs)
# Pydantic params class with a deprecated field
class MyParams(BaseModel):
"""Configuration parameters for MyService.
Parameters:
new_setting: Replacement for ``old_setting``.
old_setting: Legacy setting, no longer used.
.. deprecated:: 1.2.0
Use ``new_setting`` instead. Will be removed in 2.0.0.
"""
new_setting: str = "default"
old_setting: str | None = None
```
## Service Implementation
When adding a new service:
1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.)
2. Implement required abstract methods
3. Handle necessary frames
4. By default, all frames should be pushed in the direction they came
5. Push `ErrorFrame` on failures
6. Add metrics tracking via `MetricsData` if relevant
7. Follow the pattern of existing services in `src/pipecat/services/`
## Testing
Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.

158
CLAUDE.md
View File

@@ -1,157 +1 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Pipecat is an open-source Python framework for building real-time voice and multimodal conversational AI agents. It orchestrates audio/video, AI services, transports, and conversation pipelines using a frame-based architecture.
## Common Commands
```bash
# Setup development environment
uv sync --group dev --all-extras --no-extra gstreamer --no-extra local
# Install pre-commit hooks
uv run pre-commit install
# Run all tests
uv run pytest
# Run a single test file
uv run pytest tests/test_name.py
# Run a specific test
uv run pytest tests/test_name.py::test_function_name
# Preview changelog
uv run towncrier build --draft --version Unreleased
# Lint and format check
uv run ruff check
uv run ruff format --check
# Update dependencies (after editing pyproject.toml)
uv lock && uv sync
```
## Architecture
### Frame-Based Pipeline Processing
All data flows as **Frame** objects through a pipeline of **FrameProcessors**:
```
[Processor1] → [Processor2] → ... → [ProcessorN]
```
**Key components:**
- **Frames** (`src/pipecat/frames/frames.py`): Data units (audio, text, video) and control signals. Flow DOWNSTREAM (input→output) or UPSTREAM (acknowledgments/errors).
- **FrameProcessor** (`src/pipecat/processors/frame_processor.py`): Base processing unit. Each processor receives frames, processes them, and pushes results downstream.
- **Pipeline** (`src/pipecat/pipeline/pipeline.py`): Chains processors together.
- **ParallelPipeline** (`src/pipecat/pipeline/parallel_pipeline.py`): Runs multiple pipelines in parallel.
- **Transports** (`src/pipecat/transports/`): Transports are frame processors used for external I/O layer (Daily WebRTC, LiveKit WebRTC, WebSocket, Local). Abstract interface via `BaseTransport`, `BaseInputTransport` and `BaseOutputTransport`.
- **Pipeline Task (`src/pipecat/pipeline/task.py`)**: Runs and manages a pipeline. Pipeline tasks send the first frame, `StartFrame`, to the pipeline in order for processors to know they can start processing and pushing frames. Pipeline tasks internally create a pipeline with two additional processors, a source processor before the user-defined pipeline and a sink processor at the end. Those are used for multiple things: error handling, pipeline task level events, heartbeat monitoring, etc.
- **Pipeline Runner (`src/pipecat/pipeline/runner.py`)**: High-level entry point for executing pipeline tasks. Handles signal management (SIGINT/SIGTERM) for graceful shutdown and optional garbage collection. Run a single pipeline task with `await runner.run(task)` or multiple concurrently with `await asyncio.gather(runner.run(task1), runner.run(task2))`.
- **Services** (`src/pipecat/services/`): 60+ AI provider integrations (STT, TTS, LLM, etc.). Extend base classes: `AIService`, `LLMService`, `STTService`, `TTSService`, `VisionService`.
- **Serializers** (`src/pipecat/serializers/`): Convert frames to/from wire formats for WebSocket transports. `FrameSerializer` base class defines `serialize()` and `deserialize()`. Telephony serializers (Twilio, Plivo, Vonage, Telnyx, Exotel, Genesys) handle provider-specific protocols and audio encoding (e.g., μ-law).
- **RTVI** (`src/pipecat/processors/frameworks/rtvi.py`): Real-Time Voice Interface protocol bridging clients and the pipeline. `RTVIProcessor` handles incoming client messages (text input, audio, function call results). `RTVIObserver` converts pipeline frames to outgoing messages: user/bot speaking events, transcriptions, LLM/TTS lifecycle, function calls, metrics, and audio levels.
- **Observers** (`src/pipecat/observers/`): Monitor frame flow without modifying the pipeline. Passed to `PipelineTask` via the `observers` parameter. Implement `on_process_frame()` and `on_push_frame()` callbacks.
### Important Patterns
- **Context Aggregation**: `LLMContext` accumulates messages for LLM calls; `UserResponse` aggregates user input
- **Turn Management**: Turn management is done through `LLMUserAggregator` and
`LLMAssistantAggregator`, created with `LLMContextAggregatorPair`
- **User turn strategies**: Detection of when the user starts and stops speaking is done via user turn start/stop strategies. They push `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame` respectively.
- **Interruptions**: Interruptions are usually triggered by a user turn start strategy (e.g. `VADUserTurnStartStrategy`) but they can be triggered by other processors as well, in which case the user turn start strategies don't need to. An `InterruptionFrame` carries an optional `asyncio.Event` that is set when the frame reaches the pipeline sink. If a processor stops an `InterruptionFrame` from propagating downstream (i.e., doesn't push it), it **must** call `frame.complete()` to avoid stalling `push_interruption_task_frame_and_wait()` callers.
- **Uninterruptible Frames**: These are frames that will not be removed from internal queues even if there's an interruption. For example, `EndFrame` and `StopFrame`.
- **Events**: Most classes in Pipecat have `BaseObject` as the very base class. `BaseObject` has support for events. Events can run in the background in an async task (default) or synchronously (`sync=True`) if we want immediate action. Synchronous event handlers need to execute fast.
- **Async Task Management**: Always use `self.create_task(coroutine, name)` instead of raw `asyncio.create_task()`. The `TaskManager` automatically tracks tasks and cleans them up on processor shutdown. Use `await self.cancel_task(task, timeout)` for cancellation.
- **Error Handling**: Use `await self.push_error(msg, exception, fatal)` to push errors upstream. Services should use `fatal=False` (the default) so application code can handle errors and take action (e.g. switch to another service).
### Key Directories
| Directory | Purpose |
| -------------------------- | -------------------------------------------------- |
| `src/pipecat/frames/` | Frame definitions (100+ types) |
| `src/pipecat/processors/` | FrameProcessor base + aggregators, filters, audio |
| `src/pipecat/pipeline/` | Pipeline orchestration |
| `src/pipecat/services/` | AI service integrations (60+ providers) |
| `src/pipecat/transports/` | Transport layer (Daily, LiveKit, WebSocket, Local) |
| `src/pipecat/serializers/` | Frame serialization for WebSocket protocols |
| `src/pipecat/observers/` | Pipeline observers for monitoring frame flow |
| `src/pipecat/audio/` | VAD, filters, mixers, turn detection, DTMF |
| `src/pipecat/turns/` | User turn management |
## Code Style
- **Docstrings**: Google-style. Classes describe purpose; `__init__` has `Args:` section; dataclasses use `Parameters:` section.
- **Linting**: Ruff (line length 100). Pre-commit hooks enforce formatting.
- **Type hints**: Required for complex async code.
- **Dataclass vs Pydantic**: Use `@dataclass` for frames and internal pipeline data (high-frequency, no validation needed). Use Pydantic `BaseModel` for configuration, parameters, metrics, and external API data (benefits from validation and serialization). Specifically:
- `@dataclass`: Frame types, context aggregator pairs, internal data containers
- `BaseModel`: Service `InputParams`, transport/VAD/turn params, metrics data, API request/response models, serializer params
### Docstring Example
```python
class MyService(LLMService):
"""Description of what the service does.
More detailed description.
Event handlers available:
- on_connected: Called when we are connected
Example::
@service.event_handler("on_connected")
async def on_connected(service, frame):
...
"""
def __init__(self, param1: str, **kwargs):
"""Initialize the service.
Args:
param1: Description of param1.
**kwargs: Additional arguments passed to parent.
"""
super().__init__(**kwargs)
```
## Service Implementation
When adding a new service:
1. Extend the appropriate base class (`STTService`, `TTSService`, `LLMService`, etc.)
2. Implement required abstract methods
3. Handle necessary frames
4. By default, all frames should be pushed in the direction they came
5. Push `ErrorFrame` on failures
6. Add metrics tracking via `MetricsData` if relevant
7. Follow the pattern of existing services in `src/pipecat/services/`
## Testing
Test utilities live in `src/pipecat/tests/utils.py`. Use `run_test()` to send frames through a pipeline and assert expected output frames in each direction. Use `SleepFrame(sleep=N)` to add delays between frames.
@AGENTS.md

View File

@@ -0,0 +1 @@
- Lowered the per-message log in `SmallWebRTCInputTransport._handle_app_message` from `debug` to `trace`. App messages can be high-frequency and were noisy at debug level; set the loguru level to `TRACE` to see them again.

View File

@@ -0,0 +1 @@
- Changed the default model for `GrokRealtimeLLMService` to `grok-voice-think-fast-1.0`, xAI's recommended Voice Agent model. The previous default of `grok-voice-fast-1.0` has been deprecated by xAI and is being removed.

1
changelog/4401.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `GrokRealtimeLLMService` ignoring the configured model. The model was stored in `Settings` but never sent to xAI, so every session silently fell back to xAI's server-side default. The model is now passed via the `?model=` query parameter on the WebSocket URL as xAI's Voice Agent API requires.

1
changelog/4404.added.md Normal file
View File

@@ -0,0 +1 @@
- Added an opt-in `add_tool_change_messages` flag to the LLM aggregators (set via `LLMContextAggregatorPair(..., add_tool_change_messages=True)`) that appends a developer-role message to the context whenever `LLMSetToolsFrame` changes the set of advertised standard tools. Helps the LLM stay coherent across mid-conversation tool changes, mitigating several flavors of tool-call-related hallucination: calling tools that have been removed, avoiding tools that have been re-added, and hallucinating output (made-up answers or tool-call-shaped non-tool-calls) when tools are unavailable.

View File

@@ -0,0 +1 @@
- Added `LLMTurnCompletionUserTurnStopStrategy` in `pipecat.turns.user_stop`. When installed, the strategy gates `on_user_turn_stopped` on a `UserTurnInferenceCompletedFrame` (a new fieldless system frame emitted by any component that can judge turn completeness — e.g. the `UserTurnCompletionLLMServiceMixin` on `✓`). A `finalization_timeout` provides a safety net if no completion frame ever arrives.

View File

@@ -0,0 +1 @@
- Added `deferred(strategy)` and `DeferredUserTurnStopStrategy` in `pipecat.turns.user_stop`. Wraps a stop strategy so it fires only the inference-triggered event and suppresses `on_user_turn_stopped`, leaving finalization to another strategy in the chain such as `LLMTurnCompletionUserTurnStopStrategy`.

View File

@@ -0,0 +1 @@
- Added `FilterIncompleteUserTurnStrategies` in `pipecat.turns.user_turn_strategies` — a `UserTurnStrategies` specialization that wraps the detector chain with `deferred(...)` and appends `LLMTurnCompletionUserTurnStopStrategy` as the finalizer. Common case: `user_turn_strategies=FilterIncompleteUserTurnStrategies()`. Pass `config=UserTurnCompletionConfig(...)` to customize timeouts and prompts.

View File

@@ -0,0 +1 @@
- Added `ExternalUserTurnCompletionStopStrategy` in `pipecat.turns.user_stop` — a generic stop strategy that finalizes the user turn whenever a `UserTurnInferenceCompletedFrame` arrives, regardless of which component produced it. `LLMTurnCompletionUserTurnStopStrategy` now extends this base; future producers (Flux, custom end-of-turn classifiers, etc.) can use the base directly or subclass it to add producer-specific setup.

1
changelog/4405.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `on_user_turn_inference_triggered`, a new event on the user turn controller, processor, aggregator and stop strategies that fires when a strategy has enough signal to start LLM inference. By default it fires together with `on_user_turn_stopped`; a gating strategy can fire only the inference-triggered event and defer finalization to a peer.

View File

@@ -0,0 +1 @@
- Deprecated `LLMUserAggregatorParams.filter_incomplete_user_turns`. Use `user_turn_strategies=FilterIncompleteUserTurnStrategies()` (or add `LLMTurnCompletionUserTurnStopStrategy` to a custom `user_turn_strategies.stop`) instead. Setting the legacy flag still works for one release: the aggregator emits a `DeprecationWarning` and rewires the strategies as if you had passed `FilterIncompleteUserTurnStrategies` directly.

1
changelog/4405.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `on_user_turn_stopped` firing prematurely when `filter_incomplete_user_turns` was enabled. The event now fires only after the LLM confirms the user turn is complete (`✓`); previously the smart-turn detector's tentative stop was bubbling up before the LLM had a chance to veto it, causing observers, transcript appenders and UI indicators to receive an early — and sometimes duplicated — signal.

6
changelog/4407.added.md Normal file
View File

@@ -0,0 +1,6 @@
- Added first-class RTVI support for the UI Agent Protocol:
- Adds `ui-event`, `ui-snapshot`, and `ui-cancel-task` client-to-server messages, plus `ui-command` and `ui-task` server-to-client messages, with paired `*Data` / `*Message` pydantic models.
- Adds built-in command payload models for `Toast`, `Navigate`, `ScrollTo`, `Highlight`, `Focus`, `Click`, `SetInputValue`, and `SelectText`; matching default handlers live in `@pipecat-ai/client-react`.
- Adds `RTVIProcessor.on_ui_message` for inbound `ui-event`, `ui-snapshot`, and `ui-cancel-task` messages.
- Adds five UI pipeline frames, mirroring the `client-message` frame-and-event pattern: downstream code pushes `RTVIUICommandFrame` / `RTVIUITaskFrame` for the observer to wrap into outbound `UICommandMessage` / `UITaskMessage` envelopes, while the processor pushes inbound `RTVIUIEventFrame`, `RTVIUISnapshotFrame`, and `RTVIUICancelTaskFrame` alongside `on_ui_message`.
- Bumps the RTVI `PROTOCOL_VERSION` from `1.2.0` to `1.3.0`.

1
changelog/4414.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `TTSSpeakFrame(append_to_context=True)` greetings sometimes splitting across two assistant messages in the LLM context and not surfacing in `on_assistant_turn_stopped`. The `LLMAssistantPushAggregationFrame` emitted at the end of a TTS context now carries a PTS just past the last word so it can't overtake clock-queued `TTSTextFrame`s in the transport's output, and `LLMAssistantAggregator` now triggers `on_assistant_turn_started`/`on_assistant_turn_stopped` when it receives the frame outside an LLM response cycle (restoring v0.0.104 behavior for greeting transcripts).

1
changelog/4415.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` producing merged words (e.g. `bookLook`) when using Flash models. Flash often splits sentences mid-stream into alignment chunks that begin with a real inter-word space, but the previous fix unconditionally stripped that space from every chunk. Leading spaces are now stripped only on the first alignment chunk of an utterance, so subsequent chunks correctly flush partial words across boundaries.

1
changelog/4416.added.md Normal file
View File

@@ -0,0 +1 @@
- AWS Transcribe STT, Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor now resolve credentials via the standard boto3 provider chain (EC2 instance profiles, EKS pod roles / IRSA, ECS task roles, SSO, `~/.aws/credentials`) when explicit credentials and `AWS_*` environment variables are absent. Services running with IAM roles no longer need to export static credentials.

1
changelog/4416.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed AWS Polly TTS, Bedrock LLM, and the Bedrock AgentCore processor erroring out when only one of `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` was set in the environment. The half-populated kwargs are no longer forwarded to aioboto3; partial env-var configurations now fall through to the boto3 credential chain like fully-unset configurations do.

View File

@@ -0,0 +1 @@
- Fixed a path traversal issue in the development runner's `/files/{filename:path}` download endpoint. Previously, when the runner was started with `--folder`, a request like `/files/..%2F..%2Fetc%2Fpasswd` could escape the configured folder because `%2F`-encoded separators bypassed Starlette's path normalisation. The endpoint now resolves the joined path and rejects any filename that escapes the allowed base with a 403, and also returns 404 (instead of an implicit `null` 200) when `--folder` is unset.

View File

@@ -0,0 +1 @@
- Changed the default Inworld TTS model from `inworld-tts-1.5-max` to `inworld-tts-2` (Realtime TTS-2) across `InworldHttpTTSService`, `InworldTTSService`, and the `InworldRealtimeLLMService` cascade. Existing users can pin the prior model explicitly via the `model`/`tts_model` argument; both `inworld-tts-1.5-max` and `inworld-tts-1.5-mini` remain valid model IDs.

1
changelog/4424.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `ElevenLabsTTSService` and `ElevenLabsHttpTTSService` writing romanized/normalized text to the LLM context. With non-Latin input (e.g., Chinese), the assistant transcript was getting populated with pinyin (`Ni Hao !` instead of `你好!`), which then degraded subsequent LLM turns. The services now consume `alignment` by default and only switch to `normalizedAlignment` / `normalized_alignment` when `pronunciation_dictionary_locators` is configured (where `alignment` has overlapping restarts that produce duplicated/garbled words, per #4316). Both fields are read with preferred-with-fallback semantics since each is nullable per the API schema.

1
changelog/4426.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `keyterms` support to ElevenLabs STT services so Scribe V2 callers can bias transcription for both file-based and realtime transcription.

View File

@@ -0,0 +1 @@
- Deprecated `ResampyResampler` in favor of `SOXRAudioResampler` (or the `create_file_resampler()` / `create_stream_resampler()` factories). Instantiating `ResampyResampler` now emits a `DeprecationWarning`. The class will be removed in Pipecat 2.0 along with the default `resampy` and `numba` dependencies.

View File

@@ -0,0 +1 @@
- Changed the default model for `GrokLLMService` from `grok-3` to `grok-4.20-non-reasoning`. xAI is retiring `grok-3` on May 15, 2026.

1
changelog/4430.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `watchdog_min_timeout` parameter to `DeepgramFluxSTT` and `DeepgramFluxSageMakerSTT` (default `0.5` seconds) to control the minimum silence duration before the watchdog sends a silence packet to prevent dangling turns. The actual threshold is `max(chunk_duration * 2, watchdog_min_timeout)`, so it also adapts automatically to the audio chunk size in use.

View File

@@ -0,0 +1 @@
- `DeepgramFluxSTT` watchdog silence threshold is now dynamic: `max(chunk_duration * 2, watchdog_min_timeout)` instead of a fixed 500 ms. This prevents false silence injections when large audio chunks are sent at lower frequency.

1
changelog/4431.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed a deadlock in `TTSService` that could permanently stall pipeline processing when all three conditions occurred together: `pause_frame_processing=True`, an interruption arrived before any TTS audio was played, and an `UninterruptibleFrame` (e.g. `TTSUpdateSettingsFrame`, `FunctionCallResultFrame`) was in the processing queue at that moment. The process task would block on `__process_event.wait()` indefinitely because `BotStoppedSpeakingFrame` never arrives (no audio was played) and the interruption handler did not resume processing. Affects services using `pause_frame_processing=True` such as ElevenLabs, Rime, AsyncAI, Gradium, and ResembleAI.

View File

@@ -0,0 +1 @@
- `ElevenLabsTTSService` now sends `close_context` to the server as soon as the turn is complete (on `on_turn_context_completed`) rather than waiting until all audio has finished playing back. The `isFinal` message from ElevenLabs is now used to signal `TTSStoppedFrame` and clean up the audio context, improving turn transition timing.

1
changelog/4434.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed interruptions being delayed when a slow non-uninterruptible frame was processing and an uninterruptible frame was waiting in the queue. The bot would stall until the slow frame finished instead of cancelling it immediately on interruption.

1
changelog/4435.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `TTSService` dropping uninterruptible frames (e.g. `FunctionCallResultFrame`) from its internal serialization queue when an interruption occurs. Previously, the queue was recreated on every interruption, silently discarding any queued frames. The queue is now reset instead of recreated, preserving uninterruptible frames so they are always delivered downstream.

1
changelog/4440.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed a race condition in the Daily transport that caused `AttributeError: 'NoneType' object has no attribute 'send_app_message'` when tearing down a pipeline. Both `DailyInputTransport` and `DailyOutputTransport` share the same `DailyTransportClient` and both call `cleanup()`, which was releasing the underlying `CallClient` on the first call — leaving the second caller with a `None` client.

1
changelog/4441.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Restored `cancel_on_interruption=False` support for `AWSNovaSonicLLMService` and `OpenAIRealtimeLLMService`. These services previously honored the flag by simply not cancelling in-flight function calls on interruption; the introduction of the new async-tool mechanism (which threads started/intermediate/final messages through the LLM context) broke that path because the realtime services didn't know how to interpret those messages. Note that new-style streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are not supported on these realtime services. Similar fixes for other impacted realtime services are forthcoming.

1
changelog/4443.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed two misspelled Gemini TTS voice names in `GeminiTTSService.AVAILABLE_VOICES`.

1
changelog/4446.change.md Normal file
View File

@@ -0,0 +1 @@
- Updated `InworldHttpTTSService` and `InworldTTSService` to use PCM audio encoding by default, which returns audio bytes without headers.

1
changelog/4447.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Extended the `cancel_on_interruption=False` regression fix to `GrokRealtimeLLMService`, `AzureRealtimeLLMService`, and `UltravoxRealtimeLLMService`. Grok and Azure use the same approach as in #4441 (each service detects async-tool messages in the LLM context and routes the final result to its formal tool-result channel; Azure inherits transitively from `OpenAIRealtimeLLMService`). Ultravox needed a different approach because its API freezes the conversation between `client_tool_invocation` and the matching `client_tool_result` — for async-registered functions it now ships a placeholder `client_tool_result` immediately when the function is invoked (to unfreeze the conversation), then injects the real result as user-side text once the tool finishes. Streamed intermediate results (`FunctionCallResultProperties(is_final=False)`) are still not supported on any of these realtime services. `GeminiLiveLLMService` and `InworldRealtimeLLMService` are excluded for now: Gemini Live's async-tool path needs deeper investigation, and Inworld appears to have a pre-existing problem with even simple tool calling on its Realtime API.

1
changelog/4448.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `cancel_on_interruption=False` support for `GeminiLiveLLMService` on models that support Gemini's NON_BLOCKING tool mechanism (currently Gemini 2.x); the conversation now continues while the tool runs. On models that don't yet support NON_BLOCKING (Gemini 3.x), the service surfaces a one-time warning explaining the limitation. (Note: an intermittent 1008 error can occasionally fire on Gemini 2.5 during long-running tool calls; we auto-reconnect.)

View File

@@ -0,0 +1 @@
- Moved `create_task`, `cancel_task`, the `task_manager` property, and `setup(task_manager)` up from `FrameProcessor` to `BaseObject`. Custom `BaseObject` subclasses (turn strategies, controllers, etc.) now inherit these methods directly instead of reimplementing the task manager wiring. Owners propagate the task manager to their child `BaseObject`s via `await child.setup(task_manager)`.

View File

@@ -0,0 +1 @@
- Changed the default OpenAI Realtime input audio transcription model from `gpt-4o-transcribe` to `gpt-realtime-whisper` for both `OpenAIRealtimeSTTService` and `OpenAIRealtimeLLMService`. The new model does not accept the `prompt` parameter; if a prompt is supplied alongside `gpt-realtime-whisper`, it is dropped automatically and a warning is logged. To keep using prompt hints, explicitly pin `model="gpt-4o-transcribe"` (or `"gpt-4o-mini-transcribe"`).

View File

@@ -0,0 +1 @@
- Updated the default model for `CartesiaTTSService` and `CartesiaHttpTTSService` from `sonic-3` to `sonic-3.5`.

View File

@@ -0,0 +1 @@
- Added NVIDIA Magpie TTS services via AWS SageMaker: `NvidiaSageMakerHTTPTTSService` (single HTTP invocation, streams raw PCM back) and `NvidiaSageMakerWebsocketTTSService` (persistent HTTP/2 bidi-stream with full interruption support via `InterruptibleTTSService`).

1
changelog/4464.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `NvidiaSageMakerWebsocketSTTService` for streaming speech recognition using NVIDIA Nemotron ASR via an AWS SageMaker bidirectional-stream endpoint. Produces `InterimTranscriptionFrame` and `TranscriptionFrame` frames, is VAD-aware, and automatically reconnects on error.

1
changelog/4465.fixed.md Normal file
View File

@@ -0,0 +1 @@
- Fixed `OpenAIRealtimeLLMService` handling of multi-output-item responses (observed with `gpt-realtime-2`). A single response can now contain more than one audio item, and the first item's `audio.done` may arrive after the second item's deltas have started. Deltas still arrive strictly in playback order, so we continue to forward them as received (matching OpenAI's reference implementation). The fix removes spurious warnings, ensures truncation always targets the latest audio item, and emits a single bracketing `TTSStartedFrame`/`TTSStoppedFrame` pair per assistant turn (the Stopped is now pushed on `response.done`).

1
changelog/4470.added.md Normal file
View File

@@ -0,0 +1 @@
- Added support for `reasoning` configuration on `OpenAIRealtimeLLMService`, for use with reasoning-capable Realtime models such as `gpt-realtime-2`.

View File

@@ -0,0 +1 @@
- Changed the default model for `OpenAIRealtimeLLMService` from `gpt-realtime-1.5` to `gpt-realtime-2`.

1
changelog/4480.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `wait_for_transcript_to_end_user_turn` on `LLMUserAggregatorParams` for pipelines where local turn detection drives a realtime service like Gemini Live. Set it to False to avoid unnecessary latency from transcript delay — the realtime service consumes user audio directly, so we don't need user transcripts in context before it can respond. The option makes it so that (1) turn strategies do not consider user transcripts, letting the user turn end sooner, and (2) user transcripts are then handled by the aggregator: a simple timer gives it time to gather those transcripts after the user turn ends, and once gathered, the aggregator emits a new `on_user_turn_message_finalized` event with the new user context message. The new event also fires in the default mode (coinciding with `on_user_turn_stopped`), so consumers that want the populated user transcript can subscribe to it uniformly. See `examples/realtime/realtime-gemini-live-local-vad.py` for the full pattern.

View File

@@ -132,6 +132,10 @@ NOVITA_API_KEY=...
# NVIDIA
NVIDIA_API_KEY=...
# For a full example of how to deploy to SageMaker, see:
# https://github.com/pipecat-ai/pipecat-examples/tree/main/nvidia_sagemaker_example/deployment/aws-sagemaker-nvidia
SAGEMAKER_ASR_ENDPOINT_NAME=...
SAGEMAKER_MAGPIE_ENDPOINT_NAME=...
# OpenAI
OPENAI_API_KEY=...

View File

@@ -0,0 +1,232 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Manual validation harness for the ``add_tool_change_messages`` feature.
When tools change mid-conversation, LLMs can produce a few different
flavors of tool-call-related hallucination:
- **Forward hallucination** — calling a tool that has been removed.
- **Negative hallucination** — refusing to call a tool that has been
re-added (because recent context is full of "I can't" responses).
- **Hallucinated output when tools are unavailable** — making up an
answer rather than declining gracefully, or producing JSON that
*looks* like a tool call but is actually just an assistant text
response.
The ``add_tool_change_messages`` feature mitigates these by appending a
developer-role message to the conversation whenever ``LLMSetToolsFrame``
changes the set of advertised tools, so the LLM stays in sync with what's
actually available.
This harness exercises all of those flavors by flipping the advertised
tool set on a turn counter:
Phase 0 (turns 14): weather tool ACTIVE — confirm baseline.
Phase 1 (turns 58): tool REMOVED — keep asking for weather.
Phase 2 (turn 9+): tool RE-ADDED — does the LLM call it again?
Set ``ADD_TOOL_CHANGE_MESSAGES=0`` to disable the mitigation and see the
unmitigated behavior. The default is ON so a fresh run shows the feature
working.
Defaults to Llama 3.1 8B Instruct via a locally-running Ollama —
anecdotally one of the more hallucination-prone of the easily accessible
models. Pull the model once with ``ollama pull llama3.1:8b`` and make
sure ``ollama serve`` is running. Swap the LLM service to validate other
providers.
Run with::
uv run examples/features/features-add-tool-change-messages.py
ADD_TOOL_CHANGE_MESSAGES=0 uv run examples/features/features-add-tool-change-messages.py
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, LLMSetToolsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import NOT_GIVEN, LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ollama.llm import OLLamaLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# Default ON so a fresh run shows the feature working. Set to "0" to A/B
# against the unmitigated behavior.
ADD_TOOL_CHANGE_MESSAGES = os.environ.get("ADD_TOOL_CHANGE_MESSAGES", "1") == "1"
async def fetch_weather_from_api(params: FunctionCallParams):
await params.result_callback({"conditions": "nice", "temperature": "75"})
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
weather_tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(
f"Starting add_tool_change_messages demo bot "
f"(ADD_TOOL_CHANGE_MESSAGES={ADD_TOOL_CHANGE_MESSAGES})"
)
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OLLamaLLMService(
settings=OLLamaLLMService.Settings(
# Llama 3.1 8B Instruct is anecdotally one of the more
# hallucination-prone of the easily accessible models — exactly
# what we want for this validation harness. Pull it with
# ``ollama pull llama3.1:8b`` and make sure ``ollama serve``
# is running.
model="llama3.1:8b",
system_instruction=(
"You are a helpful assistant in a voice conversation. Your responses "
"will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. Respond briefly and naturally. "
"If the user asks for the current weather, use the `get_current_weather` "
"function if it's available. IMPORTANT: if you do not have access to the function, "
"say something along the lines of 'Sorry, I can't check the weather right now.'."
),
),
)
llm.register_function("get_current_weather", fetch_weather_from_api)
context = LLMContext(tools=weather_tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
add_tool_change_messages=ADD_TOOL_CHANGE_MESSAGES,
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
# Phase controller: roughly 4 turns per phase.
user_turn_count = 0
REMOVE_AT_TURN = 5 # tool gone for turn N onward
READD_AT_TURN = 9 # tool back for turn N onward
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message):
nonlocal user_turn_count
user_turn_count += 1
logger.info(f"=== User turn {user_turn_count} complete ===")
if user_turn_count == REMOVE_AT_TURN - 1:
logger.info(
"=== Phase 1: weather tool REMOVED. Keep asking about the weather "
"to exercise hallucination scenarios. ==="
)
await task.queue_frame(LLMSetToolsFrame(tools=NOT_GIVEN))
elif user_turn_count == READD_AT_TURN - 1:
logger.info(
"=== Phase 2: weather tool RE-ADDED. Ask for the weather again — "
"does the LLM call it, or keep refusing? (THIS IS THE TEST.) ==="
)
await task.queue_frame(LLMSetToolsFrame(tools=weather_tools))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info(
"=== Phase 0: weather tool ACTIVE. Ask for the weather a few times "
"to confirm it's working. ==="
)
context.add_message(
{
"role": "developer",
"content": (
"Please introduce yourself briefly to the user, then invite them "
"to ask about the weather."
),
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,187 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Manual demonstration of the missing-handler (developer-error) recovery path.
When a tool is advertised to the LLM via ``tools``/``LLMContext`` but
the developer forgets to call ``llm.register_function(...)`` to wire up
its handler, the LLM happily emits a tool call and then... nothing
happens on the Pipecat side, leaving the conversation stuck.
Pipecat's recovery path (``LLMService._missing_function_call_handler``)
catches this case:
- Logs a ``logger.error`` distinguishing **developer error** (tool advertised
but no handler registered) from a hallucination (tool not advertised),
pointing at the missing ``register_function`` call.
- Returns a neutral terminal tool result
(``LLMService.MISSING_FUNCTION_CALL_MESSAGE_TEMPLATE``: "The function
`X` is not currently available.") so the call still terminates with a
normal tool result instead of leaving the conversation stuck.
This example is **deliberately broken**: the weather schema is in
``tools`` but ``register_function`` is *not* called. Ask the bot about
the weather and observe:
1. The LLM emits a tool call for ``get_current_weather``.
2. ``logger.error`` fires with "advertised … but has no registered handler
— did you forget to call register_function()?"
3. The terminal tool result is fed back to the LLM.
4. The LLM responds in voice based on that result (typically something
like "the weather function isn't available right now").
Uses the OpenAI LLM service with defaults. Swap to another provider to
validate this behavior elsewhere.
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
weather_tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(audio_in_enabled=True, audio_out_enabled=True),
"twilio": lambda: FastAPIWebsocketParams(audio_in_enabled=True, audio_out_enabled=True),
"webrtc": lambda: TransportParams(audio_in_enabled=True, audio_out_enabled=True),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Starting missing-handler demo bot (no handler is registered on purpose)")
stt = DeepgramSTTService(api_key=os.environ["DEEPGRAM_API_KEY"])
tts = CartesiaTTSService(
api_key=os.environ["CARTESIA_API_KEY"],
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAILLMService.Settings(
system_instruction=(
"You are a helpful assistant in a voice conversation. Your responses "
"will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. Respond briefly and naturally. "
"Always use the get_current_weather function to answer questions "
"about the current weather."
),
),
)
# *** DELIBERATELY OMITTED ***
# The whole point of this example is to demonstrate the missing-handler
# recovery path. Re-add this line to wire the tool up correctly:
#
# llm.register_function("get_current_weather", fetch_weather_from_api)
context = LLMContext(tools=weather_tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(enable_metrics=True, enable_usage_metrics=True),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
logger.info(
"=== Ask for the weather. Watch for a logger.error about the missing "
"handler, and listen for the LLM's response based on the recovery "
"message. ==="
)
context.add_message(
{
"role": "developer",
"content": (
"Please introduce yourself briefly to the user, then invite "
"them to ask about the weather."
),
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -29,7 +29,7 @@ from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.stt import OpenAIRealtimeSTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -69,13 +69,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAISTTService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAISTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related weather, such as temperature and conditions. And restaurant names.",
),
)
stt = OpenAIRealtimeSTTService(api_key=os.environ["OPENAI_API_KEY"])
tts = OpenAITTSService(
api_key=os.environ["OPENAI_API_KEY"],

View File

@@ -25,7 +25,7 @@ from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.stt import OpenAIRealtimeSTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -63,13 +63,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAISTTService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAISTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related weather, such as temperature and conditions. And restaurant names.",
),
)
stt = OpenAIRealtimeSTTService(api_key=os.environ["OPENAI_API_KEY"])
tts = OpenAITTSService(
api_key=os.environ["OPENAI_API_KEY"],

View File

@@ -0,0 +1,184 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the AWS Nova Sonic LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Nova Sonic via the
formal toolResult channel so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
llm = AWSNovaSonicLLMService(
secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
region=os.environ["AWS_REGION"],
session_token=os.getenv("AWS_SESSION_TOKEN"),
settings=AWSNovaSonicLLMService.Settings(
voice="tiffany",
system_instruction=system_instruction,
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -46,11 +46,6 @@ async def fetch_weather_from_api(params: FunctionCallParams):
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
# Simulate a long network delay.
# You can continue chatting while waiting for this to complete.
# With Nova 2 Sonic (the default model), the assistant will respond
# appropriately once the function call is complete.
await asyncio.sleep(5)
await params.result_callback(
{
"conditions": "nice",
@@ -150,9 +145,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
# Register function for function calls
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function(
"get_current_weather", fetch_weather_from_api, cancel_on_interruption=False
)
llm.register_function("get_current_weather", fetch_weather_from_api)
# Set up context and context management.
context = LLMContext(tools=tools)

View File

@@ -0,0 +1,195 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Azure Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Azure Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.azure.realtime.llm import AzureRealtimeLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioTranscription,
SessionProperties,
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = AzureRealtimeLLMService(
api_key=os.environ["AZURE_REALTIME_API_KEY"],
base_url=os.environ["AZURE_REALTIME_BASE_URL"],
settings=AzureRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(model="whisper-1"),
)
),
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -4,15 +4,25 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Gemini Live LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Gemini Live as a
FunctionResponse so the model can integrate it naturally into its next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -31,33 +41,55 @@ load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have three tools available to you:
1. get_current_weather: Use this tool to get the current weather in a specific location.
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
3. google_search: Use this tool to search the web for information.
"""
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
@@ -77,42 +109,6 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
search_tool = {"google_search": {}}
# KNOWN ISSUE: If using GeminiVertexLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiLiveLLMService(
api_key=os.environ["GOOGLE_API_KEY"],
settings=GeminiLiveLLMService.Settings(
@@ -121,13 +117,12 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
tools=tools,
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
# You can provide the system instructions and tools in the context rather
# than as arguments to GeminiLiveLLMService, but note that doing so will
# trigger a (fast) reconnection when the GeminiLiveLLMService first
# receives the context (i.e. when we send the LLMRunFrame below).
context = LLMContext()
# Server-side VAD is enabled by default; no local VAD is added.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
@@ -154,7 +149,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
@@ -166,7 +160,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)

View File

@@ -20,6 +20,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
UserMessageFinalizedMessage,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
@@ -70,10 +71,25 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
},
],
)
# `wait_for_transcript_to_end_user_turn=False` is the right setting
# for pipelines like this one — local turn detection driving a
# realtime service. It avoids unnecessary latency from transcript
# delay: the realtime service consumes user audio directly, so
# we don't need user transcripts in context before it can respond.
# With this option:
#
# - Turn strategies do not consider user transcripts, so the user
# turn ends sooner.
# - User transcripts are handled by the aggregator: a simple
# post-turn transcript wait gives them time to arrive after the
# user turn ends, then the aggregator emits
# `on_user_turn_message_finalized` with the new user context
# message.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
wait_for_transcript_to_end_user_turn=False,
),
)
@@ -107,8 +123,23 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client disconnected")
await task.cancel()
# `on_user_turn_stopped` fires at the end of the user turn. With
# `wait_for_transcript_to_end_user_turn=False`, no user
# transcripts have arrived yet at this point, so
# `message.content` is empty. Logged here to make the end-of-turn
# signal visible alongside the later finalization event.
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
# `on_user_turn_message_finalized` fires when the user message has
# been finalized into the context. Here it fires later than
# `on_user_turn_stopped`, after the aggregator's post-turn
# transcript wait completes.
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_user_turn_message_finalized(
aggregator, strategy, message: UserMessageFinalizedMessage
):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")

View File

@@ -6,10 +6,13 @@
import os
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -23,6 +26,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -30,6 +34,32 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have three tools available to you:
1. get_current_weather: Use this tool to get the current weather in a specific location.
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
3. google_search: Use this tool to search the web for information.
"""
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
@@ -51,23 +81,55 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
search_tool = {"google_search": {}}
# KNOWN ISSUE: If using GeminiVertexLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiLiveLLMService(
api_key=os.environ["GOOGLE_API_KEY"],
settings=GeminiLiveLLMService.Settings(
system_instruction=system_instruction,
voice="Aoede", # Puck, Charon, Kore, Fenrir, Aoede
# system_instruction="Talk like a pirate."
),
# inference_on_context_initialization=False,
tools=tools,
)
context = LLMContext(
[
{
"role": "user",
"content": "Say hello. Then ask if I want to hear a joke.",
},
],
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
context = LLMContext()
# Server-side VAD is enabled by default; no local VAD is added.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
@@ -94,6 +156,9 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")

View File

@@ -0,0 +1,179 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Grok Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to Grok Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.xai.realtime.events import SessionProperties
from pipecat.services.xai.realtime.llm import GrokRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
# Note: Grok has built-in server-side VAD, so we don't need local VAD.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = GrokRealtimeLLMService(
api_key=os.environ["XAI_API_KEY"],
settings=GrokRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
voice="Ara",
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,198 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the OpenAI Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
While the call is in flight the conversation continues; the result arrives
later via the async-tool mechanism and is forwarded to OpenAI Realtime as a
``function_call_output`` so the model can integrate it naturally into its
next turn.
"""
import asyncio
import os
import random
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioNoiseReduction,
InputAudioTranscription,
SemanticTurnDetection,
SessionProperties,
)
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
tools = ToolsSchema(standard_tools=[weather_function])
system_instruction = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = OpenAIRealtimeLLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeLLMService.Settings(
system_instruction=system_instruction,
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
turn_detection=SemanticTurnDetection(),
noise_reduction=InputAudioNoiseReduction(type="near_field"),
)
),
),
),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,182 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
LLMUserAggregatorParams,
UserMessageFinalizedMessage,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioTranscription,
SessionProperties,
)
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# `turn_detection=False` disables OpenAI Realtime's server-side VAD,
# so this pipeline's local turn detection drives turn boundaries.
# The service then sends `input_audio_buffer.commit` +
# `response.create` when it sees `UserStoppedSpeakingFrame`.
llm = OpenAIRealtimeLLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeLLMService.Settings(
session_properties=SessionProperties(
audio=AudioConfiguration(
input=AudioInput(
transcription=InputAudioTranscription(),
turn_detection=False,
),
),
),
),
)
context = LLMContext(
[
{
"role": "developer",
"content": "Say hello. Then ask if I want to hear a joke.",
},
],
)
# `wait_for_transcript_to_end_user_turn=False` is the right setting
# for pipelines like this one — local turn detection driving a
# realtime service. It avoids unnecessary latency from transcript
# delay: the realtime service consumes user audio directly, so
# we don't need user transcripts in context before it can respond.
# With this option:
#
# - Turn strategies do not consider user transcripts, so the user
# turn ends sooner.
# - User transcripts are handled by the aggregator: a simple
# post-turn transcript wait gives them time to arrive after the
# user turn ends, then the aggregator emits
# `on_user_turn_message_finalized` with the new user context
# message.
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
wait_for_transcript_to_end_user_turn=False,
),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
# `on_user_turn_stopped` fires at the end of the user turn. With
# `wait_for_transcript_to_end_user_turn=False`, no user
# transcripts have arrived yet at this point, so
# `message.content` is empty. Logged here to make the end-of-turn
# signal visible alongside the later finalization event.
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
logger.info(f"User turn ended (strategy: {type(strategy).__name__})")
# `on_user_turn_message_finalized` fires when the user message has
# been finalized into the context. Here it fires later than
# `on_user_turn_stopped`, after the aggregator's post-turn
# transcript wait completes.
@user_aggregator.event_handler("on_user_turn_message_finalized")
async def on_user_turn_message_finalized(
aggregator, strategy, message: UserMessageFinalizedMessage
):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}user: {message.content}"
logger.info(f"Transcript: {line}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
line = f"{timestamp}assistant: {message.content}"
logger.info(f"Transcript: {line}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -232,6 +232,20 @@ Remember, your responses should be short. Just one or two sentences, usually. Re
# [LLMUpdateSettingsFrame(settings=SessionProperties(tools=new_tools).model_dump())]
# )
# Reasoning effort can be changed at runtime too. Only
# reasoning-capable Realtime models (e.g. gpt-realtime-2) support this.
# await task.queue_frames(
# [
# LLMUpdateSettingsFrame(
# delta=OpenAIRealtimeLLMService.Settings(
# session_properties=SessionProperties(
# reasoning=Reasoning(effort="xhigh"),
# ),
# )
# )
# ]
# )
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")

View File

@@ -0,0 +1,186 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with the Ultravox Realtime LLM service.
The ``get_current_weather`` tool is registered with
``cancel_on_interruption=False`` and simulates a slow API call (10s sleep).
Ultravox's API freezes the conversation between ``client_tool_invocation``
and the matching ``client_tool_result``, so the service ships a placeholder
``client_tool_result`` immediately when an async-registered function is
invoked (to unfreeze the conversation). When the real tool finishes, the
actual result is injected as user-side text so the model picks it up.
"""
import asyncio
import datetime
import os
import random
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.ultravox.llm import OneShotInputParams, UltravoxRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_stop import SpeechTimeoutUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call so we can demonstrate that the
# conversation continues while the tool is in flight.
await asyncio.sleep(10)
temperature = (
random.randint(60, 85)
if params.arguments["format"] == "fahrenheit"
else random.randint(15, 30)
)
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"location": params.arguments["location"],
"format": params.arguments["format"],
"timestamp": datetime.datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
required=["location", "format"],
)
system_prompt = (
"You are a friendly assistant. The user and you will engage in a spoken "
"dialog exchanging the transcripts of a natural real-time conversation. "
"Keep your responses short, generally two or three sentences for chatty "
"scenarios. When the user asks for the weather, call get_current_weather. "
"While you wait for the result, keep chatting with the user. When the "
"result arrives, share it with the user naturally."
)
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = UltravoxRealtimeLLMService(
params=OneShotInputParams(
api_key=os.environ["ULTRAVOX_API_KEY"],
system_prompt=system_prompt,
temperature=0.3,
max_duration=datetime.timedelta(minutes=3),
),
one_shot_selected_tools=ToolsSchema(standard_tools=[weather_function]),
)
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
)
context = LLMContext([])
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
user_turn_strategies=UserTurnStrategies(
stop=[SpeechTimeoutUserTurnStopStrategy()],
),
vad_analyzer=SileroVADAnalyzer(),
),
)
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -49,13 +49,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAIRealtimeSTTService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeSTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related to dogs, such as breed names.",
),
)
stt = OpenAIRealtimeSTTService(api_key=os.environ["OPENAI_API_KEY"])
tl = TranscriptionLogger()
vad_processor = VADProcessor(vad_analyzer=SileroVADAnalyzer())

View File

@@ -10,7 +10,7 @@ Demonstrates LLM-based turn completion detection to suppress bot responses when
the user was cut off mid-thought. The LLM outputs one of three markers:
- ✓ (complete): User finished their thought, respond normally
- ○ (incomplete short): User was cut off, wait ~5s then prompt
- ◐ (incomplete long): User needs time to think, wait ~15s then prompt
- ◐ (incomplete long): User needs time to think, wait ~10s then prompt
When incomplete is detected, the bot's response is suppressed. After the timeout
expires, the LLM is automatically prompted to re-engage the user.
@@ -41,6 +41,7 @@ from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_turn_strategies import FilterIncompleteUserTurnStrategies
load_dotenv(override=True)
@@ -83,23 +84,28 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)
context = LLMContext()
# `FilterIncompleteUserTurnStrategies` pairs the default detector
# chain with `LLMTurnCompletionUserTurnStopStrategy`: detectors
# trigger LLM inference but the public `on_user_turn_stopped` event
# fires only when the LLM confirms ✓. The LLM marks each response
# with one of:
# ✓ = complete (respond normally)
# ○ = incomplete short (wait 5s, then prompt)
# ◐ = incomplete long (wait 10s, then prompt)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(
vad_analyzer=SileroVADAnalyzer(),
# Enable turn completion filtering - the LLM will output:
# ✓ = complete (respond normally)
# ○ = incomplete short (wait 5s, then prompt)
# ◐ = incomplete long (wait 15s, then prompt)
filter_incomplete_user_turns=True,
# Optional: customize turn completion behavior
# turn_completion_config=TurnCompletionConfig(
# incomplete_short_timeout=5.0,
# incomplete_long_timeout=15.0,
# incomplete_short_prompt="Custom prompt...",
# incomplete_long_prompt="Custom prompt...",
# instructions="Custom turn completion instructions...",
# ),
user_turn_strategies=FilterIncompleteUserTurnStrategies(
# Optional: customize turn completion behavior
# config=UserTurnCompletionConfig(
# incomplete_short_timeout=5.0,
# incomplete_long_timeout=10.0,
# incomplete_short_prompt="Custom prompt...",
# incomplete_long_prompt="Custom prompt...",
# instructions="Custom turn completion instructions...",
# ),
),
),
)

View File

@@ -0,0 +1,129 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
# For a full example of how to deploy to SageMaker, see:
# https://github.com/pipecat-ai/pipecat-examples/tree/main/nvidia_sagemaker_example/deployment/aws-sagemaker-nvidia
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.nvidia.llm import NvidiaLLMService
from pipecat.services.nvidia.sagemaker.stt import NvidiaSageMakerSTTService
from pipecat.services.nvidia.sagemaker.tts import NvidiaSageMakerTTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = NvidiaSageMakerSTTService(
endpoint_name=os.environ["SAGEMAKER_ASR_ENDPOINT_NAME"],
region=os.getenv("AWS_REGION", "us-west-2"),
)
llm = NvidiaLLMService(
api_key=os.environ["NVIDIA_API_KEY"],
settings=NvidiaLLMService.Settings(
model="meta/llama-3.3-70b-instruct",
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
tts = NvidiaSageMakerTTSService(
endpoint_name=os.environ["SAGEMAKER_MAGPIE_ENDPOINT_NAME"],
region=os.getenv("AWS_REGION", "us-west-2"),
)
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -25,7 +25,6 @@ from pipecat.runner.utils import create_transport
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAIRealtimeSTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -53,14 +52,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAIRealtimeSTTService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAIRealtimeSTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related to dogs, such as breed names.",
language=Language.EN,
),
)
stt = OpenAIRealtimeSTTService(api_key=os.environ["OPENAI_API_KEY"])
tts = OpenAITTSService(
api_key=os.environ["OPENAI_API_KEY"],
@@ -72,7 +64,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = OpenAILLMService(
api_key=os.environ["OPENAI_API_KEY"],
settings=OpenAILLMService.Settings(
system_instruction="You are very knowledgable about dogs. Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points. Respond to what the user said in a creative and helpful way.",
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)

View File

@@ -223,12 +223,11 @@ TESTS_REALTIME = [
# ("realtime/realtime-azure.py", EVAL_WEATHER),
("realtime/realtime-openai-text.py", EVAL_WEATHER),
("realtime/realtime-openai-live-video.py", EVAL_VISION_CAMERA),
("realtime/realtime-gemini-live.py", EVAL_SIMPLE_MATH),
("realtime/realtime-gemini-live.py", EVAL_WEATHER),
("realtime/realtime-gemini-live-local-vad.py", EVAL_SIMPLE_MATH),
("realtime/realtime-gemini-live-function-calling.py", EVAL_WEATHER),
("realtime/realtime-gemini-live-video.py", EVAL_VISION_CAMERA),
("realtime/realtime-gemini-live-google-search.py", EVAL_ONLINE_SEARCH),
("realtime/realtime-gemini-live-vertex-function-calling.py", EVAL_WEATHER),
("realtime/realtime-gemini-live-vertex.py", EVAL_WEATHER),
("realtime/realtime-aws-nova-sonic.py", EVAL_SIMPLE_MATH),
("realtime/realtime-ultravox.py", EVAL_ORDER),
("realtime/realtime-grok.py", EVAL_WEATHER),

View File

@@ -139,6 +139,36 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
return formatted_standard_tools + custom_gemini_tools
@staticmethod
def to_function_response_dict(content: Any) -> dict[str, Any]:
"""Convert a tool-result content value to Gemini's FunctionResponse.response shape.
Gemini's ``FunctionResponse.response`` field requires a dict, so
non-dict values (e.g. plain strings, JSON-encoded scalars, or
sentinel strings like ``"COMPLETED"`` used when a function returned
no value) are wrapped as ``{"value": <value>}``. JSON strings that
decode to a dict are passed through as-is.
Args:
content: The tool-result content. Typically the JSON-encoded
return value of a function, but can also be a plain string
(e.g. ``"COMPLETED"``) or already-parsed dict.
Returns:
A dict suitable for ``FunctionResponse.response``.
"""
if isinstance(content, dict):
return content
if not isinstance(content, str):
return {"value": content}
try:
decoded = json.loads(content)
except (json.JSONDecodeError, ValueError):
return {"value": content}
if isinstance(decoded, dict):
return decoded
return {"value": decoded}
def get_messages_for_logging(self, context: LLMContext) -> list[dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about Gemini.
@@ -382,16 +412,7 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
)
elif role == "tool":
role = "user"
try:
response = json.loads(msg["content"])
if isinstance(response, dict):
response_dict = response
else:
response_dict = {"value": response}
except Exception as e:
# Response might not be JSON-deserializable.
# This occurs with a UserImageFrame, for example, where we get a plain "COMPLETED" string.
response_dict = {"value": msg["content"]}
response_dict = self.to_function_response_dict(msg["content"])
# Get function name from mapping using tool_call_id, or fallback
tool_call_id = msg.get("tool_call_id")

View File

@@ -10,6 +10,8 @@ This module provides an audio resampler that uses the resampy library
for high-quality audio sample rate conversion.
"""
import warnings
import numpy as np
import resampy
@@ -21,6 +23,11 @@ class ResampyResampler(BaseAudioResampler):
This resampler uses the resampy library's Kaiser windowing filter
for high-quality audio resampling with good performance characteristics.
.. deprecated:: 1.2.0
ResampyResampler is deprecated and will be removed in Pipecat 2.0.
Use SOXRAudioResampler, create_file_resampler(), or create_stream_resampler()
instead.
"""
def __init__(self, **kwargs):
@@ -29,7 +36,15 @@ class ResampyResampler(BaseAudioResampler):
Args:
**kwargs: Additional keyword arguments (currently unused).
"""
pass
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"ResampyResampler is deprecated and will be removed in Pipecat 2.0. "
"Use SOXRAudioResampler, create_file_resampler(), or "
"create_stream_resampler() instead.",
DeprecationWarning,
stacklevel=2,
)
async def resample(self, audio: bytes, in_rate: int, out_rate: int) -> bytes:
"""Resample audio data using resampy library.

View File

@@ -339,6 +339,40 @@ class LLMTextFrame(TextFrame):
self.includes_inter_frame_spaces = True
@dataclass
class LLMMarkerFrame(DataFrame):
"""Sideband marker emitted by an LLM service.
A marker is short, structured assistant output that should be
persisted in the conversation context but should not flow through
the standard text path (TTS, transcript). The assistant aggregator
writes the marker to the context so the LLM can self-condition on
prior markers on subsequent turns.
The primary use today is the ``filter_incomplete_user_turns``
protocol, where ``UserTurnCompletionLLMServiceMixin`` emits the
turn-completion markers ✓ / ○ / ◐ on every response. The frame is
intentionally generic so other components — STT services with
built-in turn signals, end-of-turn classifiers, custom annotations,
etc. — can use the same mechanism to inject sideband signals into
the assistant context.
Parameters:
marker: The marker payload (typically a short string such as a
single character).
append_to_context_immediately: If True, the marker is written
to the context as its own standalone assistant message as
soon as it's received. If False, the marker is appended to
the running assistant aggregation and flushed to the
context together with the following text as a single
message (e.g. for the ✓ case the context message ends up
as "✓ <response>").
"""
marker: str
append_to_context_immediately: bool = True
@dataclass
class AggregatedTextFrame(TextFrame):
"""Text frame representing an aggregation of TextFrames.
@@ -661,6 +695,11 @@ class FunctionCallResultProperties:
is_final: Whether this is the final result for the function call. When
``False`` the result is treated as an intermediate update. Defaults to ``True``.
Only meaningful for async function calls (``cancel_on_interruption=False``).
Note: realtime LLM services do not support streamed intermediate
results; they deliver only the final result to the provider. An
intermediate result reported to a realtime service is dropped
and an error is raised. Use a non-realtime LLM service if your
tool needs to stream intermediate results.
"""
run_llm: bool | None = None
@@ -970,6 +1009,24 @@ class UserSpeakingFrame(SystemFrame):
pass
@dataclass
class UserTurnInferenceCompletedFrame(SystemFrame):
"""Frame indicating that the user turn is semantically complete.
Emitted by any component that can judge conversational turn
completeness — for example an LLM with turn-completion markers, an
STT service with built-in turn detection, or a dedicated
end-of-turn classifier. Stop strategies that gate the
user-turn-stop event on an external completeness signal (e.g.
``LLMTurnCompletionUserTurnStopStrategy``) consume this frame to
finalize the turn. Producers should emit this frame only when they
judge the turn complete; an absence of this frame means the turn is
not yet considered complete.
"""
pass
@dataclass
class VADUserStartedSpeakingFrame(SystemFrame):
"""Frame emitted when VAD definitively detects user started speaking.

View File

@@ -303,7 +303,7 @@ class PipelineTask(BasePipelineTask):
# This task maneger will handle all the asyncio tasks created by this
# PipelineTask and its frame processors.
self._task_manager = task_manager or TaskManager()
self._pipeline_task_manager = task_manager or TaskManager()
# This queue is the queue used to push frames to the pipeline.
self._push_queue = asyncio.Queue()
@@ -386,7 +386,7 @@ class PipelineTask(BasePipelineTask):
# The task observer acts as a proxy to the provided observers. This way,
# we only need to pass a single observer (using the StartFrame) which
# then just acts as a proxy.
self._observer = TaskObserver(observers=observers, task_manager=self._task_manager)
self._observer = TaskObserver(observers=observers)
# These events can be used to check which frames make it to the source
# or sink processors. Instead of calling the event handlers for every
@@ -654,32 +654,24 @@ class PipelineTask(BasePipelineTask):
async def _create_tasks(self):
"""Create and start all pipeline processing tasks."""
self._process_push_task = self._task_manager.create_task(
self._process_push_queue(), f"{self}::_process_push_queue"
)
self._process_push_task = self.create_task(self._process_push_queue())
return self._process_push_task
def _maybe_start_heartbeat_tasks(self):
"""Start heartbeat tasks if heartbeats are enabled and not already running."""
if self._params.enable_heartbeats and self._heartbeat_push_task is None:
self._heartbeat_push_task = self._task_manager.create_task(
self._heartbeat_push_handler(), f"{self}::_heartbeat_push_handler"
)
self._heartbeat_monitor_task = self._task_manager.create_task(
self._heartbeat_monitor_handler(), f"{self}::_heartbeat_monitor_handler"
)
self._heartbeat_push_task = self.create_task(self._heartbeat_push_handler())
self._heartbeat_monitor_task = self.create_task(self._heartbeat_monitor_handler())
def _maybe_start_idle_task(self):
"""Start idle monitoring task if idle timeout is configured."""
if self._idle_timeout_secs:
self._idle_monitor_task = self._task_manager.create_task(
self._idle_monitor_handler(), f"{self}::_idle_monitor_handler"
)
self._idle_monitor_task = self.create_task(self._idle_monitor_handler())
async def _cancel_tasks(self):
"""Cancel all running pipeline tasks."""
if self._process_push_task:
await self._task_manager.cancel_task(self._process_push_task)
await self.cancel_task(self._process_push_task)
self._process_push_task = None
await self._maybe_cancel_heartbeat_tasks()
@@ -691,17 +683,17 @@ class PipelineTask(BasePipelineTask):
return
if self._heartbeat_push_task:
await self._task_manager.cancel_task(self._heartbeat_push_task)
await self.cancel_task(self._heartbeat_push_task)
self._heartbeat_push_task = None
if self._heartbeat_monitor_task:
await self._task_manager.cancel_task(self._heartbeat_monitor_task)
await self.cancel_task(self._heartbeat_monitor_task)
self._heartbeat_monitor_task = None
async def _maybe_cancel_idle_task(self):
"""Cancel idle monitoring task if it is running."""
if self._idle_monitor_task:
await self._task_manager.cancel_task(self._idle_monitor_task)
await self.cancel_task(self._idle_monitor_task)
self._idle_monitor_task = None
def _initial_metrics_frame(self) -> MetricsFrame:
@@ -759,12 +751,14 @@ class PipelineTask(BasePipelineTask):
async def _setup(self, params: PipelineTaskParams):
"""Set up the pipeline task and all processors."""
await super().setup(self._pipeline_task_manager)
mgr_params = TaskManagerParams(loop=params.loop)
self._task_manager.setup(mgr_params)
self.task_manager.setup(mgr_params)
setup = FrameProcessorSetup(
clock=self._clock,
task_manager=self._task_manager,
task_manager=self.task_manager,
observer=self._observer,
pipeline_task=self,
# Populate the deprecated `tool_resources` field for backwards
@@ -780,6 +774,7 @@ class PipelineTask(BasePipelineTask):
await self._load_setup_files()
# Start task observer.
await self._observer.setup(self.task_manager)
await self._observer.start()
async def _cleanup(self, cleanup_pipeline: bool):
@@ -1019,7 +1014,7 @@ class PipelineTask(BasePipelineTask):
def _print_dangling_tasks(self):
"""Log any dangling tasks that haven't been properly cleaned up."""
tasks = [t.get_name() for t in self._task_manager.current_tasks()]
tasks = [t.get_name() for t in self.task_manager.current_tasks()]
if tasks:
logger.warning(f"{self} dangling tasks detected: {tasks}")

View File

@@ -17,7 +17,6 @@ from typing import Any
from attr import dataclass
from pipecat.observers.base_observer import BaseObserver, FrameProcessed, FramePushed
from pipecat.utils.asyncio.task_manager import BaseTaskManager
@dataclass
@@ -62,19 +61,16 @@ class TaskObserver(BaseObserver):
self,
*,
observers: list[BaseObserver] | None = None,
task_manager: BaseTaskManager,
**kwargs,
):
"""Initialize the TaskObserver.
Args:
observers: List of observers to manage. Defaults to empty list.
task_manager: Task manager for creating and managing observer tasks.
**kwargs: Additional arguments passed to the base observer.
"""
super().__init__(**kwargs)
self._observers = observers or []
self._task_manager = task_manager
self._proxies: dict[BaseObserver, Proxy] | None = (
None # Becomes a dict after start() is called
)
@@ -106,7 +102,7 @@ class TaskObserver(BaseObserver):
# Remove the proxy so it doesn't get called anymore.
del self._proxies[observer]
# Cancel the proxy task right away.
await self._task_manager.cancel_task(proxy.task)
await self.cancel_task(proxy.task)
# Remove the observer from the list.
if observer in self._observers:
@@ -122,7 +118,7 @@ class TaskObserver(BaseObserver):
return
for proxy in self._proxies.values():
await self._task_manager.cancel_task(proxy.task)
await self.cancel_task(proxy.task)
async def cleanup(self):
"""Cleanup all proxy observers."""
@@ -157,9 +153,8 @@ class TaskObserver(BaseObserver):
def _create_proxy(self, observer: BaseObserver) -> Proxy:
"""Create a proxy for a single observer."""
queue = asyncio.Queue()
task = self._task_manager.create_task(
self._proxy_task_handler(queue, observer),
f"TaskObserver::{observer}::_proxy_task_handler",
task = self.create_task(
self._proxy_task_handler(queue, observer), f"{observer}::_proxy_task_handler"
)
proxy = Proxy(queue=queue, task=task, observer=observer)
return proxy

View File

@@ -0,0 +1,286 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Helpers for the async-tool message protocol used in LLM contexts.
When a function is registered with ``cancel_on_interruption=False``, the
``LLMUserContextAggregator`` / ``LLMAssistantContextAggregator`` pair appends
async-tool messages to the conversation context as the underlying task
progresses:
- A ``started`` message (``role="tool"``) is appended immediately when the
tool starts running.
- An ``intermediate`` message (``role="developer"``) is appended each time an
intermediate result is reported via
``result_callback(..., FunctionCallResultProperties(is_final=False))``.
- A ``final`` message (``role="developer"``) is appended when the task
finishes.
This module is the single source of truth for the on-the-wire payload shape:
- The aggregator uses the ``build_*_message`` functions when injecting messages.
- Realtime LLM services use ``parse_message`` to detect async-tool messages
while iterating the context, then read ``payload.result`` and deliver it via
their formal tool-result channel.
Internally, ``AsyncToolMessagePayload`` is the canonical structured form;
the on-the-wire JSON string is always derived from it (never stored) so the
two representations can't drift.
Consumers are expected to import the module rather than its individual
functions, e.g.::
from pipecat.processors.aggregators import async_tool_messages
...
async_tool_messages.build_started_message(tool_call_id)
async_tool_messages.parse_message(msg)
"""
import json
from dataclasses import dataclass
from typing import Any, Literal
from pipecat.processors.aggregators.llm_context import LLMStandardMessage
AsyncToolMessageKind = Literal["started", "intermediate", "final"]
# --- Payload shape (private; canonical source of truth) ---------------------
# The ``type`` field that identifies an async-tool message payload. Both the
# builders and the parser use this constant; do not duplicate the literal.
_PAYLOAD_TYPE = "async_tool"
# Status value for started / intermediate messages (task still running).
_STATUS_RUNNING = "running"
# Status value for the final message (task complete).
_STATUS_FINISHED = "finished"
# Description shipped on the started message. The text is intentionally
# self-explanatory so a model reading the context can tell what's about to
# happen even without out-of-band knowledge of the protocol.
_STARTED_DESCRIPTION = (
"An asynchronous task associated with this tool_call_id has started "
"running. Expect results to arrive later as developer messages that look "
"roughly like this one (with 'type=async_tool' and a matching tool_call_id) "
"but with a 'result' field. Note that there *may* be more than one result "
"(i.e., a stream of results), but there doesn't have to be (there may be "
"only one). The last result will come in a message with 'status=finished'."
)
# Description shipped on each intermediate-result message.
_INTERMEDIATE_DESCRIPTION = (
"This is an intermediate result for the asynchronous task associated with "
"this tool_call_id. The task is still running. More intermediate results "
"may follow, or the next result may be the final one with "
"'status=finished'."
)
# Description shipped on the final-result message.
_FINAL_DESCRIPTION = (
"This is the final result for the asynchronous task associated with this "
"tool_call_id. The task has completed. No further results will arrive for "
"this tool_call_id."
)
@dataclass(frozen=True)
class AsyncToolMessagePayload:
"""The structured contents of an async-tool message in an LLM context.
Parameters:
kind: Which of the three async-tool message stages this is.
tool_call_id: The id of the tool invocation this payload relates to.
status: ``"running"`` for started/intermediate, ``"finished"`` for
the final message.
description: Human-readable description from the payload. May be empty.
result: For ``intermediate`` and ``final`` messages, the JSON-encoded
result string (or the literal ``"COMPLETED"`` if the function
returned no value). ``None`` for ``started`` messages.
"""
kind: AsyncToolMessageKind
tool_call_id: str
status: Literal["running", "finished"]
description: str
result: str | None
# --- Internal: payload ↔ on-the-wire forms -----------------------------------
def _payload_to_json(payload: AsyncToolMessagePayload) -> str:
"""Serialize a payload to its on-the-wire JSON string form.
Fields that don't apply to the payload's kind are omitted (notably
``result`` is left out of ``started`` payloads, since the task hasn't
produced a result yet).
"""
obj: dict[str, Any] = {
"type": _PAYLOAD_TYPE,
"status": payload.status,
"tool_call_id": payload.tool_call_id,
"description": payload.description,
}
if payload.result is not None:
obj["result"] = payload.result
return json.dumps(obj)
def _payload_to_message(payload: AsyncToolMessagePayload) -> LLMStandardMessage:
"""Wrap a payload in the LLM context message shape that matches its kind.
- ``started``: ``role="tool"`` plus ``tool_call_id`` at the top level
(so the message can sit alongside other regular tool-result messages).
- ``intermediate`` / ``final``: ``role="developer"``; ``tool_call_id``
lives only inside the JSON payload.
"""
content = _payload_to_json(payload)
if payload.kind == "started":
return {
"role": "tool",
"content": content,
"tool_call_id": payload.tool_call_id,
}
return {
"role": "developer",
"content": content,
}
# --- Builders ----------------------------------------------------------------
def build_started_message(tool_call_id: str) -> LLMStandardMessage:
"""Build a ``started`` async-tool message for an LLM context.
Append the returned message to the LLM context immediately when an async
function call (registered with ``cancel_on_interruption=False``) starts
running. The message lets the model know a task is in flight and that its
results will arrive later in subsequent ``developer``-role messages.
Args:
tool_call_id: The id of the tool invocation this message is for.
Returns:
A message ready to pass to ``LLMContext.add_message``.
"""
return _payload_to_message(
AsyncToolMessagePayload(
kind="started",
tool_call_id=tool_call_id,
status=_STATUS_RUNNING,
description=_STARTED_DESCRIPTION,
result=None,
)
)
def build_intermediate_result_message(tool_call_id: str, result: str) -> LLMStandardMessage:
"""Build an intermediate-result async-tool message for an LLM context.
Append the returned message to the LLM context each time the running async
function reports a non-final result via
``result_callback(..., FunctionCallResultProperties(is_final=False))``.
Args:
tool_call_id: The id of the tool invocation the result is for.
result: The JSON-encoded result string (caller is responsible for
encoding the function's actual return value, typically via
``json.dumps``).
Returns:
A message ready to pass to ``LLMContext.add_message``.
"""
return _payload_to_message(
AsyncToolMessagePayload(
kind="intermediate",
tool_call_id=tool_call_id,
status=_STATUS_RUNNING,
description=_INTERMEDIATE_DESCRIPTION,
result=result,
)
)
def build_final_result_message(tool_call_id: str, result: str) -> LLMStandardMessage:
"""Build a final-result async-tool message for an LLM context.
Append the returned message to the LLM context when the async function
finishes. After this message no further async-tool messages will arrive
for this ``tool_call_id``.
Args:
tool_call_id: The id of the tool invocation the result is for.
result: The JSON-encoded result string, or the literal ``"COMPLETED"``
sentinel when the function returned ``None`` (matching the same
convention used for synchronous tool calls).
Returns:
A message ready to pass to ``LLMContext.add_message``.
"""
return _payload_to_message(
AsyncToolMessagePayload(
kind="final",
tool_call_id=tool_call_id,
status=_STATUS_FINISHED,
description=_FINAL_DESCRIPTION,
result=result,
)
)
# --- Parsing -----------------------------------------------------------------
def parse_message(message: LLMStandardMessage) -> AsyncToolMessagePayload | None:
"""Decode an async-tool message payload, or return None if not async-tool.
Args:
message: A standard message from the LLM context. Callers iterating
over ``LLMContext.get_messages()`` should filter out
``LLMSpecificMessage`` entries first; only ``LLMStandardMessage``
values can carry async-tool payloads.
Returns:
An ``AsyncToolMessagePayload`` if the message is a recognized
async-tool payload, otherwise ``None``.
"""
role = message.get("role")
if role not in ("tool", "developer"):
return None
content = message.get("content")
if not isinstance(content, str):
return None
try:
payload = json.loads(content)
except (json.JSONDecodeError, ValueError):
return None
if not isinstance(payload, dict) or payload.get("type") != _PAYLOAD_TYPE:
return None
tool_call_id = payload.get("tool_call_id")
status = payload.get("status")
if not isinstance(tool_call_id, str) or status not in (_STATUS_RUNNING, _STATUS_FINISHED):
return None
description = payload.get("description", "")
if not isinstance(description, str):
description = ""
result = payload.get("result")
if result is not None and not isinstance(result, str):
result = None
if result is None:
kind: AsyncToolMessageKind = "started"
elif status == _STATUS_FINISHED:
kind = "final"
else:
kind = "intermediate"
return AsyncToolMessagePayload(
kind=kind,
tool_call_id=tool_call_id,
status=status,
description=description,
result=result,
)

File diff suppressed because it is too large Load Diff

View File

@@ -17,7 +17,7 @@ import asyncio
import dataclasses
import traceback
import warnings
from collections.abc import Awaitable, Callable, Coroutine
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from enum import Enum
from typing import (
@@ -217,9 +217,6 @@ class FrameProcessor(BaseObject):
# Clock
self._clock: BaseClock | None = None
# Task Manager
self._task_manager: BaseTaskManager | None = None
# Observer
self._observer: BaseObserver | None = None
@@ -368,20 +365,6 @@ class FrameProcessor(BaseObject):
"""
return self._report_only_initial_ttfb
@property
def task_manager(self) -> BaseTaskManager:
"""Get the task manager for this processor.
Returns:
The task manager instance.
Raises:
Exception: If the task manager is not initialized.
"""
if not self._task_manager:
raise Exception(f"{self} TaskManager is still not initialized.")
return self._task_manager
@property
def pipeline_task(self) -> PipelineTask | None:
"""Get the :class:`PipelineTask` this processor is running in.
@@ -511,43 +494,14 @@ class FrameProcessor(BaseObject):
await self.stop_processing_metrics()
await self.stop_text_aggregation_metrics()
def create_task(self, coroutine: Coroutine, name: str | None = None) -> asyncio.Task:
"""Create a new task managed by this processor.
Args:
coroutine: The coroutine to run in the task.
name: Optional name for the task.
Returns:
The created asyncio task.
"""
if name:
name = f"{self}::{name}"
else:
name = f"{self}::{coroutine.cr_code.co_name}"
return self.task_manager.create_task(coroutine, name)
async def cancel_task(self, task: asyncio.Task, timeout: float | None = 1.0):
"""Cancel a task managed by this processor.
A default timeout if 1 second is used in order to avoid potential
freezes caused by certain libraries that swallow
`asyncio.CancelledError`.
Args:
task: The task to cancel.
timeout: Optional timeout for task cancellation.
"""
await self.task_manager.cancel_task(task, timeout)
async def setup(self, setup: FrameProcessorSetup):
"""Set up the processor with required components.
Args:
setup: Configuration object containing setup parameters.
"""
await super().setup(setup.task_manager)
self._clock = setup.clock
self._task_manager = setup.task_manager
self._observer = setup.observer
self._pipeline_task = setup.pipeline_task
@@ -555,7 +509,7 @@ class FrameProcessor(BaseObject):
self.__create_input_task()
if self._metrics is not None:
await self._metrics.setup(self._task_manager)
await self._metrics.setup(self.task_manager)
async def cleanup(self):
"""Clean up processor resources."""
@@ -877,14 +831,19 @@ class FrameProcessor(BaseObject):
current_is_uninterruptible = isinstance(
self.__process_current_frame, UninterruptibleFrame
)
if current_is_uninterruptible or self.__process_queue.has_uninterruptible:
# We don't want to cancel an UninterruptibleFrame (either the
# one currently being processed or one waiting in the queue),
# so we simply cleanup the queue keeping only
# UninterruptibleFrames.
if current_is_uninterruptible:
# The frame currently being processed is uninterruptible, so we
# must not cancel it. Just flush non-uninterruptible frames from
# the queue; any uninterruptible ones will be kept and processed
# after the current frame finishes.
self.__reset_process_queue()
else:
# Cancel and re-create the process task.
# Cancel and re-create the process task. Previously this branch
# was skipped when the queue contained an uninterruptible frame,
# which caused slow non-uninterruptible frames to block
# interruptions. Uninterruptible queued frames are safe here
# because __create_process_task calls __reset_process_queue
# internally, which always preserves them.
await self.__cancel_process_task()
self.__create_process_task()
except Exception as e:

View File

@@ -10,6 +10,11 @@ from pipecat.processors.frameworks.rtvi.frames import (
RTVIClientMessageFrame,
RTVIServerMessageFrame,
RTVIServerResponseFrame,
RTVIUICancelTaskFrame,
RTVIUICommandFrame,
RTVIUIEventFrame,
RTVIUISnapshotFrame,
RTVIUITaskFrame,
)
from pipecat.processors.frameworks.rtvi.observer import (
RTVIFunctionCallReportLevel,
@@ -26,4 +31,9 @@ __all__ = [
"RTVIProcessor",
"RTVIServerMessageFrame",
"RTVIServerResponseFrame",
"RTVIUICancelTaskFrame",
"RTVIUICommandFrame",
"RTVIUIEventFrame",
"RTVIUISnapshotFrame",
"RTVIUITaskFrame",
]

View File

@@ -10,6 +10,7 @@ from dataclasses import dataclass
from typing import Any
from pipecat.frames.frames import SystemFrame
from pipecat.processors.frameworks.rtvi.models import UITaskData
@dataclass
@@ -27,6 +28,132 @@ class RTVIServerMessageFrame(SystemFrame):
return f"{self.name}(data: {self.data})"
@dataclass
class RTVIUICommandFrame(SystemFrame):
"""A frame for sending a UI command to the client.
Pipeline-side counterpart of the ``ui-command`` RTVI message.
The observer wraps the ``command`` + ``payload`` into a
``UICommandMessage`` envelope before pushing it to the transport,
so the wire shape is:
``{label, type: "ui-command", data: {command, payload}}``.
Parameters:
command: App-defined command (e.g. ``"toast"``,
``"navigate"``, or any app-specific command).
payload: App-defined payload. Pydantic command models
(``Toast``, ``Navigate``, ``ScrollTo``, ...) should be
converted to a plain dict via ``model_dump()`` before
being placed here; an arbitrary dict works as well.
"""
command: str = ""
payload: Any = None
def __str__(self):
"""String representation of the UI command frame."""
return f"{self.name}(command: {self.command})"
@dataclass
class RTVIUITaskFrame(SystemFrame):
"""A frame for sending a UI task lifecycle envelope to the client.
Pipeline-side counterpart of the ``ui-task`` RTVI message. The
observer wraps the ``data`` into a ``UITaskMessage`` envelope
before pushing it to the transport, so the wire shape is:
``{label, type: "ui-task", data: <one of the four kinds>}``.
Parameters:
data: One of the four task-lifecycle data models from
``rtvi.models`` (``UITaskGroupStartedData``,
``UITaskUpdateData``, ``UITaskCompletedData``, or
``UITaskGroupCompletedData``). The ``kind`` field on
each discriminates which lifecycle phase this is.
"""
data: UITaskData | None = None
def __str__(self):
"""String representation of the UI task frame."""
kind = getattr(self.data, "kind", "?")
return f"{self.name}(kind: {kind})"
@dataclass
class RTVIUIEventFrame(SystemFrame):
"""An inbound UI event from the client.
Pushed downstream by ``RTVIProcessor`` whenever a ``ui-event``
message arrives from the client, alongside firing the
``on_ui_message`` event handler. Mirrors the
frame-and-event pattern used by ``client-message``: pipeline
observers and processors that want to react to UI events at the
pipeline level can match on this frame; code that subscribes to
events instead (like the bridge in ``pipecat-ai-subagents``)
keeps using the event handler.
Parameters:
msg_id: The RTVI message id, as set by the client.
event: App-defined event (the ``data.event`` field).
payload: App-defined payload (the ``data.payload`` field).
"""
msg_id: str = ""
event: str = ""
payload: Any = None
def __str__(self):
"""String representation of the UI event frame."""
return f"{self.name}(event: {self.event})"
@dataclass
class RTVIUISnapshotFrame(SystemFrame):
"""An inbound accessibility-snapshot from the client.
Pushed downstream by ``RTVIProcessor`` whenever a ``ui-snapshot``
message arrives, alongside firing ``on_ui_message``. Carries
the serialized accessibility tree the client took of its DOM.
Parameters:
msg_id: The RTVI message id, as set by the client.
tree: The serialized accessibility tree.
"""
msg_id: str = ""
tree: Any = None
def __str__(self):
"""String representation of the UI snapshot frame."""
return f"{self.name}"
@dataclass
class RTVIUICancelTaskFrame(SystemFrame):
"""An inbound user-task-group cancellation request from the client.
Pushed downstream by ``RTVIProcessor`` whenever a
``ui-cancel-task`` message arrives, alongside firing
``on_ui_message``. The server-side framework should look up the
matching task group and cancel it (subject to whatever
cancellable policy the group was registered with).
Parameters:
msg_id: The RTVI message id, as set by the client.
task_id: The task group id the client wants cancelled.
reason: Optional human-readable reason.
"""
msg_id: str = ""
task_id: str = ""
reason: str | None = None
def __str__(self):
"""String representation of the UI cancel-task frame."""
return f"{self.name}(task_id: {self.task_id})"
@dataclass
class RTVIClientMessageFrame(SystemFrame):
"""A frame for sending messages from the client to the RTVI server.

View File

@@ -20,14 +20,14 @@ from typing import (
Literal,
)
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict
from pipecat.frames.frames import (
AggregationType,
)
# -- Constants --
PROTOCOL_VERSION = "1.2.0"
PROTOCOL_VERSION = "1.3.0"
MESSAGE_LABEL = "rtvi-ai"
MessageLiteral = Literal["rtvi-ai"]
@@ -549,3 +549,474 @@ class SystemLogMessage(BaseModel):
label: MessageLiteral = MESSAGE_LABEL
type: Literal["system-log"] = "system-log"
data: TextMessageData
# -- UI Agent Protocol -------------------------------------------------------
#
# A structured RTVI message vocabulary that lets server-side AI agents
# observe and drive a GUI app on the client side. The protocol covers
# five first-class RTVI message types:
#
# ui-event client-to-server event message
# ui-command server-to-client command message
# ui-snapshot client-to-server accessibility snapshot
# ui-cancel-task client-to-server cancellation request
# ui-task server-to-client task lifecycle envelope
#
# This section is data only (constants and payload models, no
# behavior). Higher-level frameworks like ``pipecat-ai-subagents``
# build the agent abstractions on top, and single-LLM Pipecat apps can
# target the same wire format directly via custom tools that emit
# typed RTVI messages with these types. The matching client-side
# implementation lives in ``@pipecat-ai/client-js`` and
# ``@pipecat-ai/client-react``.
# The wire-format ``type`` strings (``"ui-event"``, ``"ui-command"``,
# ``"ui-snapshot"``, ``"ui-cancel-task"``, ``"ui-task"``) are pinned
# as ``Literal[...]`` field defaults on the corresponding ``*Message``
# pydantic class below, matching the convention used for every other
# RTVI message type in this module.
# Each ``ui-task`` envelope carries a ``kind`` field that the client's
# task reducer dispatches on. The four kinds form the lifecycle of a
# user-facing task group:
#
# group_started → task_update* → task_completed × N → group_completed
#
# where N is the number of workers in the group. The kind strings are
# pinned as ``Literal[...]`` defaults on the matching ``UITask*Data``
# class below.
# -- UI envelope data classes --
class UIEventData(BaseModel):
"""Inner ``data`` for a ``ui-event`` message.
Parameters:
event: App-defined event.
payload: App-defined payload, schemaless by design.
"""
event: str
payload: Any | None = None
class UICommandData(BaseModel):
"""Inner ``data`` for a ``ui-command`` message.
Parameters:
command: App-defined command.
payload: App-defined payload (already a plain dict by the
time it lands on the wire). The standard command payload models
below produce the right shape via ``model_dump()``.
"""
command: str
payload: Any | None = None
class A11yNode(BaseModel):
"""One node in the UI accessibility snapshot tree.
Mirrors the client-side ``A11yNode`` wire shape. Extra fields are
allowed so clients can add platform-specific or future metadata
without breaking older servers.
Parameters:
ref: Stable client-assigned element reference.
role: ARIA-style role for the node.
name: Optional accessible name.
value: Optional current value for inputs/progress/etc.
state: Optional short state tags (e.g. ``"focused"``,
``"disabled"``, ``"offscreen"``).
level: Optional heading level.
colcount: Optional column count for grid-like containers.
rowcount: Optional row count for grid-like containers.
children: Optional child nodes.
"""
model_config = ConfigDict(extra="allow")
ref: str
role: str
name: str | None = None
value: str | None = None
state: list[str] | None = None
level: int | None = None
colcount: int | None = None
rowcount: int | None = None
children: list["A11yNode"] | None = None
class A11ySelection(BaseModel):
"""The user's current text selection in the UI snapshot.
Extra fields are allowed for forward compatibility with client
snapshot additions.
Parameters:
ref: Ref of the element that carries the selection.
text: Selected text.
start_offset: Optional selection start offset.
end_offset: Optional selection end offset.
"""
model_config = ConfigDict(extra="allow")
ref: str
text: str
start_offset: int | None = None
end_offset: int | None = None
class A11ySnapshot(BaseModel):
"""Client accessibility snapshot sent in a ``ui-snapshot`` message.
Mirrors the client-side ``A11ySnapshot`` wire shape. Extra fields
are allowed so clients can add compatible metadata over time.
Parameters:
root: Root accessibility node.
captured_at: Client-side epoch milliseconds when captured.
selection: Optional current text selection.
"""
model_config = ConfigDict(extra="allow")
root: A11yNode
captured_at: int
selection: A11ySelection | None = None
class UISnapshotData(BaseModel):
"""Inner ``data`` for a ``ui-snapshot`` message.
The accessibility snapshot tree mirrors the client-side
``A11ySnapshot`` wire shape and is kept forward-compatible by
allowing extra fields on the snapshot models.
Parameters:
tree: The serialized accessibility tree.
"""
tree: A11ySnapshot
class UICancelTaskData(BaseModel):
"""Inner ``data`` for a ``ui-cancel-task`` message.
Parameters:
task_id: The task group id the client wants cancelled.
reason: Optional human-readable reason.
"""
task_id: str
reason: str | None = None
class UITaskGroupStartedData(BaseModel):
"""``data`` for a ``ui-task`` envelope with kind ``group_started``.
Parameters:
kind: Always ``"group_started"``.
task_id: Shared task identifier for the group.
agents: Names of the agents the work was dispatched to.
label: Optional human-readable label for the group.
cancellable: Whether the client may request cancellation.
at: Epoch milliseconds when the group started.
"""
kind: Literal["group_started"] = "group_started"
task_id: str
agents: list[str] | None = None
label: str | None = None
cancellable: bool = True
at: int = 0
class UITaskUpdateData(BaseModel):
"""``data`` for a ``ui-task`` envelope with kind ``task_update``.
Parameters:
kind: Always ``"task_update"``.
task_id: The shared task identifier.
agent_name: The worker that produced the update.
data: The worker's update payload, forwarded verbatim.
at: Epoch milliseconds when the update was emitted.
"""
kind: Literal["task_update"] = "task_update"
task_id: str
agent_name: str
data: Any | None = None
at: int = 0
class UITaskCompletedData(BaseModel):
"""``data`` for a ``ui-task`` envelope with kind ``task_completed``.
Parameters:
kind: Always ``"task_completed"``.
task_id: The shared task identifier.
agent_name: The worker that produced the response.
status: Completion status string.
response: The worker's response payload.
at: Epoch milliseconds when the response was received.
"""
kind: Literal["task_completed"] = "task_completed"
task_id: str
agent_name: str
status: str
response: Any | None = None
at: int = 0
class UITaskGroupCompletedData(BaseModel):
"""``data`` for a ``ui-task`` envelope with kind ``group_completed``.
Parameters:
kind: Always ``"group_completed"``.
task_id: The shared task identifier.
at: Epoch milliseconds when the group completed.
"""
kind: Literal["group_completed"] = "group_completed"
task_id: str
at: int = 0
#: Discriminated union over the four task-lifecycle data shapes,
#: keyed by the ``kind`` field.
UITaskData = (
UITaskGroupStartedData | UITaskUpdateData | UITaskCompletedData | UITaskGroupCompletedData
)
# -- UI envelope message classes --
class UIEventMessage(BaseModel):
"""RTVI ``ui-event`` message (client → server)."""
label: MessageLiteral = MESSAGE_LABEL
type: Literal["ui-event"] = "ui-event"
id: str
data: UIEventData
class UICommandMessage(BaseModel):
"""RTVI ``ui-command`` message (server → client)."""
label: MessageLiteral = MESSAGE_LABEL
type: Literal["ui-command"] = "ui-command"
data: UICommandData
class UISnapshotMessage(BaseModel):
"""RTVI ``ui-snapshot`` message (client → server)."""
label: MessageLiteral = MESSAGE_LABEL
type: Literal["ui-snapshot"] = "ui-snapshot"
id: str
data: UISnapshotData
class UICancelTaskMessage(BaseModel):
"""RTVI ``ui-cancel-task`` message (client → server)."""
label: MessageLiteral = MESSAGE_LABEL
type: Literal["ui-cancel-task"] = "ui-cancel-task"
id: str
data: UICancelTaskData
class UITaskMessage(BaseModel):
"""RTVI ``ui-task`` message (server → client).
The ``data`` field is one of the four task-lifecycle
discriminated by the ``kind`` field.
"""
label: MessageLiteral = MESSAGE_LABEL
type: Literal["ui-task"] = "ui-task"
data: UITaskData
# -- UI command payloads --
#
# These models describe commands that have matching default React
# handlers in ``@pipecat-ai/client-react``'s ``standardHandlers``.
# Apps can use them as-is, override the client handler to customize
# rendering, or ignore them entirely and define their own command
# names.
#
# Server-side helpers that send commands accept these models directly.
# ``BaseModel.model_dump()`` converts them to the plain-dict shape
# that travels over the wire.
class Toast(BaseModel):
"""A transient notification surface shown on the client.
Parameters:
title: Required headline.
subtitle: Optional second line beneath the title.
description: Optional body text.
image_url: Optional leading image.
duration_ms: Optional dismiss timer. Client default applies
when None.
"""
title: str
subtitle: str | None = None
description: str | None = None
image_url: str | None = None
duration_ms: int | None = None
class Navigate(BaseModel):
"""Client-side navigation to a named view.
Parameters:
view: App-defined view name (route, screen id, tab key, etc.).
params: Optional view-specific parameters.
"""
view: str
params: dict | None = None
class ScrollTo(BaseModel):
"""Scroll a target element into view.
The client resolves the target by ``ref`` first (a snapshot ref
like ``"e42"`` assigned by the a11y walker), then falls back to
``target_id`` (``document.getElementById``). Supply whichever you
have; ``ref`` is the normal choice when acting on a node from
``<ui_state>``.
Parameters:
ref: Snapshot ref from ``<ui_state>``.
target_id: Element id registered on the client.
behavior: Optional scroll behavior hint. Typical values:
``"smooth"`` or ``"instant"``. Clients may ignore.
"""
ref: str | None = None
target_id: str | None = None
behavior: str | None = None
class Highlight(BaseModel):
"""Briefly emphasize a target element (flash, glow, pulse).
Parameters:
ref: Snapshot ref from ``<ui_state>``.
target_id: Element id registered on the client.
duration_ms: Optional highlight duration. Client default
applies when None.
"""
ref: str | None = None
target_id: str | None = None
duration_ms: int | None = None
class Focus(BaseModel):
"""Move input focus to a target element.
Parameters:
ref: Snapshot ref from ``<ui_state>``.
target_id: Element id registered on the client.
"""
ref: str | None = None
target_id: str | None = None
class Click(BaseModel):
"""Click an element on the client.
Closes the form-fill loop for non-text inputs (checkboxes, radios)
and exposes the rest of the action vocabulary (submit buttons,
links, app-specific clickable nodes). The standard handler
silently no-ops on ``disabled`` targets so the agent can't bypass
UI affordances the user is meant to control.
For native ``<select>``, prefer ``SetInputValue`` (clicking
options doesn't reliably change the selection); for custom
comboboxes (ARIA listbox + popup), apps wire their own command
matching the library's interaction model.
Parameters:
ref: Snapshot ref from ``<ui_state>``.
target_id: Element id registered on the client. Used as a
fallback when ``ref`` is not set or has gone stale.
"""
ref: str | None = None
target_id: str | None = None
class SetInputValue(BaseModel):
"""Write a value into a text input or textarea on the client.
Use this for form-filling: the agent has decided what should go
into a field (clarifying answer, tax form entry, etc.) and asks
the client to populate it. With ``replace=True`` (the default),
the existing value is overwritten; with ``replace=False`` the
value is appended.
The standard handler silently no-ops on ``disabled``, ``readonly``,
and ``<input type="hidden">`` targets so the agent can't write
into fields the user can't.
Parameters:
value: The text to write.
ref: Snapshot ref from ``<ui_state>``. Typically the ref of
an ``<input>`` or ``<textarea>``.
target_id: Element id registered on the client. Used as a
fallback when ``ref`` is not set or has gone stale.
replace: When True (the default), overwrite the current
value. When False, append to it.
"""
value: str = ""
ref: str | None = None
target_id: str | None = None
replace: bool = True
class SelectText(BaseModel):
"""Select text on the page so the user can see what the agent means.
Mirror of the ``selection`` field surfaced in the snapshot. Use
this to point the user's attention at a specific paragraph or
range after the agent has decided what it's referring to.
With ``start_offset`` and ``end_offset`` omitted, the entire
target's text content is selected (``Range.selectNodeContents``
for document elements; ``el.select()`` for ``<input>`` /
``<textarea>``).
Parameters:
ref: Snapshot ref from ``<ui_state>``. Typically the ref of
a paragraph or input element.
target_id: Element id registered on the client. Used as a
fallback when ``ref`` is not set or has gone stale.
start_offset: Character offset within the target's text
where the selection should start. For ``<input>`` and
``<textarea>`` this is the value offset; for document
elements it is computed against the concatenation of
descendant text nodes in document order.
end_offset: End character offset, exclusive. Same coordinate
system as ``start_offset``.
"""
ref: str | None = None
target_id: str | None = None
start_offset: int | None = None
end_offset: int | None = None

View File

@@ -58,6 +58,8 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi.frames import (
RTVIServerMessageFrame,
RTVIServerResponseFrame,
RTVIUICommandFrame,
RTVIUITaskFrame,
)
from pipecat.transports.base_output import BaseOutputTransport
from pipecat.utils.string import match_endofsentence
@@ -430,6 +432,15 @@ class RTVIObserver(BaseObserver):
elif isinstance(frame, RTVIServerMessageFrame):
message = RTVI.ServerMessage(data=frame.data)
await self.send_rtvi_message(message)
elif isinstance(frame, RTVIUICommandFrame):
message = RTVI.UICommandMessage(
data=RTVI.UICommandData(command=frame.command, payload=frame.payload)
)
await self.send_rtvi_message(message)
elif isinstance(frame, RTVIUITaskFrame):
if frame.data is not None:
message = RTVI.UITaskMessage(data=frame.data)
await self.send_rtvi_message(message)
elif isinstance(frame, RTVIServerResponseFrame):
if frame.error is not None:
await self._send_error_response(frame)

View File

@@ -32,7 +32,12 @@ from pipecat.frames.frames import (
SystemFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi.frames import RTVIClientMessageFrame
from pipecat.processors.frameworks.rtvi.frames import (
RTVIClientMessageFrame,
RTVIUICancelTaskFrame,
RTVIUIEventFrame,
RTVIUISnapshotFrame,
)
from pipecat.processors.frameworks.rtvi.observer import RTVIObserver, RTVIObserverParams
from pipecat.services.llm_service import (
FunctionCallParams, # TODO(aleix): we shouldn't import `services` from `processors`
@@ -76,6 +81,7 @@ class RTVIProcessor(FrameProcessor):
self._register_event_handler("on_bot_started")
self._register_event_handler("on_client_ready")
self._register_event_handler("on_client_message")
self._register_event_handler("on_ui_message")
self._input_transport = None
self._transport = transport
@@ -288,6 +294,41 @@ class RTVIProcessor(FrameProcessor):
case "client-message":
data = RTVI.RawClientMessageData.model_validate(message.data)
await self._handle_client_message(message.id, data)
case "ui-event":
event_data = RTVI.UIEventData.model_validate(message.data or {})
await self.push_frame(
RTVIUIEventFrame(
msg_id=message.id,
event=event_data.event,
payload=event_data.payload,
)
)
await self._call_event_handler(
"on_ui_message",
RTVI.UIEventMessage(id=message.id, data=event_data),
)
case "ui-snapshot":
snapshot_data = RTVI.UISnapshotData.model_validate(message.data or {})
await self.push_frame(
RTVIUISnapshotFrame(msg_id=message.id, tree=snapshot_data.tree)
)
await self._call_event_handler(
"on_ui_message",
RTVI.UISnapshotMessage(id=message.id, data=snapshot_data),
)
case "ui-cancel-task":
cancel_data = RTVI.UICancelTaskData.model_validate(message.data or {})
await self.push_frame(
RTVIUICancelTaskFrame(
msg_id=message.id,
task_id=cancel_data.task_id,
reason=cancel_data.reason,
)
)
await self._call_event_handler(
"on_ui_message",
RTVI.UICancelTaskMessage(id=message.id, data=cancel_data),
)
case "llm-function-call-result":
data = RTVI.LLMFunctionCallResultData.model_validate(message.data)
await self._handle_function_call_result(data)

View File

@@ -27,7 +27,7 @@ try:
gi.require_version("Gst", "1.0")
gi.require_version("GstApp", "1.0")
from gi.repository import Gst, GstApp
from gi.repository import Gst, GstApp # pyright: ignore[reportAttributeAccessIssue]
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(

View File

@@ -20,7 +20,6 @@ from pipecat.metrics.metrics import (
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -40,36 +39,12 @@ class FrameProcessorMetrics(BaseObject):
processing times, and usage statistics.
"""
super().__init__()
self._task_manager = None
self._start_ttfb_time = 0
self._start_processing_time = 0
self._start_text_aggregation_time = 0
self._last_ttfb_time = 0
self._should_report_ttfb = True
async def setup(self, task_manager: BaseTaskManager):
"""Set up the metrics collector with a task manager.
Args:
task_manager: The task manager for handling async operations.
"""
self._task_manager = task_manager
async def cleanup(self):
"""Clean up metrics collection resources."""
await super().cleanup()
@property
def task_manager(self) -> BaseTaskManager:
"""Get the associated task manager.
Returns:
The task manager instance for async operations.
"""
if self._task_manager is None:
raise RuntimeError("task_manager not set; call setup() first")
return self._task_manager
@property
def ttfb(self) -> float | None:
"""Get the current TTFB value in seconds.

View File

@@ -209,6 +209,17 @@ def _configure_server_app(args: argparse.Namespace):
logger.warning(f"Unknown transport type: {args.transport}")
def _resolve_download_path(folder: str, filename: str) -> Path:
"""Resolve a download path and ensure it stays within the downloads folder."""
allowed_base = Path(folder).resolve()
file_path = (allowed_base / filename).resolve()
if not file_path.is_relative_to(allowed_base):
raise HTTPException(status_code=403, detail="Access denied")
return file_path
def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
"""Set up WebRTC-specific routes."""
try:
@@ -250,16 +261,16 @@ def _setup_webrtc_routes(app: FastAPI, args: argparse.Namespace):
async def download_file(filename: str):
"""Handle file downloads."""
if not args.folder:
logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.")
return
logger.warning(f"Attempting to download {filename}, but downloads folder not setup.")
raise HTTPException(404)
file_path = Path(args.folder) / filename
if not os.path.exists(file_path):
file_path = _resolve_download_path(args.folder, filename)
if not file_path.exists():
raise HTTPException(404)
media_type, _ = mimetypes.guess_type(file_path)
return FileResponse(path=file_path, media_type=media_type, filename=filename)
return FileResponse(path=file_path, media_type=media_type, filename=file_path.name)
# Initialize the SmallWebRTC request handler
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler(

View File

@@ -509,35 +509,29 @@ async def create_transport(
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"telnyx": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"plivo": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
"exotel": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
# add_wav_header and serializer will be set automatically
),
}

View File

@@ -12,7 +12,6 @@ Amazon Bedrock AgentCore Runtime and streams their responses as LLMTextFrames.
import asyncio
import json
import os
from collections.abc import Callable
import aioboto3
@@ -27,6 +26,7 @@ from pipecat.frames.frames import (
)
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.aws.utils import resolve_credentials
def default_context_to_payload_transformer(
@@ -122,8 +122,11 @@ class AWSAgentCoreProcessor(FrameProcessor):
Args:
agentArn: The Amazon Web Services Resource Name (ARN) of the agent.
aws_access_key: AWS access key ID. If None, uses default credentials.
aws_secret_key: AWS secret access key. If None, uses default credentials.
aws_access_key: AWS access key ID. If None, falls back to
environment variables and the default boto3 credential chain
(instance profiles, IRSA, ECS task roles, SSO, etc.).
aws_secret_key: AWS secret access key. Same fallback behaviour as
``aws_access_key``.
aws_session_token: AWS session token for temporary credentials.
aws_region: AWS region.
context_to_payload_transformer: Optional callable to transform
@@ -139,13 +142,13 @@ class AWSAgentCoreProcessor(FrameProcessor):
self._agentArn = agentArn
self._aws_session = aioboto3.Session()
# Store AWS session parameters for creating client in async context
self._aws_params = {
"aws_access_key_id": aws_access_key or os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": aws_secret_key or os.getenv("AWS_SECRET_ACCESS_KEY"),
"aws_session_token": aws_session_token or os.getenv("AWS_SESSION_TOKEN"),
"region_name": aws_region or os.getenv("AWS_REGION", "us-east-1"),
}
# Resolve credentials using the shared chain (explicit → env → boto3).
self._aws_params = resolve_credentials(
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
aws_session_token=aws_session_token,
region=aws_region,
).to_boto_kwargs()
# Set transformers with defaults
self._context_to_payload_transformer = (
@@ -204,7 +207,8 @@ class AWSAgentCoreProcessor(FrameProcessor):
# aioboto3's `client()` is an async context manager but its stubs don't
# advertise `__aenter__` / `__aexit__` in a way pyright can see.
async with self._aws_session.client( # pyright: ignore[reportGeneralTypeIssues]
"bedrock-agentcore", **self._aws_params
"bedrock-agentcore",
**self._aws_params, # pyright: ignore[reportArgumentType]
) as client:
# Invoke the AgentCore agent
response = await client.invoke_agent_runtime(

View File

@@ -13,7 +13,6 @@ function calling.
import asyncio
import json
import os
import re
from dataclasses import dataclass, field
from typing import Any
@@ -36,6 +35,7 @@ from pipecat.frames.frames import (
from pipecat.metrics.metrics import LLMTokenUsage
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.utils import resolve_credentials
from pipecat.services.llm_service import LLMService
from pipecat.services.settings import NOT_GIVEN, LLMSettings, _NotGiven, assert_given
from pipecat.utils.tracing.service_decorators import traced_llm
@@ -135,8 +135,11 @@ class AWSBedrockLLMService(LLMService[AWSBedrockLLMAdapter]):
.. deprecated:: 0.0.105
Use ``settings=AWSBedrockLLMService.Settings(model=...)`` instead.
aws_access_key: AWS access key ID. If None, uses default credentials.
aws_secret_key: AWS secret access key. If None, uses default credentials.
aws_access_key: AWS access key ID. If None, falls back to
environment variables and the default boto3 credential chain
(instance profiles, IRSA, ECS task roles, SSO, etc.).
aws_secret_key: AWS secret access key. Same fallback behaviour as
``aws_access_key``.
aws_session_token: AWS session token for temporary credentials.
aws_region: AWS region for the Bedrock service.
params: Model parameters and configuration.
@@ -215,14 +218,14 @@ class AWSBedrockLLMService(LLMService[AWSBedrockLLMAdapter]):
self._aws_session = aioboto3.Session()
# Store AWS session parameters for creating client in async context
self._aws_params = {
"aws_access_key_id": aws_access_key or os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": aws_secret_key or os.getenv("AWS_SECRET_ACCESS_KEY"),
"aws_session_token": aws_session_token or os.getenv("AWS_SESSION_TOKEN"),
"region_name": aws_region or os.getenv("AWS_REGION", "us-east-1"),
"config": client_config,
}
# Resolve credentials using the shared chain (explicit → env → boto3).
resolved = resolve_credentials(
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
aws_session_token=aws_session_token,
region=aws_region,
)
self._aws_params = {**resolved.to_boto_kwargs(), "config": client_config}
self._retry_timeout_secs = retry_timeout_secs
self._retry_on_timeout = retry_on_timeout

View File

@@ -49,6 +49,7 @@ from pipecat.frames.frames import (
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.aggregators import async_tool_messages
from pipecat.processors.aggregators.llm_context import LLMContext, LLMSpecificMessage
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.nova_sonic.session_continuation import (
@@ -620,6 +621,45 @@ class AWSNovaSonicLLMService(LLMService[AWSNovaSonicLLMAdapter]):
# standard tool-result messages — skip them.
if isinstance(message, LLMSpecificMessage):
continue
# Async-tool messages live alongside regular tool messages in the
# context; detect and route them before the regular logic so we
# don't try to send the async-tool envelope JSON as a tool result.
async_payload = async_tool_messages.parse_message(message)
if async_payload is not None:
if async_payload.tool_call_id in self._completed_tool_calls:
continue
if async_payload.kind == "started":
# The provider already issued the tool call and natively
# awaits a result; nothing to send for the started marker.
continue
if async_payload.kind == "intermediate":
logger.error(
f"{self}: Nova Sonic does not support streamed async "
f"tool results; dropping intermediate result for "
f"tool_call_id={async_payload.tool_call_id}. Use a "
f"non-realtime LLM service if your tool needs to "
f"stream intermediate results."
)
await self.push_error(
error_msg="Nova Sonic does not support streamed async tool results.",
)
continue
if async_payload.kind == "final":
# Deliver via the formal toolResult channel — same path
# as a synchronous tool result, just delayed.
if send_new_results:
await self._send_tool_result(
async_payload.tool_call_id, async_payload.result
)
self._completed_tool_calls.add(async_payload.tool_call_id)
continue
# Defensive: any async-tool message must not fall through
# to the regular tool-result block below, even if it
# carries a kind we don't recognize.
continue
# Look for newly-completed "regular" (as opposed to async-tool) results
if message.get("role") == "tool" and message.get("content") not in [
"IN_PROGRESS",
"CANCELLED",
@@ -875,6 +915,8 @@ class AWSNovaSonicLLMService(LLMService[AWSNovaSonicLLMAdapter]):
if not self._stream or not self._prompt_name:
return
logger.debug(f"Sending tool result to Nova Sonic for tool_call_id={tool_call_id}")
content_name = str(uuid.uuid4())
result_content_start = f'''

View File

@@ -63,8 +63,8 @@ class SageMakerBidiClient:
self,
endpoint_name: str,
region: str,
model_invocation_path: str = "",
model_query_string: str = "",
model_invocation_path: str | None = "",
model_query_string: str | None = "",
):
"""Initialize the SageMaker BiDi client.

View File

@@ -11,7 +11,6 @@ speech-to-text transcription with support for multiple languages and audio forma
"""
import json
import os
import random
import string
from collections.abc import AsyncGenerator
@@ -29,7 +28,12 @@ from pipecat.frames.frames import (
StartFrame,
TranscriptionFrame,
)
from pipecat.services.aws.utils import build_event_message, decode_event, get_presigned_url
from pipecat.services.aws.utils import (
build_event_message,
decode_event,
get_presigned_url,
resolve_credentials,
)
from pipecat.services.settings import STTSettings, assert_given
from pipecat.services.stt_latency import AWS_TRANSCRIBE_TTFS_P99
from pipecat.services.stt_service import WebsocketSTTService
@@ -81,9 +85,12 @@ class AWSTranscribeSTTService(WebsocketSTTService):
"""Initialize the AWS Transcribe STT service.
Args:
api_key: AWS secret access key. If None, uses AWS_SECRET_ACCESS_KEY environment variable.
aws_access_key_id: AWS access key ID. If None, uses AWS_ACCESS_KEY_ID environment variable.
aws_session_token: AWS session token for temporary credentials. If None, uses AWS_SESSION_TOKEN environment variable.
api_key: AWS secret access key. If None, falls back to environment
variables and the default boto3 credential chain (instance
profiles, IRSA, ECS task roles, SSO, etc.).
aws_access_key_id: AWS access key ID. Same fallback behaviour as
``api_key``.
aws_session_token: AWS session token for temporary credentials.
region: AWS region for the service.
sample_rate: Audio sample rate in Hz. If None, uses the pipeline sample rate.
AWS Transcribe only supports 8000 or 16000 Hz; other values are
@@ -129,11 +136,19 @@ class AWSTranscribeSTTService(WebsocketSTTService):
self._show_speaker_label = False
self._enable_channel_identification = False
# Resolve credentials using the shared chain (explicit → env → boto3).
resolved = resolve_credentials(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=api_key,
aws_session_token=aws_session_token,
region=region,
)
self._credentials = {
"aws_access_key_id": aws_access_key_id or os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": api_key or os.getenv("AWS_SECRET_ACCESS_KEY"),
"aws_session_token": aws_session_token or os.getenv("AWS_SESSION_TOKEN"),
"region": region or os.getenv("AWS_REGION", "us-east-1"),
"aws_access_key_id": resolved.access_key,
"aws_secret_access_key": resolved.secret_key,
"aws_session_token": resolved.session_token,
"region": resolved.region,
}
self._receive_task = None

View File

@@ -10,7 +10,6 @@ This module provides integration with Amazon Polly for text-to-speech synthesis,
supporting multiple languages, voices, and SSML features.
"""
import os
from collections.abc import AsyncGenerator
from dataclasses import dataclass, field
@@ -23,6 +22,7 @@ from pipecat.frames.frames import (
Frame,
TTSAudioRawFrame,
)
from pipecat.services.aws.utils import resolve_credentials
from pipecat.services.settings import NOT_GIVEN, TTSSettings, _NotGiven
from pipecat.services.tts_service import TTSService
from pipecat.transcriptions.language import Language, resolve_language
@@ -191,8 +191,11 @@ class AWSPollyTTSService(TTSService):
"""Initializes the AWS Polly TTS service.
Args:
api_key: AWS secret access key. If None, uses AWS_SECRET_ACCESS_KEY environment variable.
aws_access_key_id: AWS access key ID. If None, uses AWS_ACCESS_KEY_ID environment variable.
api_key: AWS secret access key. If None, falls back to environment
variables and the default boto3 credential chain (instance
profiles, IRSA, ECS task roles, SSO, etc.).
aws_access_key_id: AWS access key ID. Same fallback behaviour as
``api_key``.
aws_session_token: AWS session token for temporary credentials.
region: AWS region for Polly service. Defaults to 'us-east-1'.
voice_id: Voice ID to use for synthesis. Defaults to 'Joanna'.
@@ -250,13 +253,13 @@ class AWSPollyTTSService(TTSService):
**kwargs,
)
# Get credentials from environment variables if not provided
self._aws_params = {
"aws_access_key_id": aws_access_key_id or os.getenv("AWS_ACCESS_KEY_ID"),
"aws_secret_access_key": api_key or os.getenv("AWS_SECRET_ACCESS_KEY"),
"aws_session_token": aws_session_token or os.getenv("AWS_SESSION_TOKEN"),
"region_name": region or os.getenv("AWS_REGION", "us-east-1"),
}
# Resolve credentials using the shared chain (explicit → env → boto3).
self._aws_params = resolve_credentials(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=api_key,
aws_session_token=aws_session_token,
region=region,
).to_boto_kwargs()
self._aws_session = aioboto3.Session()
@@ -348,7 +351,8 @@ class AWSPollyTTSService(TTSService):
# aioboto3's `client()` is an async context manager but its stubs
# don't advertise `__aenter__` / `__aexit__` to pyright.
async with self._aws_session.client( # pyright: ignore[reportGeneralTypeIssues]
"polly", **self._aws_params
"polly",
**self._aws_params, # pyright: ignore[reportArgumentType]
) as polly:
response = await polly.synthesize_speech(**filtered_params)
if "AudioStream" in response:

Some files were not shown because too many files have changed in this diff Show More