Compare commits

...

222 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
6bf63a7f2f Use modern Python type syntax in new DTMF code
Replace `Optional[X]` with `X | None`, `List[X]` with `list[X]`, and
`Dict[str, Any]` with `dict[str, Any]` in the DTMF frame definitions
and Daily transport code added by this branch.
2026-04-16 09:03:35 -07:00
Aleix Conchillo Flaqué
8d4feede23 Split #4313 changelog into one entry per file 2026-04-16 08:55:03 -07:00
Aleix Conchillo Flaqué
b11a3bc43f Add method field to Daily DTMF output frames
Lets callers specify Daily's DTMF delivery method (e.g. "rfc2833"
or "info") alongside `session_id` and `digit_duration_ms`. Forwarded
to Daily's `send_dtmf` as `method`.
2026-04-16 08:55:03 -07:00
Aleix Conchillo Flaqué
f094ce80fb Add to_string helper on output DTMF frames
Mirrors the existing `from_string` classmethod and lets callers
turn a frame's `buttons` list back into a dial string like `"123#"`.
`__str__` and the Daily transport's native DTMF path reuse it.
2026-04-15 15:14:47 -07:00
Aleix Conchillo Flaqué
9fbe1bf2a3 Document button as a convenience shortcut, not a deprecation
The single-key `button` field on `OutputDTMFFrame` and
`OutputDTMFUrgentFrame` is kept as a first-class ergonomic shortcut
for the common single-keypress case, equivalent to
`buttons=[button]`. `buttons` takes precedence when both are set.
2026-04-15 15:09:01 -07:00
Aleix Conchillo Flaqué
d8b0e78bc8 Represent DTMF sequences as list[KeypadEntry] via buttons field
Replaces the string-based `tones` field with a type-safe
`buttons: list[KeypadEntry]` on `OutputDTMFFrame` and
`OutputDTMFUrgentFrame`, matching the existing singular `button`
field on `InputDTMFFrame`. A `from_string` classmethod builds the
list from a dial string like `"123#"` (invalid characters raise
ValueError from the `KeypadEntry` constructor).

The base output audio fallback now iterates `frame.buttons`
directly, LiveKit sends `frame.buttons[0].value`, and the Daily
transport joins the button values into the single string Daily's
`send_dtmf` expects.
2026-04-15 15:05:45 -07:00
Aleix Conchillo Flaqué
675b7df408 Add tones to OutputDTMFFrame and simplify DTMF frame hierarchy
Introduces a new `tones` field on `OutputDTMFFrame` and
`OutputDTMFUrgentFrame` for sending multi-digit DTMF sequences and
deprecates the existing single-key `button` field. When only `button`
is set, it is used as a single-character `tones` string for backward
compatibility.

`DTMFFrame` is kept as an empty marker class so both input and output
DTMF frames can still be identified via isinstance. `InputDTMFFrame`
keeps its required `button` field (single keypress semantics).

The Daily-specific `DailyOutputDTMFFrame` and
`DailyOutputDTMFUrgentFrame` frames no longer need to override
`button` and simply add `session_id` and `digit_duration_ms`, which
are forwarded to Daily's `send_dtmf` as `sessionId` and
`digitDurationMs`.

The base output audio fallback now iterates `tones` and generates a
tone per character; LiveKit's native DTMF path sends `tones[0]` since
its API is single-tone.
2026-04-15 14:48:02 -07:00
Aleix Conchillo Flaqué
30f39d7395 Add DailyOutputDTMFFrame and DailyOutputDTMFUrgentFrame
Introduces Daily-specific DTMF output frames that carry explicit
`tones`, `session_id` and `digit_duration_ms` fields, forwarded to
Daily's `send_dtmf` as `tones`, `sessionId` and `digitDurationMs`.
The inherited `button` and `transport_destination` fields are
ignored for these frames in the Daily transport.
2026-04-15 14:20:08 -07:00
Aleix Conchillo Flaqué
fe2ef9c712 Add changelog for #4313 2026-04-15 10:43:28 -07:00
Aleix Conchillo Flaqué
173cf39aee Add send_dtmf() to DailyTransport
Exposes the Daily call client's DTMF sending capability so
applications can send tones during a call (e.g. IVR navigation).
2026-04-15 10:43:28 -07:00
Aleix Conchillo Flaqué
457f55e99a Merge pull request #4297 from pipecat-ai/changelog-1.0.0
Release 1.0.0 - Changelog Update
2026-04-14 12:08:35 -07:00
aconchillo
f8318289d4 Update changelog for version 1.0.0 2026-04-14 12:06:43 -07:00
Aleix Conchillo Flaqué
958d90819f Merge pull request #4294 from pipecat-ai/ac/fix-assistant-turn-stopped-event
Fix on_assistant_turn_stopped not firing for tool-call-only responses
2026-04-14 10:09:55 -07:00
Aleix Conchillo Flaqué
403235eb48 Add changelog for #4294 2026-04-14 10:07:19 -07:00
Aleix Conchillo Flaqué
698c2ba92e Fix on_assistant_turn_stopped not firing for empty LLM responses
When the LLM returned zero text tokens (e.g. it was interrupted before producing
tokens or about to push tokens), push_aggregation() returned an empty string and
on_assistant_turn_stopped was never emitted. This left consumers waiting for an
event that would never arrive.

Now on_assistant_turn_stopped always fires, with an empty content string when
the LLM produced no text tokens.

Fixes #4292
2026-04-14 10:07:19 -07:00
Mark Backman
f013d5632b Merge pull request #4293 from pipecat-ai/mb/fix-elevenlabs-tts-enable-logging
Fix ElevenLabs TTS boolean params and add missing features
2026-04-14 12:58:31 -04:00
Mark Backman
570849955c Merge pull request #4295 from pipecat-ai/mb/context-summarization-index-0
Fix context summarization failing with mid-conversation system messages
2026-04-14 12:24:47 -04:00
Mark Backman
84b885682f Add changelog for #4295 2026-04-14 11:49:31 -04:00
Mark Backman
989fb4deaa Fix context summarization failing with mid-conversation system messages
Only treat messages[0] as the initial system prompt when determining the
summarization range. Previously, the code scanned the entire context for
the first system-role message, which caused failures when the only system
message was a mid-conversation injection (e.g. "The user has been quiet").
In that case summary_start exceeded summary_end, producing an empty range
and "No messages to summarize" errors.

Fixes #4286
2026-04-14 11:48:50 -04:00
dhruvladia-sarvam
ab74605a26 Sarvam TTS request id added to agent logs (#4278)
- Added trace logging to correlate Sarvam request_id with context_id
2026-04-14 11:02:05 -04:00
Mark Backman
49998d252b Add changelog for #4293 2026-04-14 10:13:12 -04:00
Mark Backman
84566c1110 Remove unused ElevenLabsOutputFormat and add missing sample rates
Remove dead ElevenLabsOutputFormat type alias. Add pcm_32000 and
pcm_48000 to output_format_from_sample_rate to match the ElevenLabs API.
2026-04-14 10:11:31 -04:00
Mark Backman
45aa95fa10 Fix ElevenLabs boolean query params and add enable_logging to HTTP service
The enable_logging and enable_ssml_parsing URL params used truthy checks,
so False was treated the same as None (both skipped). Also, Python's
str(False) produces "False" but the API expects lowercase "false".

Additionally, add enable_logging support to ElevenLabsHttpTTSService
which was missing entirely.
2026-04-14 10:04:23 -04:00
Mark Backman
d1f7af0330 Merge pull request #4283 from pipecat-ai/mb/user-stop-transcript-improvements 2026-04-13 19:27:05 -04:00
Mark Backman
31b5a64382 Merge pull request #4282 from pipecat-ai/mb/cartesia-stt-settings-update
Reconnect Cartesia STT websocket on settings change
2026-04-13 18:18:36 -04:00
Mark Backman
d20013d7a6 Add changelog for #4283 2026-04-13 18:12:04 -04:00
Mark Backman
804e3ea9ec Trigger turn stop immediately when transcript arrives after p99 timeout
When the STT p99 timeout fires without a transcript, the turn stop
strategy previously did nothing — falling through to the 5-second
user_turn_stop_timeout. Now, a _timeout_expired flag tracks when the
timeout has elapsed so that a late transcript triggers the turn stop
immediately instead of waiting for the fallback.
2026-04-13 18:11:32 -04:00
Aleix Conchillo Flaqué
a14d257cf2 update pytest to >=9 2026-04-13 15:08:47 -07:00
Aleix Conchillo Flaqué
a8660aabfe update uv.lock 2026-04-13 15:06:25 -07:00
Aleix Conchillo Flaqué
7dc763d512 Merge pull request #4272 from pipecat-ai/pk/llm-context-get-messages-elide-large-values
Add truncate_large_values to LLMContext.get_messages()
2026-04-13 15:04:41 -07:00
Mark Backman
36b15c92ef Add changelog for #4282 2026-04-13 17:29:39 -04:00
Mark Backman
64ed0aae13 Reconnect Cartesia STT websocket when settings change at runtime
Previously settings updates were ignored with a TODO comment. Now when
model/language changes via STTUpdateSettingsFrame the service disconnects
and reconnects with the new query parameters.

Key changes:
- Implement _update_settings to disconnect/reconnect on changes
- Check `is not State.OPEN` in run_stt to catch CLOSING state
- Send `done` command before closing for clean session shutdown
- Capture websocket reference in _disconnect_websocket to prevent a
  concurrent _connect from having its new connection nulled by a stale
  finally block
2026-04-13 17:28:34 -04:00
Mark Backman
be81dac723 Merge pull request #4280 from pipecat-ai/mb/resolve-vuln-2026-04-13
Update uv.lock resolving langchain-core and cryptography vulnerabilities
2026-04-13 11:58:25 -04:00
Mark Backman
d942a713af Update uv.lock resolving langchain-core and cryptography vulnerabilities 2026-04-13 11:09:31 -04:00
Filipi da Silva Fuchter
e248c4c049 Merge pull request #4249 from sathwikareddy02/nvidia-tts-update
Add stitching support and enhancements for NvidiaTTSService
2026-04-13 09:39:48 -03:00
filipi87
1d5dcf1698 Invoking to remove the audio context when there is no more audio to receive. 2026-04-13 09:34:13 -03:00
sathwika
f45a410f56 refactor/simplify NvidiaTTSService synthesis stream shutdown 2026-04-13 14:35:17 +05:30
Paul Kompfner
e38647151d Fix language: binary data is replaced with placeholders, not truncated 2026-04-11 14:39:25 -04:00
Paul Kompfner
1a02b5d61a Rename elide_large_values to truncate_large_values 2026-04-11 14:29:05 -04:00
Aleix Conchillo Flaqué
4254c1f0e0 Merge pull request #4273 from pipecat-ai/ac/test-fixes
Fix LLM test constructors and wake phrase test race
2026-04-10 21:27:00 -07:00
Aleix Conchillo Flaqué
f91a113de7 tests: yield in wake phrase strategy setup to let tasks start
The strategy schedules background tasks during setup. Fast-running
tests could observe state before those tasks had a chance to run;
yielding once via asyncio.sleep(0) ensures they do.
2026-04-10 17:37:50 -07:00
Aleix Conchillo Flaqué
e553bb010f tests: migrate LLM tests to Settings-based constructor API
Replace the old `model=` / `params=InputParams(...)` style with the
new `settings=<Service>.Settings(...)` form across LLM service tests.
2026-04-10 17:37:49 -07:00
Paul Kompfner
245339e885 Add changelog for #4272 2026-04-10 16:37:49 -04:00
Paul Kompfner
812cdc6822 Add elide_large_values to LLMContext.get_messages()
Enable callers to get a compact version of context messages suitable
for serialization, logging, and debugging tools. For standard
messages, known binary data (base64 images, audio) is fully elided.
For LLM-specific messages, long string values are recursively
truncated. Adapter get_messages_for_logging() methods now use this.
2026-04-10 16:35:36 -04:00
Aleix Conchillo Flaqué
153814ecc2 scripts/evals: create recording subdirectories when saving audio
Example files can live under subdirectories (e.g. foundational/01.py),
so the recording path needs its parent directory created before the
audio file is written.
2026-04-10 13:19:20 -07:00
Filipi da Silva Fuchter
b1204cc430 Merge pull request #4241 from pipecat-ai/filipi/async_tools_cancellable
Enable async tool cancellation feature.
2026-04-10 15:28:01 -03:00
filipi87
c542167065 Refactored on_function_calls_cancelled to use FunctionCallFromLLM. 2026-04-10 15:06:39 -03:00
Aleix Conchillo Flaqué
02116c58de Merge pull request #4244 from omChauhanDev/fix/vad-stuck-speaking-on-mute
fix VAD stuck in SPEAKING state when audio stops mid-speech
2026-04-10 10:46:53 -07:00
Aleix Conchillo Flaqué
dcd21e7ff4 Rework audio idle detection with timestamp-based adaptive sleep
Replaces the per-frame asyncio.Event signaling with a monotonic
timestamp updated on each audio frame. The handler sleeps until the
next deadline (last_audio_time + timeout), recomputing on each wake-up
to account for audio arriving during sleep.

This avoids waking the handler on every audio frame (~50/s at 20ms
chunks), and guarantees detection latency is bounded by timeout rather
than 2 * timeout.

Also renames audio_starvation_timeout to audio_idle_timeout and
associated identifiers for consistency with existing pipecat naming
(user_idle_timeout, etc.).
2026-04-10 10:35:18 -07:00
Aleix Conchillo Flaqué
5356f3028b Merge pull request #4271 from pipecat-ai/mb/fix-translation-readme
Fix translation example in README
2026-04-10 10:26:27 -07:00
Om Chauhan
cb2c1868b0 fix VAD stuck in SPEAKING state when audio stops mid-speech 2026-04-10 09:54:48 -07:00
Aleix Conchillo Flaqué
dac88c0a47 Merge pull request #4267 from pipecat-ai/ac/fix-observer-cleanup-ordering
Fix observer cleanup ordering to stop proxy tasks before closing resources
2026-04-10 09:05:33 -07:00
kompfner
8e5fe8afda Merge pull request #4067 from omChauhanDev/fix-gemini3-flash-thinking-default
fix: default thinking config for Gemini 3+ Flash models
2026-04-10 10:41:44 -04:00
kompfner
d07eebff20 Merge pull request #4248 from omChauhanDev/add-openai-custom-tools-support
Add custom_tools support for OpenAI adapters
2026-04-10 10:27:28 -04:00
Paul Kompfner
ef4dcca4f1 Update changelog to describe user-facing custom_tools support 2026-04-10 10:23:13 -04:00
Paul Kompfner
fc3307bc63 Use OpenAI SDK types for tool params in adapters and tests
These are TypedDicts (plain dicts at runtime), so no behavioral change
— just more descriptive type hints for readers. Use ToolParam instead
of FunctionToolParam for the Responses adapter to reflect that custom
non-function tools are supported. Use ChatCompletionToolParam instead
of Any for the completions adapter return type. Update tests to use
typed params in expected values.
2026-04-10 10:15:39 -04:00
Mark Backman
da9a55a430 Fix translation example in README 2026-04-10 09:13:42 -04:00
Filipi da Silva Fuchter
094d36904c Merge pull request #4268 from pipecat-ai/filipi/lemonslice_improments
LemonSlice transport updates - new events, extra params
2026-04-10 08:50:39 -03:00
sathwika
746fadc2b5 thread simplification + handling interuption 2026-04-10 17:18:22 +05:30
filipi87
8cce25d2d2 Fixing openai examples. 2026-04-10 08:25:50 -03:00
filipi87
891f00cb5f Using the on_function_calls_cancelled inside the examples. 2026-04-10 07:45:20 -03:00
filipi87
1ca094dad7 Not invoking on_function_calls_started for the cancel function, and creating on_function_calls_cancelled 2026-04-10 07:40:52 -03:00
filipi87
346c585290 Enabling the option to cancel the tools for all the async examples. 2026-04-10 07:31:51 -03:00
jp-lemon
c134110399 LemonSlice transport updates 2026-04-10 07:10:41 -03:00
Aleix Conchillo Flaqué
f9117e6d4a Add changelog for PIPECAT_OBSERVER_FILES removal 2026-04-09 17:39:54 -07:00
Aleix Conchillo Flaqué
360e4480e0 Remove deprecated _load_observer_files in favor of setup files 2026-04-09 17:38:46 -07:00
Aleix Conchillo Flaqué
9b7e15c9bc Add changelog for #4267 2026-04-09 16:55:40 -07:00
Aleix Conchillo Flaqué
00ea86fda8 Fix observer cleanup ordering to stop proxy tasks before closing resources
During pipeline shutdown, proxy tasks must be cancelled before observer
resources are cleaned up. Previously, stop() was called inside
_cancel_tasks() and start() was called in _start_tasks(), which could
lead to proxy tasks still consuming frames after observer resources
were closed.

Now the lifecycle is explicit in _handle_start_frame: start() after all
observers are loaded, and stop() before cleanup() on shutdown.

Also fixes misleading variable name in TaskObserver.cleanup() where
iterating self._proxies yields observer keys, not Proxy values.

Fixes #4195
2026-04-09 16:55:40 -07:00
Aleix Conchillo Flaqué
5f75728207 EventNotifier: update docstring with single-consumer use case 2026-04-09 16:21:42 -07:00
Aleix Conchillo Flaqué
9d274f0fb3 PipelineTask: update dangling task logging 2026-04-09 16:21:05 -07:00
Aleix Conchillo Flaqué
43ddbdf1ec Merge pull request #3797 from iamjr15/fix/idle-processor-event-race
Fix asyncio.Event race conditions in idle processors
2026-04-09 16:04:03 -07:00
iamjr15
565349d332 Fix asyncio.Event race conditions in idle processors
Move event.clear() from finally block to success path in
IdleFrameProcessor and UserIdleProcessor._idle_task_handler().
The finally block unconditionally cleared signals set during
async timeout callbacks, causing false-positive idle detection.

Closes #3402
2026-04-09 13:41:01 -07:00
filipi87
2dd1170229 Updating the Anthropic stream example to allow cancel the location tracking. 2026-04-09 17:26:51 -03:00
filipi87
5cf90cba98 Addressing PR review comments. 2026-04-09 17:11:04 -03:00
Aleix Conchillo Flaqué
981b7bdcb7 Merge pull request #4255 from omChauhanDev/fix/async-gc-collect
PipelineRunner: make _gc_collect async
2026-04-09 12:09:38 -07:00
Filipi da Silva Fuchter
c4320e7f07 Merge pull request #4265 from pipecat-ai/filipi/fix_elevenlabs_token_aggregation
Using the correct default for auto_mode based on text_aggregation_mode.
2026-04-09 15:30:36 -03:00
filipi87
ea0be4d39c Changelog for the elevenlabs fix. 2026-04-09 15:25:06 -03:00
filipi87
dca4e1090a Using the correct default for auto_mode based on text_aggregation_mode. 2026-04-09 15:21:30 -03:00
Cale Shapera
ec574edd53 Add Inworld Realtime Service (#4140)
* Add Inworld Realtime LLM service

Adds a WebSocket-based realtime service for Inworld's cascade
STT/LLM/TTS API with semantic VAD, function calling, and streaming
transcription support.

New files:
- src/pipecat/services/inworld/realtime/ (service, events)
- src/pipecat/adapters/services/inworld_realtime_adapter.py
- examples/foundational/19zb-inworld-realtime.py

Also includes:
- websockets dependency for inworld extra in pyproject.toml
- Adapter and settings tests matching OpenAI/Grok realtime patterns
- Fix for double-response when server-side VAD is enabled

* Prefer init-provided system instruction in Inworld Realtime

Adopt _resolve_system_instruction() from BaseLLMAdapter, matching the
pattern applied to OpenAI Realtime, Grok Realtime, Gemini Live, and
Nova Sonic in the pk/realtime-services-init-v-context-system-instructions-cleanup
branch.

* Update changelog entry with PR number

* Fix changelog format to use bullet point

* Polish PR: default model, example cleanup, changelog update

- Change default model from gpt-4.1-nano to gpt-4.1-mini
- Add function calling demo to example
- Remove demo-testing artifact from system instruction
- Mention Router support in changelog

* Address PR review feedback for Inworld Realtime

- Move example to examples/realtime/realtime-inworld.py
- Change initial context role from "user" to "developer"
- Remove explicit sample rates from example; sync them in
  _ensure_audio_config so Inworld gets the transport's actual rates
- Add audio race condition guard in _handle_evt_audio_delta (matches
  OpenAI realtime pattern)
- Convert remaining "system"/"developer" messages to "user" in adapter
- Add clarifying comment for local-VAD vs server-VAD metrics paths

* Simplify example, add provider tracking, remove local VAD path

- Remove function calling from example, switch model to xai/grok-4-1-fast-non-reasoning
- Add pipecat-realtime session key prefix and provider_data metadata
  for Inworld traffic attribution
- Remove local VAD code path (Inworld only supports server-side VAD)
- Use typed InputAudioBufferAppendEvent for audio sends

* Default TTS model to inworld-tts-1.5-max

* Remove dead shimmed tools code, set STT/VAD defaults

- Remove non-functional AdapterType.SHIM custom tools code from adapter
- Default STT model to assemblyai/u3-rt-pro
- Default VAD eagerness to low
2026-04-09 13:04:17 -04:00
filipi87
772fb57090 Enable async tool cancellation feature. 2026-04-09 10:29:23 -03:00
Filipi da Silva Fuchter
76601944c6 Merge pull request #4230 from pipecat-ai/filipi/async_tools_stream
Support for streaming multiple responses via function calls
2026-04-09 10:26:33 -03:00
filipi87
178985ec8a Refactoring the frame queue to avoid overhead. 2026-04-09 10:24:22 -03:00
filipi87
edc197d050 Creating a new example for async stream using Google. 2026-04-09 09:50:00 -03:00
filipi87
7ece8e3c4a Creating a new example for async stream using Anthropic. 2026-04-09 09:41:07 -03:00
filipi87
7b45a56119 Changelogs for the new feature and the fix. 2026-04-09 09:04:19 -03:00
filipi87
a544f885a3 Added new examples: function-calling-openai-async-stream.py and function-calling-openai-responses-async-stream.py 2026-04-09 09:04:06 -03:00
filipi87
375deac912 Support for streaming multiple responses via function calls. 2026-04-09 09:03:53 -03:00
filipi87
699ca38dc1 Allowing to check if a specific frame is in the queue. 2026-04-09 09:03:06 -03:00
filipi87
aeda60f761 Refactoring the FrameQueue to be able to track any Frame. 2026-04-09 09:02:47 -03:00
Om Chauhan
b010dd58d2 added changelog 2026-04-08 09:37:58 +05:30
Om Chauhan
225ea907d5 make PipelineRunner._gc_collect async 2026-04-08 09:27:18 +05:30
Om Chauhan
1443dfb070 added changelog 2026-04-08 08:48:26 +05:30
Om Chauhan
4bef85e363 added custom_tools support for OpenAI adapters 2026-04-08 08:40:03 +05:30
Mark Backman
0acfb4dd49 Merge pull request #4251 from pipecat-ai/mb/mistral-tts
Add Mistral Voxtral streaming TTS service
2026-04-07 12:50:48 -04:00
Mark Backman
8594401024 Add changelog for PR #4251 2026-04-07 12:32:06 -04:00
Mark Backman
aa7a014518 Add mistral voice example 2026-04-07 12:32:06 -04:00
Filipi da Silva Fuchter
27a8a973b1 Merge pull request #4201 from pipecat-ai/mb/handle-recurring-disconnects
Fix WebsocketService infinite reconnection loop
2026-04-07 11:02:24 -03:00
sathwika
8abda808ca Add Nvidia copyright header 2026-04-07 19:27:04 +05:30
Mark Backman
7f3f23dcb9 Add Mistral Voxtral streaming TTS service
Integrate with Mistral's Voxtral TTS API (voxtral-mini-tts-2603) using
HTTP streaming with Server-Sent Events. Converts base64-encoded float32
PCM chunks from the API to int16 for the Pipecat pipeline.
2026-04-07 09:39:36 -04:00
Filipi da Silva Fuchter
be509e5647 Merge pull request #4245 from kollaikal-rupesh/fix/mixer-cancel-cleanup
Stop audio mixer on pipeline cancellation
2026-04-07 10:36:18 -03:00
sathwika
9f0b18b03d Add changelog fragments for PR #4249 2026-04-07 18:18:55 +05:30
Filipi da Silva Fuchter
6eccd16543 Merge pull request #4217 from pipecat-ai/filipi/async_tools
Supporting async function calls.
2026-04-07 09:35:03 -03:00
filipi87
d8dc6bc7d0 New example for async function calls using Google. 2026-04-07 09:31:22 -03:00
filipi87
d12a8529e2 New example for async function calls using OpenAI responses. 2026-04-07 09:28:01 -03:00
filipi87
aa061f7e2c Renaming the openai and anthropic examples to async instead of delayed. 2026-04-07 09:23:45 -03:00
Filipi da Silva Fuchter
e863293198 Improving docstring description.
Co-authored-by: kompfner <paul@daily.co>
2026-04-07 08:14:39 -04:00
filipi87
9c7d5a9de2 Improving changelog description to mention group_parallel_tools. 2026-04-07 09:13:08 -03:00
Filipi da Silva Fuchter
a451c42dc7 Merge pull request #4247 from pipecat-ai/filipi/background_sound_example
Fixing the background sound example.
2026-04-07 09:06:14 -03:00
sathwika
bc009d8f98 Add stitching support and enhancements for NvidiaTTSService 2026-04-07 14:49:45 +05:30
Rupesh
67ee802772 Remove changelog entry per review feedback 2026-04-06 21:36:53 -07:00
filipi87
ceaa27ee6e Fixing the background sound example. 2026-04-06 18:25:30 -03:00
filipi87
42335e2ef0 Renaming to async_tool and providing description. 2026-04-06 09:56:48 -03:00
Rupesh
7585864113 Stop audio mixer on pipeline cancellation to prevent 100% CPU usage 2026-04-06 01:51:29 -07:00
kompfner
18852adc28 Merge pull request #4242 from pipecat-ai/pk/gemini-live-fix-session-resumption
Fix Gemini Live session resumption hanging after reconnect
2026-04-04 11:43:24 -04:00
Paul Kompfner
f11b6d7151 Fix Gemini Live session resumption hanging after reconnect
After a reconnect, _ready_for_realtime_input was never set back to True
because _create_initial_response (which sets the flag) is only called on
initial connection. This caused all audio/video/text to be silently
dropped after reconnecting, making the bot appear to hang.

Set the flag in _handle_session_ready when we detect a reconnect, either
via session_resumption_handle (server restores state) or via existing
context (rare case where connection drops before first resumption handle).
2026-04-03 18:27:10 -04:00
Paul Kompfner
9df1e18b43 Fix Gemini Live session resumption hanging after reconnect
After a reconnect, _ready_for_realtime_input was never set back to True
because _create_initial_response (which sets the flag) is only called on
initial connection. This caused all audio/video/text to be silently
dropped after reconnecting, making the bot appear to hang.

Set the flag in _handle_session_ready when context already exists
(i.e. reconnect case) since we don't need to go through
_create_initial_response again.
2026-04-03 16:32:03 -04:00
Mark Backman
b8f9a21e0c Merge pull request #4240 from pipecat-ai/mb/remove-old-files
Remove orphaned .dockerignore and CHANGELOG.md.template
2026-04-03 15:40:57 -04:00
Mark Backman
c18d997ad8 Remove orphaned .dockerignore and CHANGELOG.md.template 2026-04-03 14:55:25 -04:00
Mark Backman
56aaebe1b0 Merge pull request #4239 from pipecat-ai/mb/remove-deprecation-module-proxy
Remove DeprecatedModuleProxy and service re-export shims
2026-04-03 14:03:17 -04:00
Mark Backman
916af84974 Remove DeprecatedModuleProxy and service re-export shims
Remove the deprecation proxy infrastructure that allowed old-style flat
imports (e.g. `from pipecat.services.openai import OpenAILLMService`).
Users must now import from specific submodules
(`from pipecat.services.openai.llm import OpenAILLMService`), which is
already the established pattern across all internal code and 179+ examples.

- Strip 32 proxy `__init__.py` files to empty
- Strip 3 non-proxy files with bare star imports (minimax, sambanova, sarvam)
- Strip google/gemini_live `__init__.py` re-exports
- Remove DeprecatedModuleProxy class and helpers from services/__init__.py
- Remove ruff per-file ignore for services/__init__.py
- Fix 2 examples using old-style imports
2026-04-03 13:43:02 -04:00
Mark Backman
3e911b5fa0 Merge pull request #4236 from pipecat-ai/mb/more-deprecation-removals-2026-04-03
Remove deprecated fields, shims, and backward-compatibility code
2026-04-03 13:28:03 -04:00
Aleix Conchillo Flaqué
7c08779a2f Merge pull request #4234 from pipecat-ai/aleix/export-runner-app
Export FastAPI app from runner for custom routes
2026-04-03 09:45:39 -07:00
Mark Backman
988c08a5b6 Merge pull request #4238 from pipecat-ai/mb/fix-daily-utils-docs
Fix Pydantic v2 + Sphinx autodoc incompatibility for Daily utils
2026-04-03 12:39:09 -04:00
Mark Backman
7351298849 Fix Pydantic v2 + Sphinx autodoc incompatibility for Daily utils
Patch Pydantic's DICT_TYPES check in conf.py to accept Union-wrapped
dict types, fixing the autodoc import failure for models using
ConfigDict(extra="allow").
2026-04-03 12:00:11 -04:00
kompfner
392134be46 Merge pull request #4231 from pipecat-ai/pk/llm-messages-transform-frame
Add a `LLMMessagesTransformFrame` to facilitate programmatically edit…
2026-04-03 11:54:34 -04:00
Paul Kompfner
9266e1e7ad Remove comment referencing removed OpenAILLMContext 2026-04-03 11:53:57 -04:00
Mark Backman
e9eff4626f Merge pull request #4237 from pipecat-ai/mb/docstring-fixes-2026-04-03
Docstring fixes for docs auto-generation
2026-04-03 11:50:20 -04:00
Mark Backman
21aa50283e Update docs build script and README for current workflow
Make -W (warnings as errors) opt-in via --strict flag instead of
default, and update README to reflect uv-based workflow and current
directory structure.
2026-04-03 11:43:44 -04:00
Paul Kompfner
70469e3c0c Assert no LLMContextFrame when run_llm is not set in message frame tests 2026-04-03 11:34:58 -04:00
Paul Kompfner
6111df947e Test LLMAssistantAggregator handling of upstream message frames
Add tests for LLMRunFrame, LLMMessagesAppendFrame, LLMMessagesUpdateFrame,
and LLMMessagesTransformFrame sent upstream to LLMAssistantAggregator,
mirroring the existing LLMUserAggregator downstream tests. Add
frames_to_send_direction param to run_test helper to support this.
2026-04-03 11:34:58 -04:00
Paul Kompfner
4eebfd65d9 Add a LLMMessagesTransformFrame to facilitate programmatically editing context in a frame-based way.
The previous approach required the caller to directly grab a reference to the context object, grab a "snapshot" of its messages *at that point in time*, transform the messages, and then push an `LLMMessagesUpdateFrame` with the transformed messages. This approach can lead to problems: what if there had already been a change to the context queued in the pipeline? The transformed messages would simply overwrite it without consideration.
2026-04-03 11:34:50 -04:00
Mark Backman
c2358b273b Use Parameters instead of Attributes in docstrings to fix duplicate object warnings
Napoleon's Attributes section creates class-level attribute docs that
duplicate the __init__ parameter docs when napoleon_include_init_with_doc
is enabled. Using Parameters avoids the duplication.
2026-04-03 10:36:36 -04:00
Mark Backman
3a10a528c0 Remove deprecated fields, shims, and backward-compatibility code
- Remove expect_stripped_words from LLMAssistantAggregatorParams and related warnings
- Remove old multi-parameter on_push_frame observer signature support in TaskObserver
- Remove deprecated context field from UserImageRequestFrame
- Remove deprecated LiveKitTransportMessageFrame and LiveKitTransportMessageUrgentFrame
- Remove deprecated pipecat.turns.mute shim module
2026-04-03 10:10:51 -04:00
Mark Backman
f078b8b867 Fix Sphinx docstring RST formatting warnings
Replace Markdown code blocks with RST syntax in genesys.py, fix
deprecated directive transitions in nvidia and summarization modules,
remove stray bullet prefix in whisper arg docs, restructure code block
in turn completion mixin, and add deepgram mock to Sphinx conf.
2026-04-03 09:57:20 -04:00
Mark Backman
5490820338 Merge pull request #4235 from pipecat-ai/mb/deprecation-docs-cleanup
Clean up docs config after deprecation pass
2026-04-03 09:57:05 -04:00
Mark Backman
10697636c9 Add changelog for #4235 2026-04-03 09:52:31 -04:00
Mark Backman
e1638a9342 Clean up docs config after riva removal and add missing modules
Remove stale riva mock imports from autodoc_mock_imports since the riva
service was removed and nvidia-riva-client is installed during doc builds.
Add pipecat.turns and pipecat.extensions to import_core_modules() and
add Turns to the index.rst toctree. Regenerate uv.lock to reflect the
riva extra removal from pyproject.toml.
2026-04-03 09:52:31 -04:00
Mark Backman
bfffefa95c Remove leftover riva and remote-smart-turn references
Clean up deprecated extras from pyproject.toml and the docs
build script.
2026-04-03 09:29:29 -04:00
Mark Backman
fbb49ffc8d Merge pull request #4233 from pipecat-ai/mb/remove-unused-imports-2026-04-02
Remove unused imports across codebase
2026-04-03 07:26:13 -04:00
filipi87
eace782752 Renaming from async_tool to tool. 2026-04-03 08:20:14 -03:00
Mark Backman
b94071d37f Merge pull request #4232 from pipecat-ai/mb/more-deprecation-removals 2026-04-03 06:52:56 -04:00
Aleix Conchillo Flaqué
796a10fe9c Add changelog for #4234 2026-04-02 21:16:49 -07:00
Aleix Conchillo Flaqué
1ab07d312f Export FastAPI app from runner so custom routes can be added
Move the FastAPI instance to module level so other packages can import
it and register routes before main() is called. main() now configures
the existing app with transport-specific routes instead of creating a
new one.
2026-04-02 21:16:17 -07:00
Mark Backman
8adb38f87c Remove unused imports across codebase 2026-04-02 22:21:16 -04:00
Mark Backman
33f145d70a Add changelog fragments for #4232 2026-04-02 22:10:09 -04:00
Mark Backman
41e46ee69e Remove deprecated vad_events and should_interrupt from DeepgramSTTService
Deepgram's built-in VAD events were deprecated in 0.0.99 in favor of
Silero VAD. This removes vad_events from settings and LiveOptions,
the should_interrupt parameter, the vad_enabled property,
_on_speech_started/_on_utterance_end handlers, and simplifies
_on_message and process_frame accordingly.
2026-04-02 22:05:49 -04:00
Mark Backman
60933b7a56 Remove deprecated send_transcription_frames param and fix broken _warn_deprecated_param calls
Remove the send_transcription_frames parameter from OpenAI Realtime LLM
(deprecated since 0.0.92). Also fix undefined _warn_deprecated_param
calls in both OpenAI and xAI realtime services, replacing them with the
existing _warn_init_param_moved_to_settings method.
2026-04-02 21:58:57 -04:00
Mark Backman
64e09d592e Remove deprecated TranscriptionUserTurnStopStrategy alias
Replaced by SpeechTimeoutUserTurnStopStrategy since 0.0.102.
2026-04-02 21:57:03 -04:00
Mark Backman
883de8ab08 Remove dangling turn_analyzer docstring and unused imports from TransportParams 2026-04-02 21:56:11 -04:00
Mark Backman
793ed8f9e3 Remove deprecated UserBotLatencyLogObserver and UserIdleProcessor
UserBotLatencyLogObserver (deprecated 0.0.102) is replaced by
UserBotLatencyObserver. UserIdleProcessor (deprecated 0.0.100) is
replaced by LLMUserAggregator with user_idle_timeout.
2026-04-02 21:54:36 -04:00
Vanessa Pyne
d8ea33e1a4 Merge pull request #4034 from omChauhanDev/fix/mcp-persistent-session
fixed MCPClient to reuse session across tool calls
2026-04-02 18:51:31 -05:00
vipyne
1d7404ef21 Update MCP examples 2026-04-02 18:15:56 -05:00
Om Chauhan
dc909e2713 add changelog fragments 2026-04-02 18:06:28 -05:00
Om Chauhan
e22f9f84bb fixed MCPClient to reuse session across tool calls 2026-04-02 18:06:28 -05:00
filipi87
7af72eee3e Creating new delayed examples for openai and anthropic. 2026-04-02 18:40:41 -03:00
Aleix Conchillo Flaqué
57068f1b38 Merge pull request #4229 from pipecat-ai/aleix/deprecate-transport-vad-turn-analyzers
Remove deprecated transport VAD/turn analyzers and ExternalUserTurnStrategies
2026-04-02 14:30:12 -07:00
filipi87
bbb605accc Changelog entries for the fixes and improvements. 2026-04-02 16:58:42 -03:00
filipi87
929a0e33f4 Fixing the automated tests. 2026-04-02 16:58:28 -03:00
filipi87
3724ecd378 Supporting async function calls. 2026-04-02 16:58:19 -03:00
filipi87
4c8734c5e1 Fixing an issue where the BotOutputTransport was discarding the UninterruptibleFrames. 2026-04-02 16:57:46 -03:00
filipi87
283f6df205 Creating a FrameQueue so we can properly reset without discarding uninterruptible frames. 2026-04-02 16:57:22 -03:00
Aleix Conchillo Flaqué
a29be38f48 LLMUserAggregator: remove self-queued frame tracking
The _self_queued_frames set and _internal_queue_frame wrapper were used
to prevent re-processing SpeechControlParamsFrame that the aggregator
queued to itself. Now that the frame is no longer special-cased, this
tracking is unnecessary. Also removes unused FrameCallback import.
2026-04-02 12:42:06 -07:00
Aleix Conchillo Flaqué
976c644f90 Fix tests to expect SpeechControlParamsFrame from default turn strategy 2026-04-02 12:42:06 -07:00
Aleix Conchillo Flaqué
34aa37f395 Add changelog for #4229 2026-04-02 11:54:07 -07:00
Aleix Conchillo Flaqué
380867a87a LLMUserAggregator: remove auto ExternalUserTurnStrategies() 2026-04-02 11:52:26 -07:00
Aleix Conchillo Flaqué
cc3af59db4 transports: remove deprecated VAD and turn analyzers 2026-04-02 11:51:08 -07:00
Mark Backman
f93d13efff Merge pull request #4228 from pipecat-ai/mb/remove-turn-deprecations 2026-04-02 14:32:21 -04:00
Mark Backman
c28b7e8f26 Merge pull request #4219 from lukehalley/feat/bedrock-prompt-caching
feat(aws): add prompt caching support for Bedrock ConverseStream
2026-04-02 12:26:28 -04:00
Mark Backman
d1a2dee7a1 fix(aws): initialize enable_prompt_caching in default settings 2026-04-02 12:20:47 -04:00
Luke Halley
da1a1a59a4 feat(aws): handle LLMEnablePromptCachingFrame for runtime toggling
Add LLMEnablePromptCachingFrame handler to process_frame for parity
with AnthropicLLMService, enabling runtime toggling of prompt caching.
2026-04-02 12:13:46 -04:00
Luke Halley
134790b17c chore: add changelog fragment for PR #4219 2026-04-02 12:10:57 -04:00
Luke Halley
e5aa3bbc20 feat(aws): add prompt caching support for Bedrock ConverseStream
Adds `enable_prompt_caching` setting to `AWSBedrockLLMSettings`. When
enabled, appends `cachePoint` markers to system prompts and tool
definitions in ConverseStream requests.

This can reduce TTFT by up to 85% for multi-turn conversations where
the system prompt stays constant (e.g. voice agents, chat assistants).

Follows the same pattern as `AnthropicLLMService.enable_prompt_caching`.

Usage:
```python
llm = AWSBedrockLLMService(
    settings=AWSBedrockLLMSettings(
        model="au.anthropic.claude-haiku-4-5-20251001-v1:0",
        enable_prompt_caching=True,
    ),
)
```

See: https://docs.aws.amazon.com/bedrock/latest/userguide/prompt-caching.html
2026-04-02 12:10:57 -04:00
Mark Backman
3be0ea05ef Add changelog entries for #4228 2026-04-02 11:34:22 -04:00
Mark Backman
0c59819682 Remove allow_interruptions from voice-sarvam example
This was missed from the allow_interruptions removal commit.
2026-04-02 11:32:44 -04:00
Mark Backman
5b67dcd9e7 Remove deprecated EmulateUser{Started,Stopped}SpeakingFrame and emulated field
Remove EmulateUserStartedSpeakingFrame, EmulateUserStoppedSpeakingFrame
(deprecated since v0.0.99), and the emulated field from
UserStartedSpeakingFrame and UserStoppedSpeakingFrame. Clean up the
handling code in base_input.py and a stale comment in nova_sonic/llm.py.
2026-04-02 11:31:29 -04:00
Mark Backman
d503383c23 Remove deprecated interruption_strategies plumbing
The interruption_strategies mechanism was deprecated in v0.0.99 in favor
of LLMUserAggregator's user_turn_strategies. All evaluation logic was
already removed — this removes the remaining field definitions, property,
StartFrame propagation, conditional check in base_input.py, strategy
files, and test.
2026-04-02 11:19:17 -04:00
Mark Backman
fa30268b84 Remove deprecated TranscriptionMessage, ThoughtTranscriptionMessage, and TranscriptionUpdateFrame 2026-04-02 11:03:23 -04:00
Mark Backman
2a118084bd Remove deprecated transcript_processor module 2026-04-02 10:57:05 -04:00
Mark Backman
87e8ed109a Remove deprecated STTMuteFilter, STTMuteConfig, and STTMuteStrategy 2026-04-02 10:52:41 -04:00
Mark Backman
a5e1bbf4a3 Remove deprecated UserResponseAggregator class 2026-04-02 10:50:05 -04:00
Mark Backman
f8267f1ea6 Remove deprecated allow_interruptions parameter
This field was deprecated in v0.0.99 in favor of LLMUserAggregator's
user_turn_strategies / user_mute_strategies parameters. Since the default
was True (interruptions allowed), removing the guards keeps the current
default behavior.
2026-04-02 10:47:44 -04:00
Mark Backman
74acb0b7d0 Remove deprecated class_decorators tracing module 2026-04-02 10:31:15 -04:00
Mark Backman
41e3afbc2f Remove deprecated add_pattern_pair method from PatternPairAggregator 2026-04-02 10:28:01 -04:00
Aleix Conchillo Flaqué
d4824ffe8a Merge pull request #4225 from pipecat-ai/aleix/transport-and-other-deprecations
Remove deprecated transport module aliases and sync package
2026-04-01 19:43:22 -07:00
Mark Backman
2426f80789 Merge pull request #4220 from pipecat-ai/mb/more-service-deprecations
Remove more deprecated service parameters and shims
2026-04-01 22:23:39 -04:00
Mark Backman
5ce46df599 Use self.create_context_id() instead of raw uuid in CartesiaTTSService 2026-04-01 22:18:41 -04:00
Aleix Conchillo Flaqué
a6013ba437 update uv.lock 2026-04-01 19:12:39 -07:00
Aleix Conchillo Flaqué
279ca5a87b Add changelog for #4225 2026-04-01 19:04:11 -07:00
Aleix Conchillo Flaqué
c6f79592d8 remove deprecated sync package 2026-04-01 19:04:11 -07:00
Aleix Conchillo Flaqué
e74e497b8d transports: remove old deprecated modules 2026-04-01 19:04:11 -07:00
Aleix Conchillo Flaqué
d245b79bba Merge pull request #3984 from pipecat-ai/aleix/update-onnxruntime
Update onnxruntime to 1.24.3
2026-04-01 19:03:57 -07:00
Mark Backman
8a794424dd Update uv.lock 2026-04-01 19:05:17 -04:00
Aleix Conchillo Flaqué
f4743a6c91 require python >= 3.11 2026-04-01 19:02:34 -04:00
Aleix Conchillo Flaqué
ba32a48510 github: remove python 3.10 from compatibility chart 2026-04-01 19:02:34 -04:00
Aleix Conchillo Flaqué
a9cafa2a3b Add changelog for #3984 2026-04-01 19:02:34 -04:00
Aleix Conchillo Flaqué
58b1b7249e Update onnxruntime to 1.24.3
This version adds support for Python 3.14.
2026-04-01 19:02:32 -04:00
Aleix Conchillo Flaqué
db8e73e5ca Merge pull request #4224 from pipecat-ai/aleix/optional-function-call-timeout
Make function_call_timeout_secs optional
2026-04-01 14:39:10 -07:00
Mark Backman
170f6dfe8b Add changelog for #4220 2026-04-01 17:03:05 -04:00
Mark Backman
c763abc4ae Add deprecation version to update_options in GoogleSTTService 2026-04-01 17:03:05 -04:00
Mark Backman
197d96fc49 Remove deprecated enable_prompt_caching_beta from Anthropic InputParams 2026-04-01 17:03:05 -04:00
Mark Backman
c8e9bf77fd Remove deprecated simli_config and use_turn_server params from SimliVideoService 2026-04-01 17:03:05 -04:00
Mark Backman
48b25962e2 Remove deprecated english_normalization param from MiniMax TTS InputParams 2026-04-01 17:03:05 -04:00
Mark Backman
5d093c9ad7 Remove deprecated InputParams class from GoogleVertexLLMService
The location and project_id fields were deprecated since 0.0.90 in
favor of direct __init__ parameters. Now that InputParams is removed,
project_id is required and location defaults to "us-east4" directly
in the signature.
2026-04-01 17:03:05 -04:00
Mark Backman
d93f63deb5 Remove deprecated base_url param from GeminiLiveLLMService 2026-04-01 17:03:05 -04:00
Mark Backman
09a57972f5 Remove deprecated api_key param from GeminiTTSService 2026-04-01 17:03:05 -04:00
Mark Backman
f83d062df9 Remove deprecated InputParams alias from GladiaSTTService 2026-04-01 17:03:05 -04:00
Mark Backman
a2a42b8703 Remove deprecated confidence param from GladiaSTTService 2026-04-01 17:03:05 -04:00
Mark Backman
e60a72e2d4 Remove deprecated language param from GladiaInputParams 2026-04-01 17:03:05 -04:00
Mark Backman
83f4989a78 Remove deprecated model param from FishAudioTTSService 2026-04-01 17:03:05 -04:00
Mark Backman
5d2b288274 Remove deprecated url param from DeepgramSTTService 2026-04-01 17:03:05 -04:00
Mark Backman
52ece87ac9 Remove deprecated send_transcription_frames param from AWSNovaSonicLLMService 2026-04-01 17:03:05 -04:00
Mark Backman
bc4bbb1895 Remove deprecated PollyTTSService alias 2026-04-01 17:03:05 -04:00
Mark Backman
eb014fffc4 Flush Cartesia context on voice/model/language changes
Override _update_settings in CartesiaTTSService to flush the current
audio context and assign a new turn context ID when voice, model, or
language settings change. This prevents Context has closed errors
from Cartesia API, which locks these parameters per context.
2026-04-01 17:03:05 -04:00
Mark Backman
e74930b954 Remove deprecated text_aggregator and text_filter params from TTS
Remove the deprecated text_aggregator parameter from TTSService,
CartesiaTTSService, and RimeTTSService, and the deprecated text_filter
parameter from TTSService. Users should use LLMTextProcessor before
the TTS service instead. Update the voice-switching example to use
LLMTextProcessor with PatternPairAggregator.
2026-04-01 17:03:05 -04:00
Aleix Conchillo Flaqué
6ed4109da9 Add changelog for #4224 2026-04-01 13:58:45 -07:00
Aleix Conchillo Flaqué
53f809b7d5 Make function_call_timeout_secs optional and skip timeout task when unset
Change the default from 10s to None so deferred function calls can run
indefinitely when no timeout is configured. Only create the timeout
task when a timeout is actually provided (per-call or service-level).
2026-04-01 13:58:09 -07:00
Mark Backman
f6a3678f93 Improve tests 2026-03-30 12:46:30 -04:00
Mark Backman
3af93ed257 Add changelog for #4201 2026-03-30 12:31:26 -04:00
Mark Backman
f37bf989dd Make reconnection failure error non-fatal to allow service failover
A single service failing to reconnect should not kill the entire
pipeline. Non-fatal errors flow through the pipeline so application
code (e.g. ServiceSwitcher) can handle failover to a backup service.
2026-03-30 12:29:53 -04:00
Mark Backman
86a16d53bc Detect quick connection failures in WebsocketService to prevent infinite reconnection loops
When a WebSocket server accepts the handshake but immediately closes the
connection (e.g. invalid API key returning close code 1008), the existing
exponential backoff does not help because the handshake keeps succeeding.
This tracks how long each connection survives and emits a non-fatal
ErrorFrame after 3 consecutive sub-5s failures, allowing ServiceSwitcher
failover instead of killing the pipeline.

Fixes #3711
2026-03-30 12:23:11 -04:00
Om Chauhan
fa982a05c0 added changelog 2026-03-18 09:46:15 +05:30
Om Chauhan
419c7d4450 fix: default thinking config for Gemini 3+ Flash models 2026-03-18 09:33:54 +05:30
255 changed files with 10241 additions and 8721 deletions

View File

@@ -1,30 +0,0 @@
# flyctl launch added from .gitignore
**/.vscode
**/env
**/__pycache__
**/*~
**/venv
#*#
# Distribution / packaging
**/.Python
**/build
**/develop-eggs
**/dist
**/downloads
**/eggs
**/.eggs
**/lib
**/lib64
**/parts
**/sdist
**/var
**/wheels
**/share/python-wheels
**/*.egg-info
**/.installed.cfg
**/*.egg
**/MANIFEST
**/.DS_Store
**/.env
fly.toml

View File

@@ -14,7 +14,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ['3.10.19', '3.11.14', '3.12.12', '3.13.12']
python-version: ['3.11.15', '3.12.13', '3.13.12', '3.14.3']
name: Python ${{ matrix.python-version }}
steps:

View File

@@ -11,7 +11,7 @@ build:
jobs:
post_install:
- pip install uv
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra mlx-whisper
sphinx:
configuration: docs/api/conf.py

View File

@@ -7,6 +7,684 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- towncrier release notes start -->
## [1.0.0] - 2026-04-14
Migration guide: https://docs.pipecat.ai/pipecat/migration/migration-1.0
### Added
- Updated LemonSlice transport:
- Added `on_avatar_connected` and `on_avatar_disconnected` events triggered
when the avatar joins and leaves the room.
- Added `api_url` parameter to `LemonSliceNewSessionRequest` to allow
overriding the LemonSlice API endpoint.
- Added support for passing arbitrary named parameters to the LemonSlice
API endpoint.
(PR [#3995](https://github.com/pipecat-ai/pipecat/pull/3995))
- Added Inworld Realtime LLM service with WebSocket-based cascade STT/LLM/TTS,
semantic VAD, function calling, and Router support.
(PR [#4140](https://github.com/pipecat-ai/pipecat/pull/4140))
- ⚠️ Added WebSocket-based `OpenAIResponsesLLMService` as the new default for
the OpenAI Responses API. It maintains a persistent connection to
`wss://api.openai.com/v1/responses` and automatically uses
`previous_response_id` to send only incremental context, falling back to full
context on reconnection or cache miss. The previous HTTP-based implementation
is now available as `OpenAIResponsesHttpLLMService`.
(PR [#4141](https://github.com/pipecat-ai/pipecat/pull/4141))
- Added `group_parallel_tools` parameter to `LLMService` (default `True`). When
`True`, all function calls from the same LLM response batch share a group ID
and the LLM is triggered exactly once after the last call completes. Set to
`False` to trigger inference independently for each function call result as
it arrives.
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
- Added async function call support to `register_function()` and
`register_direct_function()` via `cancel_on_interruption=False`. When set to
`False`, the LLM continues the conversation immediately without waiting for
the function result. The result is injected back into the context as a
`developer` message once available, triggering a new LLM inference at that
point.
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
- Added `enable_prompt_caching` setting to `AWSBedrockLLMService` for Bedrock
ConverseStream prompt caching.
(PR [#4219](https://github.com/pipecat-ai/pipecat/pull/4219))
- Added support for streaming intermediate results from async function calls.
Call `result_callback` multiple times with
`properties=FunctionCallResultProperties(is_final=False)` to push incremental
updates, then call it once more (with `is_final=True`, the default) to
deliver the final result. Only valid for functions registered with
`cancel_on_interruption=False`.
(PR [#4230](https://github.com/pipecat-ai/pipecat/pull/4230))
- Added `LLMMessagesTransformFrame` to facilitate programmatically editing
context in a frame-based way.
The previous approach required the caller to directly grab a reference to
the context object, grab a "snapshot" of its messages _at that point in
time_, transform the messages, and then push an `LLMMessagesUpdateFrame` with
the transformed messages. This approach can lead to problems: what if there
had already been a change to the context queued in the pipeline? The
transformed messages would simply overwrite it without consideration.
(PR [#4231](https://github.com/pipecat-ai/pipecat/pull/4231))
- The development runner now exports a module-level `app` FastAPI instance
(`from pipecat.runner.run import app`) so you can register custom routes
before calling `main()`.
(PR [#4234](https://github.com/pipecat-ai/pipecat/pull/4234))
- `ToolsSchema` now accepts `custom_tools` for OpenAI LLM services
(`OpenAILLMService`, `OpenAIResponsesLLMService`,
`OpenAIResponsesHttpLLMService`, and `OpenAIRealtimeLLMService`), letting you
pass provider-specific tools like `tool_search` alongside standard function
tools.
(PR [#4248](https://github.com/pipecat-ai/pipecat/pull/4248))
- Added enhancements to `NvidiaTTSService`:
- Cross-sentence stitching: multiple sentences within an LLM turn are fed
into a single `SynthesizeOnline` gRPC stream for seamless audio across
sentence boundaries (requires Magpie TTS model v1.7.0+).
- `custom_dictionary` and `encoding` parameters for IPA-based custom
pronunciation and output audio encoding.
- Metrics generation (`can_generate_metrics` returns true) and
`stop_all_metrics()` when an audio context is interrupted.
- gRPC error handling around synthesis config retrieval
(`GetRivaSynthesisConfig`).
(PR [#4249](https://github.com/pipecat-ai/pipecat/pull/4249))
- Added `MistralTTSService` for streaming text-to-speech using Mistral's
Voxtral TTS API (`voxtral-mini-tts-2603`). Supports SSE-based audio streaming
with automatic resampling from the API's native 24kHz to any requested sample
rate. Requires the `mistral` optional extra (`pip install
pipecat-ai[mistral]`).
(PR [#4251](https://github.com/pipecat-ai/pipecat/pull/4251))
- Added `truncate_large_values` parameter to `LLMContext.get_messages()`. When
`True`, returns compact deep copies of messages with binary data (base64
images, audio) replaced by short placeholders and long string values in
LLM-specific messages recursively truncated. Useful for serialization,
logging, and debugging tools.
(PR [#4272](https://github.com/pipecat-ai/pipecat/pull/4272))
- `CartesiaSTTService` now supports runtime settings updates (e.g. changing
`language` or `model` via `STTUpdateSettingsFrame`). The service
automatically reconnects with the new parameters. Previously, settings
updates were silently ignored.
(PR [#4282](https://github.com/pipecat-ai/pipecat/pull/4282))
- Added `pcm_32000` and `pcm_48000` sample rate support to ElevenLabs TTS
services.
(PR [#4293](https://github.com/pipecat-ai/pipecat/pull/4293))
- Added `enable_logging` parameter to `ElevenLabsHttpTTSService`. Set to
`False` to enable zero retention mode (enterprise only).
(PR [#4293](https://github.com/pipecat-ai/pipecat/pull/4293))
### Changed
- Updated `onnxruntime` from 1.23.2 to 1.24.3, adding support for Python 3.14.
(PR [#3984](https://github.com/pipecat-ai/pipecat/pull/3984))
- MCPClient now requires async with MCPClient(...) as mcp: or explicit
start()/close() calls to manage the connection lifecycle.
(PR [#4034](https://github.com/pipecat-ai/pipecat/pull/4034))
- ⚠️ Updated `langchain` extra to require langchain 1.x (from 0.3.x),
langchain-community 0.4.x (from 0.3.x), and langchain-openai 1.x (from
0.3.x). If you pin these packages in your project, update your pins
accordingly.
(PR [#4192](https://github.com/pipecat-ai/pipecat/pull/4192))
- `WebsocketService` reconnection errors are now non-fatal. When a websocket
service exhausts its reconnection attempts (either via exponential backoff or
quick failure detection), it emits a non-fatal `ErrorFrame` instead of a
fatal one. This allows application-level failover (e.g. `ServiceSwitcher`) to
handle the failure instead of killing the entire pipeline.
(PR [#4201](https://github.com/pipecat-ai/pipecat/pull/4201))
- Changed `GrokLLMService` default model from `grok-3-beta` to `grok-3`, now
that the model is generally available.
(PR [#4209](https://github.com/pipecat-ai/pipecat/pull/4209))
- `GoogleImageGenService` now defaults to `imagen-4.0-generate-001` (previously
`imagen-3.0-generate-002`).
(PR [#4213](https://github.com/pipecat-ai/pipecat/pull/4213))
- ⚠️ `BaseOpenAILLMService.get_chat_completions()` now accepts an `LLMContext`
instead of `OpenAILLMInvocationParams`. If you override this method, update
your signature accordingly.
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
- When multiple function calls are returned in a single LLM response, by
default (when `group_parallel_tools=True`) the LLM is now triggered exactly
once after the last call in the batch completes, rather than waiting for all
function calls.
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
- ⚠️ `LLMService.function_call_timeout_secs` now defaults to `None` instead of
`10.0`. Deferred function calls will run indefinitely unless a timeout is
explicitly set at the service level or per-call. If you relied on the
previous 10-second default, pass `function_call_timeout_secs=10.0`
explicitly.
(PR [#4224](https://github.com/pipecat-ai/pipecat/pull/4224))
- Updated `NvidiaTTSService`:
- Made `api_key` optional for local NIM deployments.
- Voice, language, and quality can be updated without reconnecting the gRPC
client; new values take effect on the next synthesis turn, not for the
current turn's in-flight requests.
- Replaced per-sentence synchronous `synthesize_online` calls with async
queue-backed gRPC streaming.
- Streaming now uses asyncio tasks with explicit gRPC cancellation on
interruption and stale-response filtering when a stream is aborted or
replaced.
- Renamed Riva references to Nemotron Speech in docs and messages.
- Disabled automatic TTS start frames at the service level
(`push_start_frame=False`) and emit `TTSStartedFrame` when a stitched
synthesis stream is started for a context.
(PR [#4249](https://github.com/pipecat-ai/pipecat/pull/4249))
### Removed
- ⚠️ Removed `OpenPipeLLMService` and the `openpipe` extra. OpenPipe was
acquired by CoreWeave and the package is no longer maintained. If you were
using `openpipe` as an LLM provider, switch to the underlying provider
directly (e.g. `openai`). The OpenPipe interface can still be used with
`OpenAILLMService` by specifying a `base_url`.
(PR [#4191](https://github.com/pipecat-ai/pipecat/pull/4191))
- ⚠️ Removed `NoisereduceFilter`. Use system-level noise reduction or a
service-based alternative instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated `vad_enabled` and `vad_audio_passthrough` transport
params.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated `camera_in_enabled`, `camera_in_is_live`,
`camera_in_width`, `camera_in_height`, `camera_out_enabled`,
`camera_out_is_live`, `camera_out_width`, `camera_out_height`, and
`camera_out_color` transport params. Use the `video_in_*` and `video_out_*`
equivalents instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `FrameProcessor.wait_for_task()`. Use `create_task()` and manage
tasks with the built-in `TaskManager` instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated transport frames: `TransportMessageFrame`,
`TransportMessageUrgentFrame`, `InputTransportMessageUrgentFrame`,
`DailyTransportMessageFrame`, and `DailyTransportMessageUrgentFrame`. Use
`OutputTransportMessageFrame`, `OutputTransportMessageUrgentFrame`,
`InputTransportMessageFrame`, `DailyOutputTransportMessageFrame`, and
`DailyOutputTransportMessageUrgentFrame` instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `create_default_resampler()` from `pipecat.audio.utils`.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `DailyRunner.configure_with_args()`. Use `PipelineRunner` with
`RunnerArguments` instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated `on_pipeline_ended`, `on_pipeline_cancelled`, and
`on_pipeline_stopped` events from `PipelineTask`. Use `on_pipeline_finished`
instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed single-argument function call support from `LLMService`. Functions
must use named parameters instead of a single `arguments` parameter.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `FalSmartTurnAnalyzer` and `LocalSmartTurnAnalyzer`.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `RTVIObserver.errors_enabled` parameter.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated RTVI models, frames, and processor methods including
`RTVIConfig`, `RTVIServiceConfig`, `RTVIServiceOptionConfig`, various
`RTVI*Data` models, `RTVIActionFrame`, and
`RTVIProcessor.handle_function_call`/`handle_function_call_start`. Use the
updated RTVI processor API instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated `KeypadEntryFrame` alias.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated interruption frames: `StartInterruptionFrame` and
`BotInterruptionFrame`. Use `InterruptionFrame` and `InterruptionTaskFrame`
instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `LLMService.request_image_frame()`. Push a `UserImageRequestFrame`
instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `TTSService.say()`. Push a `TTSSpeakFrame` into the pipeline
instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `KrispFilter`. The `krisp` extra has been removed from
`pyproject.toml`.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `AudioBufferProcessor.user_continuous_stream` parameter. Use
`user_audio_passthrough` instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed `LLMService.start_callback` parameter. Register an
`on_llm_response_start` event handler instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated `observers` field from `PipelineParams`. Pass observers
directly to `PipelineTask` constructor instead.
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
- ⚠️ Removed deprecated `pipecat.services.openai_realtime` package. Use
`pipecat.services.openai.realtime` instead.
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `pipecat.services.google.llm_vertex` module. Use
`pipecat.services.google.vertex.llm` instead.
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `GoogleLLMOpenAIBetaService` from
`pipecat.services.google.openai`. Use `GoogleLLMService` from
`pipecat.services.google.llm` instead.
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `OpenAIRealtimeBetaLLMService` and
`AzureRealtimeBetaLLMService`. Use `OpenAIRealtimeLLMService` and
`AzureRealtimeLLMService` from `pipecat.services.openai.realtime` and
`pipecat.services.azure.realtime` instead.
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `pipecat.services.ai_services` module. Import from
`pipecat.services.ai_service`, `pipecat.services.llm_service`,
`pipecat.services.stt_service`, `pipecat.services.tts_service`, etc. instead.
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `pipecat.services.gemini_multimodal_live` package. Use
`pipecat.services.google.gemini_live` instead. Note that class names no
longer include "Multimodal" (e.g. `GeminiMultimodalLiveLLMService`
`GeminiLiveLLMService`).
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `pipecat.services.google.gemini_live.llm_vertex`
module. Use `pipecat.services.google.gemini_live.vertex.llm` instead.
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `pipecat.services.nim` package. Use
`pipecat.services.nvidia.llm` instead (`NimLLMService``NvidiaLLMService`).
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `pipecat.services.deepgram.stt_sagemaker` and
`pipecat.services.deepgram.tts_sagemaker` modules. Use
`pipecat.services.deepgram.sagemaker.stt` and
`pipecat.services.deepgram.sagemaker.tts` instead.
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `pipecat.services.aws_nova_sonic` package. Use
`pipecat.services.aws.nova_sonic` instead.
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated `pipecat.services.riva` package. Use
`pipecat.services.nvidia.stt` and `pipecat.services.nvidia.tts` instead
(`RivaSTTService``NvidiaSTTService`, `RivaTTSService`
`NvidiaTTSService`).
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
- ⚠️ Removed deprecated compatibility modules:
`pipecat.services.openai_realtime_beta` (use
`pipecat.services.openai.realtime`),
`pipecat.services.openai_realtime.context`,
`pipecat.services.openai_realtime.frames`,
`pipecat.services.openai.realtime.context`,
`pipecat.services.openai.realtime.frames`,
`pipecat.services.gemini_multimodal_live` (use
`pipecat.services.google.gemini_live`),
`pipecat.services.aws_nova_sonic.context` (use
`pipecat.services.aws.nova_sonic`), `pipecat.services.google.openai` and
`pipecat.services.google.llm_openai` (use `pipecat.services.google.llm`).
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
- ⚠️ Removed `VisionImageFrameAggregator` (from
`pipecat.processors.aggregators.vision_image_frame`). Vision/image handling
is now built into `LLMContext` (from
`pipecat.processors.aggregators.llm_context`). See the `12*` examples for the
recommended replacement pattern.
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
- ⚠️ Removed `OpenAILLMContext`, `OpenAILLMContextFrame`, and
`OpenAILLMContext.from_messages()`. Use `LLMContext` (from
`pipecat.processors.aggregators.llm_context`) and `LLMContextFrame` (from
`pipecat.frames.frames`) instead. All services now exclusively use the
universal `LLMContext`.
From the developer's point of view, migrating will usually be a matter of
going from this:
```python
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
```
To this:
```python
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
- ⚠️ Removed deprecated frame types `LLMMessagesFrame` and
`OpenAILLMContextAssistantTimestampFrame` from `pipecat.frames.frames`.
Instead of `LLMMessagesFrame`, use `LLMContextFrame` with the new messages,
or `LLMMessagesUpdateFrame` with `run_llm=True`.
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
- ⚠️ Removed `GatedOpenAILLMContextAggregator` (from
`pipecat.processors.aggregators.gated_open_ai_llm_context`). Use
`GatedLLMContextAggregator` (from
`pipecat.processors.aggregators.gated_llm_context`) instead.
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
- ⚠️ Removed deprecated service-specific context and aggregator machinery,
which was superseded by the universal `LLMContext` system.
Service-specific classes removed: `AnthropicLLMContext`,
`AnthropicContextAggregatorPair`, `AWSBedrockLLMContext`,
`AWSBedrockContextAggregatorPair`, `OpenAIContextAggregatorPair`, and their
user/assistant aggregators. Also removed `create_context_aggregator()` from
`LLMService`, `OpenAILLMService`, `AnthropicLLMService`, and
`AWSBedrockLLMService`.
Base aggregator classes removed (from
`pipecat.processors.aggregators.llm_response`): `BaseLLMResponseAggregator`,
`LLMContextResponseAggregator`, `LLMUserContextAggregator`,
`LLMAssistantContextAggregator`, `LLMUserResponseAggregator`,
`LLMAssistantResponseAggregator`.
From the developer's point of view, migrating will usually be a matter of
going from this:
```python
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
```
To this:
```python
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
- ⚠️ Removed deprecated service parameters and shims that have been replaced by
the `settings=Service.Settings(...)` pattern or direct `__init__` parameters:
- `PollyTTSService` alias (use `AWSTTSService`)
- `TTSService`: `text_aggregator`, `text_filter` init params
- `AWSNovaSonicLLMService`: `send_transcription_frames` init param
- `DeepgramSTTService`: `url` init param (use `base_url`)
- `FishAudioTTSService`: `model` init param (use `reference_id` or
`settings`)
- `GladiaSTTService`: `language` and `confidence` from `GladiaInputParams`,
`InputParams` class alias
- `GeminiTTSService`: `api_key` init param
- `GeminiLiveLLMService`: `base_url` init param (use `http_options`)
- `GoogleVertexLLMService`: `InputParams` class with
`location`/`project_id` fields (use direct init params); `project_id` is now
required, `location` defaults to `"us-east4"`
- `MiniMaxHttpTTSService`: `english_normalization` from `InputParams` (use
`text_normalization`)
- `SimliVideoService`: `simli_config` init param (use `api_key`/`face_id`),
`use_turn_server` init param; `api_key` and `face_id` are now required
- `AnthropicLLMService`: `enable_prompt_caching_beta` from `InputParams`
(use `enable_prompt_caching`)
(PR [#4220](https://github.com/pipecat-ai/pipecat/pull/4220))
- ⚠️ Removed deprecated `pipecat.transports.services` and
`pipecat.transports.network` module aliases. Update imports to use
`pipecat.transports.daily.transport`, `pipecat.transports.livekit.transport`,
`pipecat.transports.websocket.*`, `pipecat.transports.webrtc.*`, and
`pipecat.transports.daily.utils` respectively.
(PR [#4225](https://github.com/pipecat-ai/pipecat/pull/4225))
- ⚠️ Removed deprecated `pipecat.sync` package. Use `pipecat.utils.sync`
instead.
(PR [#4225](https://github.com/pipecat-ai/pipecat/pull/4225))
- ⚠️ Removed deprecated `TranscriptionMessage`, `ThoughtTranscriptionMessage`,
and `TranscriptionUpdateFrame` from `pipecat.frames.frames`.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `allow_interruptions` parameter from `PipelineParams`,
`StartFrame`, and `FrameProcessor`. Interruptions are now always allowed by
default. Use `LLMUserAggregator`'s `user_turn_strategies` /
`user_mute_strategies` parameters to control interruption behavior.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `STTMuteFilter`, `STTMuteConfig`, and `STTMuteStrategy`
from `pipecat.processors.filters.stt_mute_filter`. Use
`pipecat.turns.user_mute` strategies with `LLMUserAggregator`'s
`user_mute_strategies` parameter instead.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `pipecat.processors.transcript_processor` module
(`TranscriptProcessor`, `TranscriptProcessorConfig`). Use pipeline observers
instead.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `EmulateUserStartedSpeakingFrame` and
`EmulateUserStoppedSpeakingFrame` frames, and the `emulated` field from
`UserStartedSpeakingFrame` / `UserStoppedSpeakingFrame`.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `interruption_strategies` parameter from
`PipelineParams`, `StartFrame`, and `FrameProcessor`. Use
`LLMUserAggregator`'s `user_turn_strategies` parameter instead.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `pipecat.audio.interruptions` module
(`BaseInterruptionStrategy`, `MinWordsInterruptionStrategy`). Use
`pipecat.turns.user_start.MinWordsUserTurnStartStrategy` with
`LLMUserAggregator`'s `user_turn_strategies` parameter instead.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `pipecat.utils.tracing.class_decorators` module. Use
`pipecat.utils.tracing.service_decorators` instead.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `add_pattern_pair` method from `PatternPairAggregator`.
Use `add_pattern` instead.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed deprecated `UserResponseAggregator` class from
`pipecat.processors.aggregators.user_response`. Use `LLMUserAggregator`
instead.
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
- ⚠️ Removed `ExternalUserTurnStrategies` and the automatic fallback to it in
`LLMUserAggregator` when a `SpeechControlParamsFrame` was received from the
transport.
(PR [#4229](https://github.com/pipecat-ai/pipecat/pull/4229))
- ⚠️ Removed `vad_analyzer` and `turn_analyzer` parameters from
`TransportParams` and all transport input classes, along with all deprecated
VAD/turn analysis logic in `BaseInputTransport`. VAD and turn detection are
now handled entirely by `LLMUserAggregator`.
(PR [#4229](https://github.com/pipecat-ai/pipecat/pull/4229))
- ⚠️ Removed deprecated `TranscriptionUserTurnStopStrategy` alias (deprecated
in 0.0.102). Use `SpeechTimeoutUserTurnStopStrategy` instead.
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
- ⚠️ Removed deprecated `vad_events` setting and `should_interrupt` parameter
from `DeepgramSTTService` (deprecated in 0.0.99). Use Silero VAD for voice
activity detection instead.
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
- ⚠️ Removed deprecated `send_transcription_frames` parameter from
`OpenAIRealtimeLLMService` (deprecated in 0.0.92). Transcription frames are
always sent.
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
- ⚠️ Removed deprecated `UserIdleProcessor` (deprecated in 0.0.100). Use
`LLMUserAggregator` with the `user_idle_timeout` parameter instead.
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
- ⚠️ Removed deprecated `UserBotLatencyLogObserver` (deprecated in 0.0.102).
Use `UserBotLatencyObserver` with its `on_latency_measured` event handler
instead.
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
- ⚠️ Removed the `riva` install extra. Use `nvidia` instead (`pip install
"pipecat-ai[nvidia]"`).
(PR [#4235](https://github.com/pipecat-ai/pipecat/pull/4235))
- Removed the empty `remote-smart-turn` install extra (was already a no-op).
(PR [#4235](https://github.com/pipecat-ai/pipecat/pull/4235))
- ⚠️ Removed `DeprecatedModuleProxy` and all service `__init__.py` re-export
shims. Flat imports like `from pipecat.services.openai import
OpenAILLMService` no longer work. Use the full submodule path instead: `from
pipecat.services.openai.llm import OpenAILLMService`. This is already the
established pattern across all examples and internal code.
(PR [#4239](https://github.com/pipecat-ai/pipecat/pull/4239))
- ⚠️ Removed deprecated `PIPECAT_OBSERVER_FILES` environment variable support.
Use `PIPECAT_SETUP_FILES` instead.
(PR [#4267](https://github.com/pipecat-ai/pipecat/pull/4267))
### Fixed
- Fixed `IdleFrameProcessor` where `asyncio.Event` was unconditionally cleared
in a `finally` block instead of only on the success path.
(PR [#3796](https://github.com/pipecat-ai/pipecat/pull/3796))
- Fixed MCPClient opening a new connection for every tool call instead of
reusing the session.
(PR [#4034](https://github.com/pipecat-ai/pipecat/pull/4034))
- GoogleLLMService now applies a low-latency thinking default
(`thinking_level="minimal"`) for Gemini 3+ Flash models.
(PR [#4067](https://github.com/pipecat-ai/pipecat/pull/4067))
- Fixed `WebsocketService` entering an infinite reconnection loop when a server
accepts the WebSocket handshake but immediately closes the connection (e.g.
invalid API key, close code 1008). The service now detects connections that
fail repeatedly within seconds of being established and stops retrying after
3 consecutive quick failures.
(PR [#4201](https://github.com/pipecat-ai/pipecat/pull/4201))
- Fixed `InworldHttpTTSService` streaming responses crashing with
`UnicodeDecodeError` when multi-byte UTF-8 characters were split across chunk
boundaries. This caused TTS audio to cut off mid-sentence intermittently.
(PR [#4202](https://github.com/pipecat-ai/pipecat/pull/4202))
- Fixed a crash (`JSONDecodeError`) when a user interruption occurs while the
LLM is streaming function call arguments. Previously, the incomplete JSON
arguments were passed directly to `json.loads()`, causing an unhandled
exception. Affected services: OpenAI, Google (OpenAI-compatible), and
SambaNova.
(PR [#4203](https://github.com/pipecat-ai/pipecat/pull/4203))
- Fixed `BaseOutputTransport` discarding pending `UninterruptibleFrame` items
(e.g. function-call context updates) when an interruption arrived. The audio
task is now kept alive and only interruptible frames are drained when
uninterruptible frames are present in the queue.
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
- Fixed spurious LLM inference being triggered when a function call result
arrived while the user was actively speaking. The context frame is now
suppressed until the user stops speaking.
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
- Fixed `CartesiaTTSService` failing with "Context has closed" errors when
switching voice, model, or language via `TTSUpdateSettingsFrame`. The service
now automatically flushes the current audio context and opens a fresh one
when these settings change.
(PR [#4220](https://github.com/pipecat-ai/pipecat/pull/4220))
- Fixed duplicate LLM replies that could occur when multiple async function
call results arrived while an LLM request was already queued.
(PR [#4230](https://github.com/pipecat-ai/pipecat/pull/4230))
- Fixed undefined `_warn_deprecated_param` calls in `OpenAIRealtimeLLMService`
and `GrokRealtimeLLMService` for the deprecated `session_properties` init
parameter.
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
- Fixed Gemini Live bot hanging after a session resumption reconnect. Audio,
video, and text input were silently dropped after reconnecting because the
internal `_ready_for_realtime_input` flag was not being reset.
(PR [#4242](https://github.com/pipecat-ai/pipecat/pull/4242))
- Fixed `VADController` getting stuck in the `SPEAKING` state when audio frames
stop arriving mid-speech (e.g. user mutes mic). A new `audio_idle_timeout`
parameter (default 1s, set to 0 to disable) forces a transition back to
`QUIET` and emits `on_speech_stopped` when no audio is received while
speaking.
(PR [#4244](https://github.com/pipecat-ai/pipecat/pull/4244))
- Fixed `PipelineRunner._gc_collect()` blocking the event loop by running
`gc.collect()` synchronously. Now offloaded via `asyncio.to_thread` to avoid
stalling concurrent pipeline tasks.
(PR [#4255](https://github.com/pipecat-ai/pipecat/pull/4255))
- Fixed `ElevenLabsTTSService` incorrectly enabling `auto_mode` when using
`TextAggregationMode.TOKEN`. Auto mode disables server-side buffering and is
designed for complete sentences — enabling it with token streaming degraded
speech quality. The default is now derived automatically from the aggregation
strategy: `auto_mode=True` for `SENTENCE`, `auto_mode=False` for `TOKEN`.
Callers can still override by passing `auto_mode` explicitly.
(PR [#4265](https://github.com/pipecat-ai/pipecat/pull/4265))
- Fixed `ValueError: write to closed file` during pipeline shutdown when
observers were active. Observer proxy tasks are now cancelled before observer
resources are cleaned up.
(PR [#4267](https://github.com/pipecat-ai/pipecat/pull/4267))
- Fixed delayed turn completion when STT transcripts arrive after the p99
timeout. Previously, a late transcript (beyond the p99 window) would fall
through to the 5-second `user_turn_stop_timeout` fallback. Now the turn stop
triggers immediately when the late transcript arrives.
(PR [#4283](https://github.com/pipecat-ai/pipecat/pull/4283))
- Fixed `ElevenLabsTTSService` ignoring `enable_logging=False` and
`enable_ssml_parsing=False`. The truthy check treated `False` the same as
`None` (both skipped), and Python's `str(False)` produced `"False"` instead
of the lowercase `"false"` expected by the API.
(PR [#4293](https://github.com/pipecat-ai/pipecat/pull/4293))
- Fixed `on_assistant_turn_stopped` not resetting internal state when the LLM
returned no text tokens. Added `interrupted` field to
`AssistantTurnStoppedMessage` to indicate whether the assistant turn was
interrupted.
(PR [#4294](https://github.com/pipecat-ai/pipecat/pull/4294))
- Fixed `LLMContextSummarizer` failing with "No messages to summarize" when
using `system_instruction` instead of a system-role message at the start of
the context. The summarizer previously scanned the entire context for the
first system message, which could match a mid-conversation injection (e.g.
idle notifications) instead of the initial prompt, causing the summarization
range to be empty.
(PR [#4295](https://github.com/pipecat-ai/pipecat/pull/4295))
## [0.0.108] - 2026-03-27
### Added

View File

@@ -1,62 +0,0 @@
# Changelog
All notable changes to the **&lt;project name&gt;** SDK will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
Please make sure to add your changes to the appropriate categories:
## [Unreleased]
### Added
<!-- for new functionality -->
- n/a
### Changed
<!-- for changed functionality -->
- n/a
### Deprecated
<!-- for soon-to-be removed functionality -->
- n/a
### Removed
<!-- for removed functionality -->
- n/a
### Fixed
<!-- for fixed bugs -->
- n/a
### Performance
<!-- for performance-relevant changes -->
- n/a
### Security
<!-- for security-relevant changes -->
- n/a
### Other
<!-- for everything else -->
- n/a
## [0.1.0] - YYYY-MM-DD
Initial release.

View File

@@ -79,7 +79,7 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/simple-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/simple-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/storytelling-chatbot/image.png" width="400" /></a>
<br/>
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/translation-chatbot/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/daily-multi-translation"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/daily-multi-translation/image.png" width="400" /></a>&nbsp;
<a href="https://github.com/pipecat-ai/pipecat/blob/main/examples/vision/vision-moondream.py"><img src="https://github.com/pipecat-ai/pipecat/blob/main/examples/assets/moondream.png" width="400" /></a>
</p>
@@ -149,8 +149,8 @@ You can get started with Pipecat running on your local machine, then move your a
### Prerequisites
**Minimum Python Version:** 3.10
**Recommended Python Version:** 3.12
**Minimum Python Version:** 3.11
**Recommended Python Version:** >= 3.12
### Setup Steps

View File

@@ -1 +0,0 @@
- ⚠️ Added WebSocket-based `OpenAIResponsesLLMService` as the new default for the OpenAI Responses API. It maintains a persistent connection to `wss://api.openai.com/v1/responses` and automatically uses `previous_response_id` to send only incremental context, falling back to full context on reconnection or cache miss. The previous HTTP-based implementation is now available as `OpenAIResponsesHttpLLMService`.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `OpenPipeLLMService` and the `openpipe` extra. OpenPipe was acquired by CoreWeave and the package is no longer maintained. If you were using `openpipe` as an LLM provider, switch to the underlying provider directly (e.g. `openai`). The OpenPipe interface can still be used with `OpenAILLMService` by specifying a `base_url`.

View File

@@ -1 +0,0 @@
- ⚠️ Updated `langchain` extra to require langchain 1.x (from 0.3.x), langchain-community 0.4.x (from 0.3.x), and langchain-openai 1.x (from 0.3.x). If you pin these packages in your project, update your pins accordingly.

View File

@@ -1 +0,0 @@
- Fixed `InworldHttpTTSService` streaming responses crashing with `UnicodeDecodeError` when multi-byte UTF-8 characters were split across chunk boundaries. This caused TTS audio to cut off mid-sentence intermittently.

View File

@@ -1 +0,0 @@
- Fixed a crash (`JSONDecodeError`) when a user interruption occurs while the LLM is streaming function call arguments. Previously, the incomplete JSON arguments were passed directly to `json.loads()`, causing an unhandled exception. Affected services: OpenAI, Google (OpenAI-compatible), and SambaNova.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `observers` field from `PipelineParams`. Pass observers directly to `PipelineTask` constructor instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `on_pipeline_ended`, `on_pipeline_cancelled`, and `on_pipeline_stopped` events from `PipelineTask`. Use `on_pipeline_finished` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `AudioBufferProcessor.user_continuous_stream` parameter. Use `user_audio_passthrough` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `camera_in_enabled`, `camera_in_is_live`, `camera_in_width`, `camera_in_height`, `camera_out_enabled`, `camera_out_is_live`, `camera_out_width`, `camera_out_height`, and `camera_out_color` transport params. Use the `video_in_*` and `video_out_*` equivalents instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `RTVIObserver.errors_enabled` parameter.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `vad_enabled` and `vad_audio_passthrough` transport params.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `TTSService.say()`. Push a `TTSSpeakFrame` into the pipeline instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `DailyRunner.configure_with_args()`. Use `PipelineRunner` with `RunnerArguments` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated RTVI models, frames, and processor methods including `RTVIConfig`, `RTVIServiceConfig`, `RTVIServiceOptionConfig`, various `RTVI*Data` models, `RTVIActionFrame`, and `RTVIProcessor.handle_function_call`/`handle_function_call_start`. Use the updated RTVI processor API instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `FrameProcessor.wait_for_task()`. Use `create_task()` and manage tasks with the built-in `TaskManager` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `KrispFilter`. The `krisp` extra has been removed from `pyproject.toml`.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `LLMService.request_image_frame()`. Push a `UserImageRequestFrame` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `create_default_resampler()` from `pipecat.audio.utils`.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `FalSmartTurnAnalyzer` and `LocalSmartTurnAnalyzer`.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated transport frames: `TransportMessageFrame`, `TransportMessageUrgentFrame`, `InputTransportMessageUrgentFrame`, `DailyTransportMessageFrame`, and `DailyTransportMessageUrgentFrame`. Use `OutputTransportMessageFrame`, `OutputTransportMessageUrgentFrame`, `InputTransportMessageFrame`, `DailyOutputTransportMessageFrame`, and `DailyOutputTransportMessageUrgentFrame` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `KeypadEntryFrame` alias.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated interruption frames: `StartInterruptionFrame` and `BotInterruptionFrame`. Use `InterruptionFrame` and `InterruptionTaskFrame` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `LLMService.start_callback` parameter. Register an `on_llm_response_start` event handler instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed single-argument function call support from `LLMService`. Functions must use named parameters instead of a single `arguments` parameter.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `NoisereduceFilter`. Use system-level noise reduction or a service-based alternative instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.riva` package. Use `pipecat.services.nvidia.stt` and `pipecat.services.nvidia.tts` instead (`RivaSTTService``NvidiaSTTService`, `RivaTTSService``NvidiaTTSService`).

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.nim` package. Use `pipecat.services.nvidia.llm` instead (`NimLLMService``NvidiaLLMService`).

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.gemini_multimodal_live` package. Use `pipecat.services.google.gemini_live` instead. Note that class names no longer include "Multimodal" (e.g. `GeminiMultimodalLiveLLMService``GeminiLiveLLMService`).

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.aws_nova_sonic` package. Use `pipecat.services.aws.nova_sonic` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.openai_realtime` package. Use `pipecat.services.openai.realtime` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `OpenAIRealtimeBetaLLMService` and `AzureRealtimeBetaLLMService`. Use `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService` from `pipecat.services.openai.realtime` and `pipecat.services.azure.realtime` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.deepgram.stt_sagemaker` and `pipecat.services.deepgram.tts_sagemaker` modules. Use `pipecat.services.deepgram.sagemaker.stt` and `pipecat.services.deepgram.sagemaker.tts` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `GoogleLLMOpenAIBetaService` from `pipecat.services.google.openai`. Use `GoogleLLMService` from `pipecat.services.google.llm` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.google.llm_vertex` module. Use `pipecat.services.google.vertex.llm` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.google.gemini_live.llm_vertex` module. Use `pipecat.services.google.gemini_live.vertex.llm` instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated `pipecat.services.ai_services` module. Import from `pipecat.services.ai_service`, `pipecat.services.llm_service`, `pipecat.services.stt_service`, `pipecat.services.tts_service`, etc. instead.

View File

@@ -1 +0,0 @@
- Changed `GrokLLMService` default model from `grok-3-beta` to `grok-3`, now that the model is generally available.

View File

@@ -1 +0,0 @@
- `GoogleImageGenService` now defaults to `imagen-4.0-generate-001` (previously `imagen-3.0-generate-002`).

View File

@@ -1 +0,0 @@
- ⚠️ `BaseOpenAILLMService.get_chat_completions()` now accepts an `LLMContext` instead of `OpenAILLMInvocationParams`. If you override this method, update your signature accordingly.

View File

@@ -1,22 +0,0 @@
- ⚠️ Removed deprecated service-specific context and aggregator machinery, which was superseded by the universal `LLMContext` system.
Service-specific classes removed: `AnthropicLLMContext`, `AnthropicContextAggregatorPair`, `AWSBedrockLLMContext`, `AWSBedrockContextAggregatorPair`, `OpenAIContextAggregatorPair`, and their user/assistant aggregators. Also removed `create_context_aggregator()` from `LLMService`, `OpenAILLMService`, `AnthropicLLMService`, and `AWSBedrockLLMService`.
Base aggregator classes removed (from `pipecat.processors.aggregators.llm_response`): `BaseLLMResponseAggregator`, `LLMContextResponseAggregator`, `LLMUserContextAggregator`, `LLMAssistantContextAggregator`, `LLMUserResponseAggregator`, `LLMAssistantResponseAggregator`.
From the developer's point of view, migrating will usually be a matter of going from this:
```python
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
```
To this:
```python
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated frame types `LLMMessagesFrame` and `OpenAILLMContextAssistantTimestampFrame` from `pipecat.frames.frames`. Instead of `LLMMessagesFrame`, use `LLMContextFrame` with the new messages, or `LLMMessagesUpdateFrame` with `run_llm=True`.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `GatedOpenAILLMContextAggregator` (from `pipecat.processors.aggregators.gated_open_ai_llm_context`). Use `GatedLLMContextAggregator` (from `pipecat.processors.aggregators.gated_llm_context`) instead.

View File

@@ -1 +0,0 @@
- ⚠️ Removed `VisionImageFrameAggregator` (from `pipecat.processors.aggregators.vision_image_frame`). Vision/image handling is now built into `LLMContext` (from `pipecat.processors.aggregators.llm_context`). See the `12*` examples for the recommended replacement pattern.

View File

@@ -1 +0,0 @@
- ⚠️ Removed deprecated compatibility modules: `pipecat.services.openai_realtime_beta` (use `pipecat.services.openai.realtime`), `pipecat.services.openai_realtime.context`, `pipecat.services.openai_realtime.frames`, `pipecat.services.openai.realtime.context`, `pipecat.services.openai.realtime.frames`, `pipecat.services.gemini_multimodal_live` (use `pipecat.services.google.gemini_live`), `pipecat.services.aws_nova_sonic.context` (use `pipecat.services.aws.nova_sonic`), `pipecat.services.google.openai` and `pipecat.services.google.llm_openai` (use `pipecat.services.google.llm`).

View File

@@ -1,18 +0,0 @@
- ⚠️ Removed `OpenAILLMContext`, `OpenAILLMContextFrame`, and `OpenAILLMContext.from_messages()`. Use `LLMContext` (from `pipecat.processors.aggregators.llm_context`) and `LLMContextFrame` (from `pipecat.frames.frames`) instead. All services now exclusively use the universal `LLMContext`.
From the developer's point of view, migrating will usually be a matter of going from this:
```python
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
```
To this:
```python
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
context = LLMContext(messages, tools)
context_aggregator = LLMContextAggregatorPair(context)
```

View File

@@ -0,0 +1 @@
- Added `buttons` field to `OutputDTMFFrame` and `OutputDTMFUrgentFrame` for sending multi-key DTMF sequences as a `list[KeypadEntry]`. Use `OutputDTMFFrame.from_string("123#")` (or the equivalent on `OutputDTMFUrgentFrame`) to build one from a dial string, and `to_string()` to convert back.

View File

@@ -0,0 +1 @@
- Added `DailyOutputDTMFFrame` and `DailyOutputDTMFUrgentFrame` frames. In addition to the inherited `buttons`, they accept `session_id`, `digit_duration_ms` and `method`, which are forwarded to Daily's `send_dtmf` as `sessionId`, `digitDurationMs` and `method`.

1
changelog/4313.added.md Normal file
View File

@@ -0,0 +1 @@
- Added `DailyTransport.send_dtmf()` to expose the Daily call client's DTMF sending capability, enabling applications to send tones during a call (e.g. IVR navigation).

View File

@@ -1,108 +1,60 @@
# Pipecat Documentation
# Pipecat API Documentation
This directory contains the source files for auto-generating Pipecat's server API reference documentation.
## Setup
1. Install documentation dependencies:
```bash
pip install -r requirements.txt
```
2. Make the build scripts executable:
```bash
chmod +x build-docs.sh rtd-test.py
```
This directory contains the source files for auto-generating Pipecat's API reference documentation.
## Building Documentation
From this directory, you can build the documentation in several ways:
### Local Build
From this directory:
```bash
# Using the build script (automatically opens docs when done)
./build-docs.sh
# Build docs (warnings shown but don't fail the build)
cd docs/api && uv run ./build-docs.sh
# Or directly with sphinx-build
sphinx-build -b html . _build/html -W --keep-going
# Build with strict mode (warnings treated as errors)
cd docs/api && uv run ./build-docs.sh --strict
```
### ReadTheDocs Test Build
The build script will:
To test the documentation build process exactly as it would run on ReadTheDocs:
```bash
./rtd-test.py
```
This script:
- Creates a fresh virtual environment
- Installs all dependencies as specified in requirements files
- Handles conflicting dependencies (like grpcio versions for Riva)
- Builds the documentation in an isolated environment
- Provides detailed logging of the build process
Use this script to verify your documentation will build correctly on ReadTheDocs before pushing changes.
## Viewing Documentation
The built documentation will be available at `_build/html/index.html`. To open:
```bash
# On MacOS
open _build/html/index.html
# On Linux
xdg-open _build/html/index.html
# On Windows
start _build/html/index.html
```
1. Install documentation dependencies via `uv sync --group docs`
2. Clean previous build output
3. Run `sphinx-build` to generate HTML documentation
4. Open the result in your browser (macOS)
## Directory Structure
```
.
├── api/ # Auto-generated API documentation
├── _build/ # Built documentation
├── _static/ # Static files (images, css, etc.)
├── conf.py # Sphinx configuration
├── api/ # Auto-generated API documentation (created during build)
├── _build/ # Built documentation output
├── conf.py # Sphinx configuration (mock imports, extensions, etc.)
├── index.rst # Main documentation entry point
├── requirements-base.txt # Base documentation dependencies
├── requirements-riva.txt # Riva-specific dependencies
├── build-docs.sh # Local build script
└── rtd-test.py # ReadTheDocs test build script
└── rtd-test.sh # ReadTheDocs test build script (uses pip, not uv)
```
## Notes
## How It Works
- Documentation is auto-generated from Python docstrings
- Service modules are automatically detected and included
- The build process matches our ReadTheDocs configuration
- Warnings are treated as errors (-W flag) to maintain consistency
- The --keep-going flag ensures all errors are reported
- Dependencies are split into multiple requirements files to handle version conflicts
- `conf.py` runs `sphinx-apidoc` during Sphinx's `setup()` phase to generate `.rst` files from Python source
- Sphinx autodoc imports each module to extract docstrings
- Modules with unavailable dependencies are listed in `autodoc_mock_imports` in `conf.py`
- Napoleon extension converts Google-style docstrings to reStructuredText
## Troubleshooting
If you encounter missing service modules:
**Module not appearing in docs:**
1. Verify the service is installed with its extras: `pip install pipecat-ai[service-name]`
2. Check the build logs for import errors
3. Ensure the service module is properly initialized in the package
4. Run `./rtd-test.py` to test in an isolated environment matching ReadTheDocs
1. Check the build output for `autodoc: failed to import` warnings
2. If the module has an unresolvable import dependency, add it to `autodoc_mock_imports` in `conf.py`
3. Verify the module is importable: `uv run python -c "import pipecat.module.name"`
For dependency conflicts:
**Duplicate object warnings:**
1. Check the requirements files for version specifications
2. Use `rtd-test.py` to verify dependency resolution
3. Consider adding service-specific requirements files if needed
These come from re-export modules or Sphinx discovering the same class through multiple import paths. Usually cosmetic.
For more information:
**Docstring formatting warnings:**
- [ReadTheDocs Configuration](.readthedocs.yaml)
- [Sphinx Documentation](https://www.sphinx-doc.org/)
Docstrings use reStructuredText, not Markdown. Common issues:
- Use `Example::` with indented code blocks, not `` ```python ``
- Ensure blank lines between directive content and subsequent sections
- Use `Parameters:` (not `Attributes:`) for dataclass field documentation to avoid duplicate entries

View File

@@ -1,8 +1,16 @@
#!/bin/bash
# Usage: ./build-docs.sh [--strict]
# --strict: Treat warnings as errors (default: warnings only)
SPHINX_OPTS=""
if [ "$1" = "--strict" ]; then
SPHINX_OPTS="-W --keep-going"
fi
# Build docs using uv
echo "Installing dependencies with uv..."
uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra mlx-whisper
# Check if sphinx-build is available
if ! uv run sphinx-build --version &> /dev/null; then
@@ -14,8 +22,7 @@ fi
rm -rf _build
echo "Building documentation..."
# Build docs matching ReadTheDocs configuration
uv run sphinx-build -b html -d _build/doctrees . _build/html -W --keep-going
uv run sphinx-build -b html -d _build/doctrees . _build/html $SPHINX_OPTS
if [ $? -eq 0 ]; then
echo "Documentation built successfully!"

View File

@@ -4,6 +4,19 @@ import sys
from datetime import datetime
from pathlib import Path
# Fix Pydantic v2 + Sphinx autodoc incompatibility: ConfigDict(extra="allow") fails
# during Sphinx's import because __pydantic_extra__ annotation on BaseModel resolves to
# `Dict[str, Any] | None` whose get_origin() is Union, not dict. Patch the check to
# accept Union-wrapped dict types (i.e., Optional[Dict[str, Any]]).
import pydantic._internal._generate_schema as _pydantic_gs
_ORIG_DICT_TYPES = _pydantic_gs.DICT_TYPES
# Expand the accepted types to include Union (Optional[Dict[str, Any]])
import types
import typing
_pydantic_gs.DICT_TYPES = [*_ORIG_DICT_TYPES, typing.Union, types.UnionType]
# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("sphinx-build")
@@ -76,16 +89,6 @@ autodoc_mock_imports = [
"einops",
"intel_extension_for_pytorch",
"huggingface_hub",
# riva dependencies
"riva",
"riva.client",
"riva.client.Auth",
"riva.client.ASRService",
"riva.client.StreamingRecognitionConfig",
"riva.client.RecognitionConfig",
"riva.client.AudioEncoding",
"riva.client.proto.riva_tts_pb2",
"riva.client.SpeechSynthesisService",
# MLX dependencies (Apple Silicon specific)
"mlx",
"mlx_whisper", # Note: might need underscore format too
@@ -107,6 +110,8 @@ autodoc_mock_imports = [
"fastapi.middleware",
"fastapi.responses",
"uvicorn",
# Deepgram dependencies
"deepgram",
]
# HTML output settings
@@ -133,6 +138,8 @@ def import_core_modules():
"pipecat.runner",
"pipecat.serializers",
"pipecat.transcriptions",
"pipecat.turns",
"pipecat.extensions",
"pipecat.utils",
]
@@ -177,7 +184,6 @@ def setup(app):
logger.info(f"Source directory: {source_dir}")
excludes = [
str(project_root / "src/pipecat/pipeline/to_be_updated"),
str(project_root / "src/pipecat/examples"),
str(project_root / "src/pipecat/tests"),
"**/test_*.py",

View File

@@ -32,4 +32,5 @@ Quick Links
Services <api/pipecat.services>
Transcriptions <api/pipecat.transcriptions>
Transports <api/pipecat.transports>
Turns <api/pipecat.turns>
Utils <api/pipecat.utils>

View File

@@ -34,7 +34,7 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
OFFICE_SOUND_FILE = os.path.join(
os.path.dirname(__file__), "assets", "office-ambience-24000-mono.mp3"
os.path.dirname(__file__), "../assets", "office-ambience-24000-mono.mp3"
)
# We use lambdas to defer transport parameter creation until the transport

View File

@@ -36,7 +36,7 @@ from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google import GoogleLLMService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

View File

@@ -45,7 +45,7 @@ from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.frames.frames import LLMRunFrame, TTSUpdateSettingsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -54,6 +54,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.aggregators.llm_text_processor import LLMTextProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
@@ -100,39 +101,43 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create pattern pair aggregator for voice switching
pattern_aggregator = PatternPairAggregator()
llm_text_aggregator = PatternPairAggregator()
# Add pattern for voice switching
pattern_aggregator.add_pattern(
llm_text_aggregator.add_pattern(
type="voice",
start_pattern="<voice>",
end_pattern="</voice>",
action=MatchAction.REMOVE, # Remove tags from final text
action=MatchAction.AGGREGATE,
)
# Register handler for voice switching
async def on_voice_tag(match: PatternMatch):
voice_name = match.text.strip().lower()
if voice_name in VOICE_IDS:
# First flush any existing audio to finish the current context
await tts.flush_audio()
# Then set the new voice
await tts.set_voice(VOICE_IDS[voice_name])
await llm_text_processor.push_frame(
TTSUpdateSettingsFrame(
delta=CartesiaTTSService.Settings(voice=VOICE_IDS[voice_name])
)
)
logger.info(f"Switched to {voice_name} voice")
else:
logger.warning(f"Unknown voice: {voice_name}")
pattern_aggregator.on_pattern_match("voice", on_voice_tag)
llm_text_aggregator.on_pattern_match("voice", on_voice_tag)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
# Process LLM text through the pattern aggregator before TTS
llm_text_processor = LLMTextProcessor(text_aggregator=llm_text_aggregator)
# Initialize TTS with narrator voice as default
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice=VOICE_IDS["narrator"],
),
text_aggregator=pattern_aggregator,
skip_aggregator_types=["voice"], # Skip voice tags in TTS speech
)
# System prompt for storytelling with voice switching
@@ -204,7 +209,8 @@ Remember: Use narrator voice for EVERYTHING except the actual quoted dialogue.""
stt,
user_aggregator,
llm,
tts, # TTS with pattern aggregator
llm_text_processor,
tts,
transport.output(),
assistant_aggregator,
]

View File

@@ -0,0 +1,210 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with intermediate updates.
The ``track_current_location`` tool simulates a GPS tracker reporting the
device's position during a road trip from San Francisco to San Diego. It
sends two intermediate updates (via ``params.result_callback`` with
``is_final=False``) as the vehicle passes through cities along the way, then
delivers the final destination (via ``params.result_callback``). Each update
returns the same structure with a different city:
Update 1 {gps, city: "San Francisco"} ← trip start
Update 2 {gps, city: "Los Angeles"} ← passing through
Final {gps, city: "San Diego"} ← destination reached
Because the function is registered with ``cancel_on_interruption=False``, the
LLM can keep talking while the trip is in progress; each position update
arrives as a developer message so the LLM can narrate the journey to the user.
"""
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
FunctionCallResultProperties,
LLMRunFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def track_current_location(params: FunctionCallParams):
"""Simulate a GPS tracker reporting position during a road trip.
Step 1 San Francisco (trip start) (update)
Step 2 Los Angeles (passing through) (update)
Step 3 San Diego (destination) (final result)
"""
# First update: initial city estimate.
gps = {"lat": 37.7310, "lng": -122.4527}
await params.result_callback(
{"gps": gps, "city": "San Francisco"},
properties=FunctionCallResultProperties(is_final=False),
)
# Second update: revised city estimate.
await asyncio.sleep(10)
gps = {"lat": 33.96003, "lng": -118.40639}
await params.result_callback(
{"gps": gps, "city": "Los Angeles"},
properties=FunctionCallResultProperties(is_final=False),
)
# Final result: confirmed city.
await asyncio.sleep(10)
gps = {"lat": 32.743569, "lng": -117.20466}
await params.result_callback({"gps": gps, "city": "San Diego"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
enable_async_tool_cancellation=True,
settings=AnthropicLLMService.Settings(
system_instruction=(
"You are a helpful assistant in a voice conversation. "
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. "
"You have access to a function that starts tracking the user's location and "
"provides regular updates on it. When you receive the final location, tell the user "
"the destination has been reached."
),
),
)
# cancel_on_interruption=False makes this an async function call: the LLM
# continues the conversation immediately and receives updates/result later.
llm.register_function(
"track_current_location",
track_current_location,
cancel_on_interruption=False,
timeout_secs=30,
)
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
location_function = FunctionSchema(
name="track_current_location",
description="Start tracking the user's current GPS location, reporting position updates until the user reaches their destination.",
properties={},
required=[],
)
tools = ToolsSchema(standard_tools=[location_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,180 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call, so we can test async function calls (cancel_on_interruption=False).
await asyncio.sleep(20)
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
enable_async_tool_cancellation=True,
settings=AnthropicLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
timeout_secs=30,
)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,214 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with intermediate updates.
The ``track_current_location`` tool simulates a GPS tracker reporting the
device's position during a road trip from San Francisco to San Diego. It
sends two intermediate updates (via ``params.result_callback`` with
``is_final=False``) as the vehicle passes through cities along the way, then
delivers the final destination (via ``params.result_callback``). Each update
returns the same structure with a different city:
Update 1 {gps, city: "San Francisco"} ← trip start
Update 2 {gps, city: "Los Angeles"} ← passing through
Final {gps, city: "San Diego"} ← destination reached
Because the function is registered with ``cancel_on_interruption=False``, the
LLM can keep talking while the trip is in progress; each position update
arrives as a developer message so the LLM can narrate the journey to the user.
"""
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
FunctionCallResultProperties,
LLMRunFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def track_current_location(params: FunctionCallParams):
"""Simulate a GPS tracker reporting position during a road trip.
Step 1 San Francisco (trip start) (update)
Step 2 Los Angeles (passing through) (update)
Step 3 San Diego (destination) (final result)
"""
# First update: initial city estimate.
gps = {"lat": 37.7310, "lng": -122.4527}
await params.result_callback(
{"gps": gps, "city": "San Francisco"},
properties=FunctionCallResultProperties(is_final=False),
)
# Second update: revised city estimate.
await asyncio.sleep(10)
gps = {"lat": 33.96003, "lng": -118.40639}
await params.result_callback(
{"gps": gps, "city": "Los Angeles"},
properties=FunctionCallResultProperties(is_final=False),
)
# Final result: confirmed city.
await asyncio.sleep(10)
gps = {"lat": 32.743569, "lng": -117.20466}
await params.result_callback({"gps": gps, "city": "San Diego"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = GoogleLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
enable_async_tool_cancellation=True,
settings=GoogleLLMService.Settings(
system_instruction=(
"You are a helpful assistant in a voice conversation. "
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. "
"You have access to a function that starts tracking the user's location and "
"provides regular updates on it. When you receive the final location, tell the user "
"the destination has been reached."
),
),
)
# cancel_on_interruption=False makes this an async function call: the LLM
# continues the conversation immediately and receives updates/result later.
llm.register_function(
"track_current_location",
track_current_location,
cancel_on_interruption=False,
timeout_secs=30,
)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Sure, tracking your location now."))
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
location_function = FunctionSchema(
name="track_current_location",
description="Start tracking the user's current GPS location, reporting position updates until the user reaches their destination.",
properties={},
required=[],
)
tools = ToolsSchema(standard_tools=[location_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,256 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame, UserImageRequestFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import (
create_transport,
get_transport_client_id,
maybe_capture_participant_camera,
)
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
async def get_weather(params: FunctionCallParams):
# Simulate a long-running API call, so we can test async function calls (cancel_on_interruption=False).
await asyncio.sleep(20)
location = params.arguments["location"]
await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
async def get_image(params: FunctionCallParams):
"""Fetch the user image and push it to the LLM.
When called, this function pushes a UserImageRequestFrame upstream to the
transport. As a result, the transport will request the user image and push a
UserImageRawFrame downstream which will be added to the context by the LLM
assistant aggregator. The result_callback will be invoked once the image is
retrieved and processed.
"""
user_id = params.arguments["user_id"]
question = params.arguments["question"]
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
# Request a user image frame and indicate that it should be added to the
# context. Also associate it to the function call. Pass the result_callback
# so it can be invoked when the image is actually retrieved.
await params.llm.push_frame(
UserImageRequestFrame(
user_id=user_id,
text=question,
append_to_context=True,
function_name=params.function_name,
tool_call_id=params.tool_call_id,
result_callback=params.result_callback,
),
FrameDirection.UPSTREAM,
)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_in_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to three tools: get_weather, get_restaurant_recommendation, and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
"""
llm = GoogleLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
enable_async_tool_cancellation=True,
settings=GoogleLLMService.Settings(
system_instruction=system_prompt,
),
)
llm.register_function("get_weather", get_weather, cancel_on_interruption=False, timeout_secs=30)
llm.register_function("get_image", get_image)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
weather_function = FunctionSchema(
name="get_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
get_image_function = FunctionSchema(
name="get_image",
description="Called when the user requests a description of their camera feed",
properties={
"user_id": {
"type": "string",
"description": "The ID of the user to grab the image from",
},
"question": {
"type": "string",
"description": "The question that the user is asking about the image",
},
},
required=["user_id", "question"],
)
tools = ToolsSchema(standard_tools=[weather_function, get_image_function, restaurant_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
await maybe_capture_participant_camera(transport, client)
client_id = get_transport_client_id(transport, client)
# Kick off the conversation.
context.add_message(
{
"role": "developer",
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,214 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with intermediate updates.
The ``track_current_location`` tool simulates a GPS tracker reporting the
device's position during a road trip from San Francisco to San Diego. It
sends two intermediate updates (via ``params.result_callback`` with
``is_final=False``) as the vehicle passes through cities along the way, then
delivers the final destination (via ``params.result_callback``). Each update
returns the same structure with a different city:
Update 1 {gps, city: "San Francisco"} ← trip start
Update 2 {gps, city: "Los Angeles"} ← passing through
Final {gps, city: "San Diego"} ← destination reached
Because the function is registered with ``cancel_on_interruption=False``, the
LLM can keep talking while the trip is in progress; each position update
arrives as a developer message so the LLM can narrate the journey to the user.
"""
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
FunctionCallResultProperties,
LLMRunFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def track_current_location(params: FunctionCallParams):
"""Simulate a GPS tracker reporting position during a road trip.
Step 1 San Francisco (trip start) (update)
Step 2 Los Angeles (passing through) (update)
Step 3 San Diego (destination) (final result)
"""
# First update: initial city estimate.
gps = {"lat": 37.7310, "lng": -122.4527}
await params.result_callback(
{"gps": gps, "city": "San Francisco"},
properties=FunctionCallResultProperties(is_final=False),
)
# Second update: revised city estimate.
await asyncio.sleep(10)
gps = {"lat": 33.96003, "lng": -118.40639}
await params.result_callback(
{"gps": gps, "city": "Los Angeles"},
properties=FunctionCallResultProperties(is_final=False),
)
# Final result: confirmed city.
await asyncio.sleep(10)
gps = {"lat": 32.743569, "lng": -117.20466}
await params.result_callback({"gps": gps, "city": "San Diego"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
enable_async_tool_cancellation=True,
settings=OpenAILLMService.Settings(
system_instruction=(
"You are a helpful assistant in a voice conversation. "
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. "
"You have access to a function that starts tracking the user's location and "
"provides regular updates on it. When you receive the final location, tell the user "
"the destination has been reached."
),
),
)
# cancel_on_interruption=False makes this an async function call: the LLM
# continues the conversation immediately and receives updates/result later.
llm.register_function(
"track_current_location",
track_current_location,
cancel_on_interruption=False,
timeout_secs=30,
)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Sure, tracking your location now."))
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
location_function = FunctionSchema(
name="track_current_location",
description="Start tracking the user's current GPS location, reporting position updates until the user reaches their destination.",
properties={},
required=[],
)
tools = ToolsSchema(standard_tools=[location_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,198 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
LLMRunFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.openai.stt import OpenAISTTService
from pipecat.services.openai.tts import OpenAITTSService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call, so we can test async function calls.
await asyncio.sleep(20)
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = OpenAISTTService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAISTTService.Settings(
model="gpt-4o-transcribe",
prompt="Expect words related weather, such as temperature and conditions. And restaurant names.",
),
)
tts = OpenAITTSService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAITTSService.Settings(
voice="ballad",
),
instructions="Please speak clearly and at a moderate pace.",
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
enable_async_tool_cancellation=True,
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
timeout_secs=30,
)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,211 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Example: async function call with intermediate updates.
The ``track_current_location`` tool simulates a GPS tracker reporting the
device's position during a road trip from San Francisco to San Diego. It
sends two intermediate updates (via ``params.result_callback`` with
``is_final=False``) as the vehicle passes through cities along the way, then
delivers the final destination (via ``params.result_callback``). Each update returns the same structure with a
different city:
Update 1 {gps, city: "San Francisco"} ← trip start
Update 2 {gps, city: "Los Angeles"} ← passing through
Final {gps, city: "San Diego"} ← destination reached
Because the function is registered with ``cancel_on_interruption=False``, the
LLM can keep talking while the trip is in progress; each position update
arrives as a developer message so the LLM can narrate the journey to the user.
"""
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
FunctionCallResultProperties,
LLMRunFrame,
TTSSpeakFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def track_current_location(params: FunctionCallParams):
"""Simulate a GPS tracker reporting position during a road trip.
Step 1 San Francisco (trip start) (update)
Step 2 Los Angeles (passing through) (update)
Step 3 San Diego (destination) (final result)
"""
# First update: initial city estimate.
gps = {"lat": 37.7310, "lng": -122.4527}
await params.result_callback(
{"gps": gps, "city": "San Francisco"},
properties=FunctionCallResultProperties(is_final=False),
)
# Second update: revised city estimate.
await asyncio.sleep(10)
gps = {"lat": 33.96003, "lng": -118.40639}
await params.result_callback(
{"gps": gps, "city": "Los Angeles"},
properties=FunctionCallResultProperties(is_final=False),
)
# Final result: confirmed city.
await asyncio.sleep(10)
gps = {"lat": 32.743569, "lng": -117.20466}
await params.result_callback({"gps": gps, "city": "San Diego"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
enable_async_tool_cancellation=True,
settings=OpenAIResponsesLLMService.Settings(
system_instruction=(
"You are a helpful assistant in a voice conversation. "
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
"formatting that can't be spoken. "
"You have access to a function that starts tracking a moving device's location and "
"provides regular updates on it. When you receive the final location, tell the user "
"the destination has been reached and announce the final city."
),
),
)
# cancel_on_interruption=False makes this an async function call: the LLM
# continues the conversation immediately and receives updates/result later.
llm.register_function(
"track_current_location",
track_current_location,
cancel_on_interruption=False,
timeout_secs=30,
)
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
await tts.queue_frame(TTSSpeakFrame("Sure, tracking your location now."))
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
location_function = FunctionSchema(
name="track_current_location",
description="Track the device's current GPS location during a road trip, reporting position updates as the vehicle moves through cities until it reaches the final destination.",
properties={},
required=[],
)
tools = ToolsSchema(standard_tools=[location_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,197 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
# Simulate a long-running API call, so we can test async function calls.
await asyncio.sleep(20)
await params.result_callback({"conditions": "nice", "temperature": "75"})
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
llm = OpenAIResponsesLLMService(
api_key=os.getenv("OPENAI_API_KEY"),
enable_async_tool_cancellation=True,
settings=OpenAIResponsesLLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
# You can also register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.
llm.register_function(
"get_current_weather",
fetch_weather_from_api,
cancel_on_interruption=False,
timeout_secs=30,
)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
@llm.event_handler("on_connection_error")
async def on_connection_error(service, error):
logger.error(f"LLM connection error: {error}")
@llm.event_handler("on_function_calls_started")
async def on_function_calls_started(service, function_calls):
# Avoid appending this filler message to the LLM context — it would
# alter the conversation history and prevent
# OpenAIResponsesLLMService's previous_response_id optimization from
# matching, forcing a full context resend.
await tts.queue_frame(TTSSpeakFrame("Let me check on that.", append_to_context=False))
@llm.event_handler("on_function_calls_cancelled")
async def on_function_calls_cancelled(service, function_calls):
for item in function_calls:
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(),
stt,
user_aggregator,
llm,
tts,
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -5,27 +5,17 @@
#
import asyncio
import io
import json
import os
import shutil
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from mcp import StdioServerParameters
from mcp.client.session_group import StreamableHttpParameters
from PIL import Image
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
FunctionCallResultFrame,
LLMRunFrame,
URLImageRawFrame,
)
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -34,7 +24,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
@@ -47,66 +36,16 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UrlToImageProcessor(FrameProcessor):
def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs):
super().__init__(**kwargs)
self._aiohttp_session = aiohttp_session
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, FunctionCallResultFrame):
await self.push_frame(frame, direction)
image_url = self.extract_url(frame.result)
if image_url:
await self.run_image_process(image_url)
# sometimes we get multiple image urls- process 1 at a time
await asyncio.sleep(1)
else:
await self.push_frame(frame, direction)
def extract_url(self, text: str):
try:
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
except (json.JSONDecodeError, KeyError, TypeError):
pass
async def run_image_process(self, image_url: str):
try:
logger.debug(f"handling image from url: '{image_url}'")
async with self._aiohttp_session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
image = image.convert("RGB")
frame = URLImageRawFrame(
url=image_url, image=image.tobytes(), size=image.size, format="RGB"
)
await self.push_frame(frame)
except Exception as e:
error_msg = f"Error handling image url {image_url}: {str(e)}"
logger.error(error_msg)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
@@ -114,85 +53,70 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session for API calls
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to memory tools that let you store and recall information,
and tools to answer questions about the user's GitHub repositories and account.
Offer to remember things for the user, like their name, preferences, or anything they'd like.
You can also recall things you've previously stored.
You can also offer to answer users questions about their GitHub repositories and account.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
settings=AnthropicLLMService.Settings(
system_instruction=system_prompt,
),
)
async with (
# https://github.com/modelcontextprotocol/servers/tree/main/src/memory
MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
args=["-y", "@modelcontextprotocol/server-memory"],
# env={"MEMORY_FILE_PATH": "/tmp/pipecat_memory.jsonl"}, # Optional: specify MEMORY_FILE_PATH
),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to tools to search the Rijksmuseum collection and the user's GitHub repositories and account.
Offer, for example, to show a floral still life, use the `search_artwork` tool.
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
You can also offer to answer users questions about their GitHub repositories and account.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
settings=AnthropicLLMService.Settings(
system_instruction=system_prompt,
) as memory_mcp,
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
),
)
) as github_mcp,
):
memory_tools = await memory_mcp.register_tools(llm)
github_tools = await github_mcp.register_tools(llm)
try:
rijksmuseum_mcp = MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
# https://github.com/r-huijts/rijksmuseum-mcp
args=["-y", "mcp-server-rijksmuseum"],
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
)
)
except Exception as e:
logger.error(f"error setting up rijksmuseum mcp")
logger.exception("error trace:")
try:
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
github_mcp = MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={
"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"
},
)
)
except Exception as e:
logger.error(f"error setting up mcp.run")
logger.exception("error trace:")
rijksmuseum_tools = {}
github_tools = {}
try:
rijksmuseum_tools = await rijksmuseum_mcp.register_tools(llm)
github_tools = await github_mcp.register_tools(llm)
except Exception as e:
logger.error(f"error registering tools")
logger.exception("error trace:")
all_standard_tools = rijksmuseum_tools.standard_tools + github_tools.standard_tools
all_standard_tools = memory_tools.standard_tools + github_tools.standard_tools
all_tools = ToolsSchema(standard_tools=all_standard_tools)
context = LLMContext(tools=all_tools)
context = LLMContext(
messages=[{"role": "user", "content": "Please introduce yourself."}],
tools=all_tools,
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
mcp_image_processor = UrlToImageProcessor(aiohttp_session=session)
pipeline = Pipeline(
[
@@ -201,7 +125,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
mcp_image_processor, # URL image -> output
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
@@ -239,10 +162,8 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("RIJKSMUSEUM_API_KEY") or not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
logger.error(
f"Please set `RIJKSMUSEUM_API_KEY` and `GITHUB_PERSONAL_ACCESS_TOKEN` environment variables. See https://github.com/r-huijts/rijksmuseum-mcp."
)
if not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
logger.error(f"Please set `GITHUB_PERSONAL_ACCESS_TOKEN` environment variable.")
import sys
sys.exit(1)

View File

@@ -4,26 +4,15 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import io
import json
import os
import re
import shutil
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from mcp import StdioServerParameters
from PIL import Image
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import (
Frame,
FunctionCallResultFrame,
LLMRunFrame,
URLImageRawFrame,
)
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -32,7 +21,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.anthropic.llm import AnthropicLLMService
@@ -44,86 +32,16 @@ from pipecat.transports.daily.transport import DailyParams
load_dotenv(override=True)
class UrlToImageProcessor(FrameProcessor):
def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs):
super().__init__(**kwargs)
self._aiohttp_session = aiohttp_session
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, FunctionCallResultFrame):
await self.push_frame(frame, direction)
image_url = self.extract_url(frame.result)
if image_url:
await self.run_image_process(image_url)
# sometimes we get multiple image urls- process 1 at a time
await asyncio.sleep(1)
else:
await self.push_frame(frame, direction)
def extract_url(self, text: str):
try:
data = json.loads(text)
if "artObject" in data:
return data["artObject"]["webImage"]["url"]
if "artworks" in data and len(data["artworks"]):
return data["artworks"][0]["webImage"]["url"]
except (json.JSONDecodeError, KeyError, TypeError):
pass
return None
async def run_image_process(self, image_url: str):
try:
logger.debug(f"handling image from url: '{image_url}'")
async with self._aiohttp_session.get(image_url) as response:
image_stream = io.BytesIO(await response.content.read())
image = Image.open(image_stream)
image = image.convert("RGB")
frame = URLImageRawFrame(
url=image_url, image=image.tobytes(), size=image.size, format="RGB"
)
await self.push_frame(frame)
except Exception as e:
error_msg = f"Error handling image url {image_url}: {str(e)}"
logger.error(error_msg)
# full list of tools available from rijksmuseum MCP:
# - get_artwork_details
# - get_artwork_image
# - get_user_sets
# - get_user_set_details
# - open_image_in_browser
# - get_artist_timeline
mcp_tools_filter = ["get_artwork_details", "get_artwork_image", "open_image_in_browser"]
def open_image_output_filter(output: str):
pattern = r"Successfully opened image in browser: "
text_to_print = re.sub(pattern, "", output)
print(f"🖼️ link to high resolution artwork: {text_to_print}")
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
),
}
@@ -131,63 +49,48 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Create an HTTP session for API calls
async with aiohttp.ClientSession() as session:
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
settings=CartesiaTTSService.Settings(
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to memory tools that let you store and recall information.
Offer to remember things for the user, like their name, preferences, or anything they'd like.
You can also recall things you've previously stored.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
settings=AnthropicLLMService.Settings(
system_instruction=system_prompt,
),
)
# https://github.com/modelcontextprotocol/servers/tree/main/src/memory
async with MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
args=["-y", "@modelcontextprotocol/server-memory"],
# env={"MEMORY_FILE_PATH": "/tmp/pipecat_memory.jsonl"}, # Optional: specify MEMORY_FILE_PATH
),
) as mcp:
tools = await mcp.register_tools(llm)
context = LLMContext(
messages=[{"role": "user", "content": "Please introduce yourself."}],
tools=tools,
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to demonstrate your capabilities in a succinct way.
You have access to tools to search the Rijksmuseum collection.
Offer, for example, to show a floral still life, use the `search_artwork` tool.
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Respond to what the user said in a creative and helpful way.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
settings=AnthropicLLMService.Settings(
system_instruction=system_prompt,
),
)
try:
mcp = MCPClient(
server_params=StdioServerParameters(
command=shutil.which("npx"),
# https://github.com/r-huijts/rijksmuseum-mcp
args=["-y", "mcp-server-rijksmuseum"],
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
),
# Optional
tools_filter=mcp_tools_filter, # Optional
tools_output_filters={"open_image_in_browser": open_image_output_filter},
)
except Exception as e:
logger.error(f"error setting up mcp")
logger.exception("error trace:")
mcp_image = UrlToImageProcessor(aiohttp_session=session)
tools = {}
try:
tools = await mcp.register_tools(llm)
except Exception as e:
logger.error(f"error registering tools")
logger.exception("error trace:")
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
@@ -200,7 +103,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
mcp_image, # URL image -> output
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
@@ -238,13 +140,6 @@ async def bot(runner_args: RunnerArguments):
if __name__ == "__main__":
if not os.getenv("RIJKSMUSEUM_API_KEY"):
logger.error(
f"Please set RIJKSMUSEUM_API_KEY environment variable for this example. See https://github.com/r-huijts/rijksmuseum-mcp and https://www.rijksmuseum.nl/en/register?redirectUrl=https://www.https://www.rijksmuseum.nl/en/rijksstudio/my/profile"
)
import sys
sys.exit(1)
from pipecat.runner.run import main
main()

View File

@@ -63,28 +63,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)
try:
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
mcp = MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
)
)
except Exception as e:
logger.error(f"error setting up mcp")
logger.exception("error trace:")
tools = {}
try:
tools = await mcp.get_tools_schema()
except Exception as e:
logger.error(f"error registering tools")
logger.exception("error trace:")
system = f"""
You are a helpful LLM in a voice call.
Your goal is to answer questions about the user's GitHub repositories and account.
@@ -94,53 +72,65 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
Just respond with short sentences when you are carrying out tool calls.
"""
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system,
tools=tools,
)
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
async with MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
)
) as mcp:
tools = await mcp.get_tools_schema()
await mcp.register_tools_schema(tools, llm)
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system,
tools=tools,
)
context = LLMContext([{"role": "developer", "content": "Please introduce yourself."}])
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
await mcp.register_tools_schema(tools, llm)
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_aggregator, # User spoken responses
llm, # LLM
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
context = LLMContext([{"role": "user", "content": "Please introduce yourself."}])
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
user_aggregator, # User spoken responses
llm, # LLM
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
await runner.run(task)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):

View File

@@ -63,83 +63,78 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
),
)
system_prompt = f"""
You are a helpful LLM in a voice call.
Your goal is to answer questions about the user's GitHub repositories and account.
You have access to a number of tools provided by Github. Use any and all tools to help users.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
system_prompt = """\
You are a helpful LLM in a voice call.
Your goal is to answer questions about the user's GitHub repositories and account.
You have access to a number of tools provided by Github. Use any and all tools to help users.
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
Don't overexplain what you are doing.
Just respond with short sentences when you are carrying out tool calls.
"""
llm = GoogleLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_prompt,
)
try:
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
mcp = MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
)
)
except Exception as e:
logger.error(f"error setting up mcp")
logger.exception("error trace:")
tools = {}
try:
tools = await mcp.register_tools(llm)
except Exception as e:
logger.error(f"error registering tools")
logger.exception("error trace:")
context = LLMContext(tools=tools)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
settings=GoogleLLMService.Settings(
system_instruction=system_prompt,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
# Github MCP docs: https://github.com/github/github-mcp-server
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
async with MCPClient(
server_params=StreamableHttpParameters(
url="https://api.githubcopilot.com/mcp/",
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
)
) as mcp:
tools = await mcp.register_tools(llm)
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
context = LLMContext(
messages=[{"role": "user", "content": "Please introduce yourself."}],
tools=tools,
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User spoken responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses and tool context
]
)
await runner.run(task)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected: {client}")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):

View File

@@ -0,0 +1,162 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""
Inworld Realtime Example
This example demonstrates using Inworld's Realtime API for real-time voice
conversations. The Inworld Realtime API is OpenAI-compatible and operates
as a cascade STT/LLM/TTS pipeline under the hood, with built-in semantic
voice activity detection for turn management.
Features:
- Real-time audio streaming with low latency
- Built-in semantic VAD (voice activity detection)
- Streaming user transcription
- Text and audio input
Requirements:
- INWORLD_API_KEY environment variable set
- pip install pipecat-ai[inworld]
Usage:
python realtime-inworld.py --transport webrtc
python realtime-inworld.py --transport daily
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import LLMRunFrame
from pipecat.observers.loggers.transcription_log_observer import (
TranscriptionLogObserver,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
AssistantTurnStoppedMessage,
LLMContextAggregatorPair,
UserTurnStoppedMessage,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# --- Transport Configuration ---
# No local VAD needed — Inworld's server-side semantic VAD handles turn detection.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info("Starting Inworld Realtime bot")
# Create the Inworld Realtime LLM service.
# Common params (llm_model, voice, tts_model, stt_model) are top-level.
# For full control, use settings=InworldRealtimeLLMService.Settings(session_properties=...)
#
# llm_model can be any supported model or an Inworld Router.
# See: https://docs.inworld.ai/router/introduction
llm = InworldRealtimeLLMService(
api_key=os.getenv("INWORLD_API_KEY"),
llm_model="xai/grok-4-1-fast-non-reasoning",
voice="Sarah",
settings=InworldRealtimeLLMService.Settings(
system_instruction="""You are a helpful and friendly AI assistant powered by Inworld.
Your voice and personality should be warm and engaging. Keep your responses
concise and conversational since this is a voice interaction.
Always be helpful and proactive in offering assistance.""",
),
)
# Create context with initial message
context = LLMContext(
[{"role": "developer", "content": "Say hello and introduce yourself!"}],
)
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
# Build the pipeline
pipeline = Pipeline(
[
transport.input(),
user_aggregator,
llm, # Inworld Realtime (handles STT + LLM + TTS)
transport.output(),
assistant_aggregator,
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[TranscriptionLogObserver()],
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info("Client connected")
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info("Client disconnected")
await task.cancel()
@user_aggregator.event_handler("on_user_turn_stopped")
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
logger.info(f"Transcript: {timestamp}user: {message.content}")
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
logger.info(f"Transcript: {timestamp}assistant: {message.content}")
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -16,7 +16,7 @@ from pipecat.pipeline.task import PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gladia import GladiaSTTService
from pipecat.services.gladia.stt import GladiaSTTService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

View File

@@ -40,7 +40,7 @@ class TranscriptHandler:
Maintains a list of conversation messages and outputs them either to a log
or to a file as they are received. Each message includes its timestamp and role.
Attributes:
Parameters:
messages: List of all processed transcript messages
output_file: Optional path to file where transcript is saved. If None, outputs to log only.
"""

View File

@@ -6,7 +6,6 @@
import asyncio
import os
from typing import Any
from dotenv import load_dotenv
from loguru import logger

View File

@@ -25,7 +25,6 @@ from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.whisper.stt import MLXModel, WhisperSTTServiceMLX
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

View File

@@ -25,7 +25,6 @@ from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.services.whisper.stt import Model, WhisperSTTService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

View File

@@ -114,6 +114,14 @@ async def main():
logger.info("Client disconnected")
await task.cancel()
@transport.event_handler("on_avatar_connected")
async def on_avatar_connected(transport, participant):
logger.info("Avatar connected")
@transport.event_handler("on_avatar_disconnected")
async def on_avatar_disconnected(transport, participant, reason):
logger.info(f"Avatar disconnected. Reason: {reason}")
runner = PipelineRunner()
await runner.run(task)

View File

@@ -28,7 +28,6 @@ from pipecat.frames.frames import (
Frame,
OutputImageRawFrame,
StartFrame,
SystemFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner

View File

@@ -0,0 +1,127 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import (
LLMContextAggregatorPair,
LLMUserAggregatorParams,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.mistral.tts import MistralTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We use lambdas to defer transport parameter creation until the transport
# type is selected at runtime.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = MistralTTSService(
api_key=os.getenv("MISTRAL_API_KEY"),
settings=MistralTTSService.Settings(
voice="c69964a6-ab8b-4f8a-9465-ec0925096ec8",
),
)
llm = OpenAILLMService(
api_key=os.getenv("OPENAI_API_KEY"),
settings=OpenAILLMService.Settings(
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
),
)
context = LLMContext()
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
context,
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
user_aggregator, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
assistant_aggregator, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
context.add_message(
{"role": "developer", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -96,7 +96,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
allow_interruptions=True,
),
)

View File

@@ -9,7 +9,7 @@ description = "An open source framework for voice (and multimodal) assistants"
license = "BSD-2-Clause"
license-files = ["LICENSE"]
readme = "README.md"
requires-python = ">=3.10"
requires-python = ">=3.11"
keywords = ["webrtc", "audio", "video", "ai"]
classifiers = [
"Development Status :: 5 - Production/Stable",
@@ -41,7 +41,7 @@ dependencies = [
# Required by LocalSmartTurnAnalyzerV3
# Inlined here instead of using a self-referential extra for Poetry compatibility.
"transformers>=4.48.0,<6",
"onnxruntime~=1.23.2",
"onnxruntime~=1.24.3",
]
[project.urls]
@@ -77,7 +77,7 @@ groq = [ "groq>=0.23.0,<2" ]
gstreamer = [ "pygobject~=3.50.0" ]
heygen = [ "livekit>=1.0.13,<2", "pipecat-ai[websockets-base]" ]
hume = [ "hume>=0.11.2,<1" ]
inworld = []
inworld = [ "pipecat-ai[websockets-base]" ]
koala = [ "pvkoala~=2.0.3" ]
kokoro = [ "kokoro-onnx>=0.5.0,<1", "requests>=2.32.5,<3" ]
langchain = [ "langchain>=1.2.13,<2", "langchain-community>=0.4.1,<1", "langchain-openai>=1.1.12,<2" ]
@@ -88,7 +88,7 @@ local = [ "pyaudio~=0.2.14" ]
local-smart-turn = [ "coremltools>=8.0", "transformers>=4.48.0,<6", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]
mcp = [ "mcp[cli]>=1.11.0,<2" ]
mem0 = [ "mem0ai>=1.0.8,<2" ]
mistral = []
mistral = ["mistralai>=2.0.0,<3"]
mlx-whisper = [ "mlx-whisper~=0.4.2" ]
moondream = [ "accelerate~=1.10.0", "einops~=0.8.0", "pyvips[binary]~=3.0.0", "timm~=1.0.13", "transformers>=4.48.0,<6" ]
nebius = []
@@ -101,10 +101,8 @@ openrouter = []
perplexity = []
piper = [ "piper-tts>=1.3.0,<2", "requests>=2.32.5,<3" ]
qwen = []
remote-smart-turn = []
resembleai = [ "pipecat-ai[websockets-base]" ]
rime = [ "pipecat-ai[websockets-base]" ]
riva = [ "pipecat-ai[nvidia]" ]
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<1", "pipecat-ai-small-webrtc-prebuilt>=2.4.0"]
sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"]
sambanova = []
@@ -135,9 +133,9 @@ dev = [
"pip-tools~=7.5.3",
"pre-commit~=4.5.1",
"pyright>=1.1.404,<1.2",
"pytest~=8.4.1",
"pytest-asyncio~=1.3.0",
"pytest-aiohttp==1.1.0",
"pytest>=9.0.0,<10",
"pytest-asyncio>=1.0.0,<2",
"pytest-aiohttp>=1.0.0,<2",
"ruff>=0.12.11,<1",
"setuptools~=78.1.1",
"setuptools_scm~=8.3.1",
@@ -211,7 +209,6 @@ ignore = [
"**/__init__.py" = ["D104"]
# Skip specific rules for generated protobuf files
"**/*_pb2.py" = ["D"]
"src/pipecat/services/__init__.py" = ["D"]
[tool.ruff.lint.pydocstyle]
convention = "google"

View File

@@ -171,6 +171,7 @@ class EvalRunner:
async def save_audio(self, name: str, audio: bytes, sample_rate: int, num_channels: int):
if len(audio) > 0:
filename = self._recording_file_name(name)
os.makedirs(os.path.dirname(filename), exist_ok=True)
logger.debug(f"Saving {name} audio to {filename}")
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:

View File

@@ -10,11 +10,13 @@ This module provides the abstract base class for implementing LLM provider-speci
adapters that handle tool format conversion and standardization.
"""
import warnings
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, List, Optional, TypeVar
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import (
LLMContext,
@@ -48,6 +50,21 @@ class BaseLLMAdapter(ABC, Generic[TLLMInvocationParams]):
def __init__(self):
"""Initialize the adapter."""
self._warned_system_instruction = False
self._builtin_tools: Dict[str, FunctionSchema] = {}
@property
def builtin_tools(self) -> Dict[str, FunctionSchema]:
"""Built-in tools automatically merged into every inference request.
Keyed by tool name for O(1) lookup, insertion, and removal. The
service injects tools here so they are sent transparently on every
inference request without the user having to add them to their
``ToolsSchema``.
Returns:
Mutable dict mapping tool name to ``FunctionSchema``.
"""
return self._builtin_tools
@property
@abstractmethod
@@ -108,20 +125,29 @@ class BaseLLMAdapter(ABC, Generic[TLLMInvocationParams]):
"""
return LLMSpecificMessage(llm=self.id_for_llm_specific_messages, message=message)
def get_messages(self, context: LLMContext) -> List[LLMContextMessage]:
def get_messages(
self, context: LLMContext, *, truncate_large_values: bool = False
) -> List[LLMContextMessage]:
"""Get messages from the LLM context, including standard and LLM-specific messages.
Args:
context: The LLM context containing messages.
truncate_large_values: If True, return deep copies of messages with
large values replaced by short placeholders.
Returns:
List of messages including standard and LLM-specific messages.
"""
return context.get_messages(self.id_for_llm_specific_messages)
return context.get_messages(
self.id_for_llm_specific_messages, truncate_large_values=truncate_large_values
)
def from_standard_tools(self, tools: Any) -> List[Any] | NotGiven:
"""Convert tools from standard format to provider format.
Built-in tools are automatically merged into the schema before conversion so that every
inference request receives them without the user having to declare them explicitly.
Args:
tools: Tools in standard format or provider-specific format.
@@ -129,8 +155,31 @@ class BaseLLMAdapter(ABC, Generic[TLLMInvocationParams]):
List of tools converted to provider format, or original tools
if not in standard format.
"""
if self._builtin_tools:
if isinstance(tools, ToolsSchema):
tools = ToolsSchema(
standard_tools=tools.standard_tools + list(self._builtin_tools.values()),
custom_tools=tools.custom_tools,
)
else:
# User supplied tools in a legacy/provider-specific format.
# Built-in tools cannot be safely merged, so they will not be injected.
# Migrate to ToolsSchema to enable built-in tool support; use custom_tools
# as an escape hatch for any provider-specific tools that don't fit the
# standard schema.
if tools is not None:
warnings.warn(
"Built-in tools (e.g. async tool cancellation) could not be injected "
"because the supplied tools are not a ToolsSchema instance. "
"Migrate to ToolsSchema to enable built-in tool support. "
"Use ToolsSchema(custom_tools=...) as an escape hatch for any "
"provider-specific tools that don't fit the standard schema.",
DeprecationWarning,
stacklevel=2,
)
# Fall through and return the original tools unchanged.
if isinstance(tools, ToolsSchema):
logger.debug(f"Retrieving the tools using the adapter: {type(self)}")
return self.to_provider_tools_format(tools)
# Fallback to return the same tools in case they are not in a standard format
return tools

View File

@@ -21,10 +21,12 @@ class AdapterType(Enum):
"""Supported adapter types for custom tools.
Parameters:
GEMINI: Google Gemini adapter - currently the only service supporting custom tools.
GEMINI: Google Gemini adapter.
OPENAI: OpenAI adapter (Chat Completions, Responses, and Realtime API).
"""
GEMINI = "gemini" # that is the only service where we are able to add custom tools for now
GEMINI = "gemini"
OPENAI = "openai"
class ToolsSchema:

View File

@@ -16,7 +16,7 @@ from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage

View File

@@ -19,7 +19,7 @@ from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
from pipecat.services.xai.realtime import events
@@ -27,7 +27,7 @@ from pipecat.services.xai.realtime import events
class GrokRealtimeLLMInvocationParams(TypedDict):
"""Context-based parameters for invoking Grok Realtime API.
Attributes:
Parameters:
system_instruction: System prompt/instructions for the session.
messages: List of conversation items formatted for Grok Realtime.
tools: List of tool definitions (function, web_search, x_search, file_search).
@@ -77,7 +77,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
"""Get messages from context in a format safe for logging.
Removes or truncates sensitive data like audio content.
Binary data (images, audio) is replaced with short placeholders.
Args:
context: The LLM context containing messages.
@@ -85,18 +85,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages with sensitive data redacted.
"""
msgs = []
for message in self.get_messages(context):
msg = copy.deepcopy(message)
if "content" in msg:
if isinstance(msg["content"], list):
for item in msg["content"]:
if item.get("type") == "input_audio":
item["audio"] = "..."
if item.get("type") == "audio":
item["audio"] = "..."
msgs.append(msg)
return msgs
return self.get_messages(context, truncate_large_values=True)
@dataclass
class ConvertedMessages:

View File

@@ -0,0 +1,244 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Inworld Realtime LLM adapter for Pipecat.
Converts Pipecat's tool schemas and context into the format required by
Inworld's Realtime API.
"""
import copy
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, TypedDict
from loguru import logger
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
from pipecat.services.inworld.realtime import events
class InworldRealtimeLLMInvocationParams(TypedDict):
"""Context-based parameters for invoking Inworld Realtime API.
Attributes:
system_instruction: System prompt/instructions for the session.
messages: List of conversation items formatted for Inworld Realtime.
tools: List of tool definitions.
"""
system_instruction: Optional[str]
messages: List[events.ConversationItem]
tools: List[Dict[str, Any]]
class InworldRealtimeLLMAdapter(BaseLLMAdapter):
"""LLM adapter for Inworld Realtime API.
Converts Pipecat's universal context and tool schemas into the specific
format required by Inworld's Realtime API.
"""
@property
def id_for_llm_specific_messages(self) -> str:
"""Get the identifier used in LLMSpecificMessage instances for Inworld Realtime."""
return "inworld-realtime"
def get_llm_invocation_params(
self, context: LLMContext, *, system_instruction: Optional[str] = None
) -> InworldRealtimeLLMInvocationParams:
"""Get Inworld Realtime-specific LLM invocation parameters from a universal LLM context.
Args:
context: The LLM context containing messages, tools, etc.
system_instruction: Optional system instruction from service settings.
Returns:
Dictionary of parameters for invoking Inworld's Realtime API.
"""
messages = self._from_universal_context_messages(self.get_messages(context))
effective_system = self._resolve_system_instruction(
messages.system_instruction,
system_instruction,
discard_context_system=True,
)
return {
"system_instruction": effective_system,
"messages": messages.messages,
"tools": self.from_standard_tools(context.tools) or [],
}
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
"""Get messages from context in a format safe for logging.
Binary data (images, audio) is replaced with short placeholders.
Args:
context: The LLM context containing messages.
Returns:
List of messages with sensitive data redacted.
"""
return self.get_messages(context, truncate_large_values=True)
@dataclass
class ConvertedMessages:
"""Container for Inworld-formatted messages converted from universal context."""
messages: List[events.ConversationItem]
system_instruction: Optional[str] = None
def _from_universal_context_messages(
self, universal_context_messages: List[LLMContextMessage]
) -> ConvertedMessages:
"""Convert universal context messages to Inworld Realtime format.
Similar to OpenAI Realtime, we pack conversation history into a single
user message since the realtime API doesn't support loading long histories.
Args:
universal_context_messages: List of messages in universal format.
Returns:
ConvertedMessages with Inworld-formatted messages and system instruction.
"""
if not universal_context_messages:
return self.ConvertedMessages(messages=[])
messages = copy.deepcopy(universal_context_messages)
system_instruction = None
# Extract system message as session instructions
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
system_instruction = content
elif isinstance(content, list):
system_instruction = content[0].get("text")
if not messages:
return self.ConvertedMessages(messages=[], system_instruction=system_instruction)
# Convert any remaining "system"/"developer" messages to "user"
for msg in messages:
if msg.get("role") in ("system", "developer"):
msg["role"] = "user"
# Single user message can be sent normally
if len(messages) == 1 and messages[0].get("role") == "user":
return self.ConvertedMessages(
messages=[self._from_universal_context_message(messages[0])],
system_instruction=system_instruction,
)
# Pack multiple messages into a single user message
intro_text = """
This is a previously saved conversation. Please treat this conversation history as a
starting point for the current conversation."""
trailing_text = """
This is the end of the previously saved conversation. Please continue the conversation
from here. If the last message is a user instruction or question, act on that instruction
or answer the question. If the last message is an assistant response, simply say that you
are ready to continue the conversation."""
return self.ConvertedMessages(
messages=[
events.ConversationItem(
role="user",
type="message",
content=[
events.ItemContent(
type="input_text",
text="\n\n".join(
[
intro_text,
json.dumps(messages, indent=2),
trailing_text,
]
),
)
],
)
],
system_instruction=system_instruction,
)
def _from_universal_context_message(
self, message: LLMContextMessage
) -> events.ConversationItem:
"""Convert a single universal context message to Inworld format.
Args:
message: Message in universal format.
Returns:
ConversationItem formatted for Inworld Realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
if isinstance(content, list):
text_content = ""
for c in content:
if c.get("type") == "text":
text_content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
content = text_content.strip()
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
@staticmethod
def _to_inworld_function_format(function: FunctionSchema) -> Dict[str, Any]:
"""Convert a function schema to Inworld Realtime function format.
Args:
function: The function schema to convert.
Returns:
Dictionary in Inworld Realtime function format.
"""
return {
"type": "function",
"name": function.name,
"description": function.description,
"parameters": {
"type": "object",
"properties": function.properties,
"required": function.required,
},
}
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[Dict[str, Any]]:
"""Convert tool schemas to Inworld Realtime format.
Args:
tools_schema: The tools schema containing functions to convert.
Returns:
List of tool definitions in Inworld Realtime format.
"""
functions_schema = tools_schema.standard_tools
return [self._to_inworld_function_format(func) for func in functions_schema]

View File

@@ -6,7 +6,6 @@
"""OpenAI LLM adapter for Pipecat."""
import copy
from typing import Any, Dict, List, Optional, TypedDict
from openai._types import NotGiven as OpenAINotGiven
@@ -17,7 +16,7 @@ from openai.types.chat import (
)
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextMessage,
@@ -107,15 +106,19 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
with ChatCompletion API.
"""
functions_schema = tools_schema.standard_tools
return [
formatted_standard_tools = [
ChatCompletionToolParam(type="function", function=func.to_default_dict())
for func in functions_schema
]
custom_openai_tools = []
if tools_schema.custom_tools:
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
return formatted_standard_tools + custom_openai_tools
def get_messages_for_logging(self, context: LLMContext) -> List[Dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about OpenAI.
Removes or truncates sensitive data like image content for safe logging.
Binary data (images, audio) is replaced with short placeholders.
Args:
context: The LLM context containing messages.
@@ -123,21 +126,7 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
Returns:
List of messages in a format ready for logging about OpenAI.
"""
msgs = []
for message in self.get_messages(context):
msg = copy.deepcopy(message)
if "content" in msg:
if isinstance(msg["content"], list):
for item in msg["content"]:
if item["type"] == "image_url":
if item["image_url"]["url"].startswith("data:image/"):
item["image_url"]["url"] = "data:image/..."
if item["type"] == "input_audio":
item["input_audio"]["data"] = "..."
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
msg["data"] = "..."
msgs.append(msg)
return msgs
return self.get_messages(context, truncate_large_values=True)
def _from_universal_context_messages(
self,

View File

@@ -71,7 +71,7 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
"""Get messages from a universal LLM context in a format ready for logging about OpenAI Realtime.
Removes or truncates sensitive data like image content for safe logging.
Binary data (images, audio) is replaced with short placeholders.
This is a placeholder until support for universal LLMContext machinery is added for OpenAI Realtime.
@@ -81,25 +81,7 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
Returns:
List of messages in a format ready for logging about OpenAI Realtime.
"""
# NOTE: this is the same as in OpenAIAdapter, as that's what it was
# prior to a refactor. Worth noting that for OpenAI Realtime
# specifically, not everything handled here is necessarily supported
# (or supported yet).
msgs = []
for message in self.get_messages(context):
msg = copy.deepcopy(message)
if "content" in msg:
if isinstance(msg["content"], list):
for item in msg["content"]:
if item["type"] == "image_url":
if item["image_url"]["url"].startswith("data:image/"):
item["image_url"]["url"] = "data:image/..."
if item["type"] == "input_audio":
item["input_audio"]["data"] = "..."
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
msg["data"] = "..."
msgs.append(msg)
return msgs
return self.get_messages(context, truncate_large_values=True)
@dataclass
class ConvertedMessages:
@@ -236,4 +218,10 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
List of function definitions in OpenAI Realtime format.
"""
functions_schema = tools_schema.standard_tools
return [self._to_openai_realtime_function_format(func) for func in functions_schema]
formatted_standard_tools = [
self._to_openai_realtime_function_format(func) for func in functions_schema
]
custom_openai_tools = []
if tools_schema.custom_tools:
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
return formatted_standard_tools + custom_openai_tools

View File

@@ -6,19 +6,17 @@
"""OpenAI Responses API adapter for Pipecat."""
import copy
from typing import Any, Dict, List, Optional, TypedDict
from openai._types import NotGiven as OpenAINotGiven
from openai.types.responses import FunctionToolParam, ResponseInputItemParam
from openai.types.responses import FunctionToolParam, ResponseInputItemParam, ToolParam
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.processors.aggregators.llm_context import (
LLMContext,
LLMContextMessage,
LLMSpecificMessage,
NotGiven,
)
@@ -26,7 +24,7 @@ class OpenAIResponsesLLMInvocationParams(TypedDict, total=False):
"""Context-based parameters for invoking OpenAI Responses API."""
input: List[ResponseInputItemParam]
tools: List[FunctionToolParam] | OpenAINotGiven
tools: List[ToolParam] | OpenAINotGiven
instructions: str
@@ -107,7 +105,7 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
return params
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[FunctionToolParam]:
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[ToolParam]:
"""Convert function schemas to Responses API function tool format.
Args:
@@ -129,12 +127,15 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
if "description" in d:
tool["description"] = d["description"]
result.append(tool)
return result
custom_openai_tools = []
if tools_schema.custom_tools:
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
return result + custom_openai_tools
def get_messages_for_logging(self, context: LLMContext) -> List[Dict[str, Any]]:
"""Get messages from context in a format ready for logging.
Removes or truncates sensitive data like image content for safe logging.
Binary data (images, audio) is replaced with short placeholders.
Args:
context: The LLM context containing messages.
@@ -142,19 +143,7 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
Returns:
List of messages in a format ready for logging.
"""
msgs = []
for message in self.get_messages(context):
msg = copy.deepcopy(message)
if "content" in msg:
if isinstance(msg["content"], list):
for item in msg["content"]:
if item.get("type") == "image_url":
if item["image_url"]["url"].startswith("data:image/"):
item["image_url"]["url"] = "data:image/..."
if item.get("type") == "input_audio":
item["input_audio"]["data"] = "..."
msgs.append(msg)
return msgs
return self.get_messages(context, truncate_large_values=True)
def _convert_messages_to_input(
self, messages: List[LLMContextMessage]

View File

@@ -1,58 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Base interruption strategy for determining when users can interrupt bot speech."""
from abc import ABC, abstractmethod
class BaseInterruptionStrategy(ABC):
"""Base class for interruption strategies.
This is a base class for interruption strategies. Interruption strategies
decide when the user can interrupt the bot while the bot is speaking. For
example, there could be strategies based on audio volume or strategies based
on the number of words the user spoke.
"""
async def append_audio(self, audio: bytes, sample_rate: int):
"""Append audio data to the strategy for analysis.
Not all strategies handle audio. Default implementation does nothing.
Args:
audio: Raw audio bytes to append.
sample_rate: Sample rate of the audio data in Hz.
"""
pass
async def append_text(self, text: str):
"""Append text data to the strategy for analysis.
Not all strategies handle text. Default implementation does nothing.
Args:
text: Text string to append for analysis.
"""
pass
@abstractmethod
async def should_interrupt(self) -> bool:
"""Determine if the user should interrupt the bot.
This is called when the user stops speaking and it's time to decide
whether the user should interrupt the bot. The decision will be based on
the aggregated audio and/or text.
Returns:
True if the user should interrupt the bot, False otherwise.
"""
pass
@abstractmethod
async def reset(self):
"""Reset the current accumulated text and/or audio."""
pass

View File

@@ -1,75 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Minimum words interruption strategy for word count-based interruptions."""
from loguru import logger
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
class MinWordsInterruptionStrategy(BaseInterruptionStrategy):
"""Interruption strategy based on minimum number of words spoken.
This is an interruption strategy based on a minimum number of words said
by the user. That is, the strategy will be true if the user has said at
least that amount of words.
.. deprecated:: 0.0.99
This class is deprecated, use
`pipecat.turns.user_start.MinWordsUserTurnStartStrategy` with `PipelineTask`'s
new `user_turn_strategies` parameter instead.
"""
def __init__(self, *, min_words: int):
"""Initialize the minimum words interruption strategy.
Args:
min_words: Minimum number of words required to trigger an interruption.
"""
super().__init__()
self._min_words = min_words
self._text = ""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"'pipecat.audio.interruptions' is deprecated. "
"Use `pipecat.turns.user_start.MinWordsUserTurnStartStrategy` with `PipelineTask`'s "
"new `user_turn_strategies` parameter instead.",
DeprecationWarning,
)
async def append_text(self, text: str):
"""Append text for word count analysis.
Args:
text: Text string to append to the accumulated text.
Note: Not all strategies need to handle text.
"""
self._text += text
async def should_interrupt(self) -> bool:
"""Check if the minimum word count has been reached.
Returns:
True if the user has spoken at least the minimum number of words.
"""
word_count = len(self._text.split())
interrupt = word_count >= self._min_words
logger.debug(
f"should_interrupt={interrupt} num_spoken_words={word_count} min_words={self._min_words}"
)
return interrupt
async def reset(self):
"""Reset the accumulated text for the next analysis cycle."""
self._text = ""

View File

@@ -10,8 +10,11 @@ This module provides a controller that wraps a VADAnalyzer to track speech state
and emit events when speech starts, stops, or is actively detected.
"""
import asyncio
import time
from typing import Type
from typing import Optional, Type
from loguru import logger
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
from pipecat.frames.frames import (
@@ -22,6 +25,7 @@ from pipecat.frames.frames import (
VADParamsUpdateFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.utils.asyncio.task_manager import BaseTaskManager
from pipecat.utils.base_object import BaseObject
@@ -35,7 +39,8 @@ class VADController(BaseObject):
Event handlers available:
- on_speech_started: Called when speech begins.
- on_speech_stopped: Called when speech ends.
- on_speech_stopped: Called when speech ends, including forced stop when
the audio stream goes idle (no frames received while speaking).
- on_speech_activity: Called periodically while speech is detected.
- on_push_frame: Called when the controller wants to push a frame.
- on_broadcast_frame: Called when the controller wants to broadcast a frame.
@@ -63,30 +68,62 @@ class VADController(BaseObject):
...
"""
def __init__(self, vad_analyzer: VADAnalyzer, *, speech_activity_period: float = 0.2):
def __init__(
self,
vad_analyzer: VADAnalyzer,
*,
speech_activity_period: float = 0.2,
audio_idle_timeout: float = 1.0,
):
"""Initialize the VAD controller.
Args:
vad_analyzer: The `VADAnalyzer` instance for processing audio.
speech_activity_period: Minimum interval in seconds between
`on_speech_activity` events. Defaults to 0.2.
audio_idle_timeout: Timeout in seconds to force speech stop
when no audio frames are received while in SPEAKING state.
This handles cases like mic mute mid-speech.
Set to 0 to disable. Defaults to 1.0.
"""
super().__init__()
self._vad_analyzer = vad_analyzer
self._vad_state: VADState = VADState.QUIET
self._task_manager: Optional[BaseTaskManager] = None
# Last time a on_speech_activity was triggered.
self._speech_activity_time = 0
# How often a on_speech_activity event should be triggered (value should
# be greater than the audio chunks to have any effect).
self._speech_activity_period = speech_activity_period
# Audio idle detection: force speech stop when no audio arrives
# while in SPEAKING state (e.g. user mutes mic mid-speech).
self._last_audio_time: float = 0.0
self._audio_idle_timeout = audio_idle_timeout
self._audio_idle_task: Optional[asyncio.Task] = None
self._register_event_handler("on_speech_started", sync=True)
self._register_event_handler("on_speech_stopped", sync=True)
self._register_event_handler("on_speech_activity", sync=True)
self._register_event_handler("on_push_frame", sync=True)
self._register_event_handler("on_broadcast_frame", sync=True)
async def setup(self, task_manager: BaseTaskManager):
"""Initialize the controller with the given task manager.
Args:
task_manager: The task manager to be associated with this instance.
"""
self._task_manager = task_manager
self._last_audio_time = time.monotonic()
if self._audio_idle_timeout > 0 and not self._audio_idle_task:
self._audio_idle_task = self._task_manager.create_task(
self._audio_idle_handler(),
f"{self}::_audio_idle_handler",
)
async def process_frame(self, frame: Frame):
"""Process a frame and handle VAD-related events.
@@ -116,6 +153,10 @@ class VADController(BaseObject):
It waits for all currently executing event handler tasks to finish
before returning.
"""
await super().cleanup()
if self._audio_idle_task and self._task_manager:
await self._task_manager.cancel_task(self._audio_idle_task)
self._audio_idle_task = None
if self._vad_analyzer:
await self._vad_analyzer.cleanup()
@@ -128,6 +169,8 @@ class VADController(BaseObject):
Args:
frame: Audio frame to process.
"""
self._last_audio_time = time.monotonic()
self._vad_state = await self._handle_vad(frame.audio, self._vad_state)
if self._vad_state == VADState.SPEAKING:
@@ -149,6 +192,29 @@ class VADController(BaseObject):
vad_state = new_vad_state
return vad_state
async def _audio_idle_handler(self):
"""Monitor for an idle audio stream while in SPEAKING state.
When no audio frames arrive for `audio_idle_timeout` seconds
(e.g. user mutes mic mid-speech), forces a transition to QUIET and
emits `on_speech_stopped`.
"""
while True:
deadline = self._last_audio_time + self._audio_idle_timeout
remaining = deadline - time.monotonic()
if remaining > 0:
# Audio is still recent; sleep only for the remaining window.
await asyncio.sleep(remaining)
continue
if self._vad_state == VADState.SPEAKING:
logger.warning(f"{self}: no audio received while speaking, forcing speech stop")
self._vad_state = VADState.QUIET
await self._call_event_handler("on_speech_stopped")
# Wait for the next potential idle window.
await asyncio.sleep(self._audio_idle_timeout)
async def _maybe_speech_activity(self):
"""Handle user speaking frame."""
diff_time = time.time() - self._speech_activity_time

View File

@@ -29,7 +29,6 @@ from typing import (
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.audio.dtmf.types import KeypadEntry
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
from pipecat.audio.turn.base_turn_analyzer import BaseTurnParams
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.metrics.metrics import MetricsData
@@ -39,7 +38,7 @@ from pipecat.utils.time import nanoseconds_to_str
from pipecat.utils.utils import obj_count, obj_id
if TYPE_CHECKING:
from pipecat.processors.aggregators.llm_context import LLMContext, NotGiven
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage, NotGiven
from pipecat.processors.frame_processor import FrameProcessor
from pipecat.services.settings import ServiceSettings
from pipecat.utils.context.llm_context_summarization import LLMContextSummaryConfig
@@ -462,137 +461,6 @@ class LLMContextAssistantTimestampFrame(DataFrame):
timestamp: str
@dataclass
class TranscriptionMessage:
"""A message in a conversation transcript.
A message in a conversation transcript containing the role and content.
Messages are in standard format with roles normalized to user/assistant.
Parameters:
role: The role of the message sender (user or assistant).
content: The message content/text.
user_id: Optional identifier for the user.
timestamp: Optional timestamp when the message was created.
.. deprecated:: 0.0.99
`TranscriptionMessage` is deprecated and will be removed in a future version.
Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.
"""
role: Literal["user", "assistant"]
content: str
user_id: Optional[str] = None
timestamp: Optional[str] = None
def __post_init__(self):
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"TranscriptionMessage is deprecated and will be removed in a future version. "
"Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class ThoughtTranscriptionMessage:
"""An LLM thought message in a conversation transcript.
.. deprecated:: 0.0.99
`ThoughtTranscriptionMessage` is deprecated and will be removed in a future version.
Use `LLMAssistantAggregator`'s new events instead.
"""
role: Literal["assistant"] = field(default="assistant", init=False)
content: str
timestamp: Optional[str] = None
def __post_init__(self):
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"ThoughtTranscriptionMessage is deprecated and will be removed in a future version. "
"Use `LLMAssistantAggregator`'s new events instead.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class TranscriptionUpdateFrame(DataFrame):
"""Frame containing new messages added to conversation transcript.
A frame containing new messages added to the conversation transcript.
This frame is emitted when new messages are added to the conversation history,
containing only the newly added messages rather than the full transcript.
Messages have normalized roles (user/assistant) regardless of the LLM service used.
Messages are always in the OpenAI standard message format, which supports both:
Examples:
Simple format::
[
{
"role": "user",
"content": "Hi, how are you?"
},
{
"role": "assistant",
"content": "Great! And you?"
}
]
Content list format::
[
{
"role": "user",
"content": [{"type": "text", "text": "Hi, how are you?"}]
},
{
"role": "assistant",
"content": [{"type": "text", "text": "Great! And you?"}]
}
]
OpenAI supports both formats. Anthropic and Google messages are converted to the
content list format.
Parameters:
messages: List of new transcript messages that were added.
.. deprecated:: 0.0.99
`TranscriptionUpdateFrame` is deprecated and will be removed in a future version.
Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.
"""
messages: List[TranscriptionMessage | ThoughtTranscriptionMessage]
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"TranscriptionUpdateFrame is deprecated and will be removed in a future version. "
"Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.",
DeprecationWarning,
stacklevel=2,
)
def __str__(self):
pts = format_pts(self.pts)
return f"{self.name}(pts: {pts}, messages: {len(self.messages)})"
@dataclass
class LLMContextFrame(Frame):
"""Frame containing a universal LLM context.
@@ -719,6 +587,23 @@ class LLMMessagesUpdateFrame(DataFrame):
run_llm: Optional[bool] = None
@dataclass
class LLMMessagesTransformFrame(DataFrame):
"""Frame containing a transform function to modify the current context's LLM messages.
A frame containing a transform function that takes the context's current list
of LLM messages and returns a modified list.
Parameters:
transform: A function that takes a list of messages and returns a
modified list.
run_llm: Whether the context update should be sent to the LLM.
"""
transform: Callable[[List["LLMContextMessage"]], List["LLMContextMessage"]]
run_llm: Optional[bool] = None
@dataclass
class LLMSetToolsFrame(DataFrame):
"""Frame containing tools for LLM function calling.
@@ -778,10 +663,14 @@ class FunctionCallResultProperties:
Parameters:
run_llm: Whether to run the LLM after receiving this result.
on_context_updated: Callback to execute when context is updated.
is_final: Whether this is the final result for the function call. When
``False`` the result is treated as an intermediate update. Defaults to ``True``.
Only meaningful for async function calls (``cancel_on_interruption=False``).
"""
run_llm: Optional[bool] = None
on_context_updated: Optional[Callable[[], Awaitable[None]]] = None
is_final: bool = True
@dataclass
@@ -841,26 +730,66 @@ class OutputTransportMessageFrame(DataFrame):
@dataclass
class DTMFFrame:
"""Base class for DTMF (Dual-Tone Multi-Frequency) keypad frames.
"""Marker base class for DTMF (Dual-Tone Multi-Frequency) keypad frames.
Parameters:
button: The DTMF keypad entry that was pressed.
Used only as a shared tag so that both input and output DTMF frames can
be identified via ``isinstance(frame, DTMFFrame)``. The concrete frames
define their own fields.
"""
button: KeypadEntry
pass
@dataclass
class OutputDTMFFrame(DTMFFrame, DataFrame):
"""DTMF keypress output frame for transport queuing.
A DTMF keypress output that will be queued. If your transport supports
multiple dial-out destinations, use the `transport_destination` field to
specify where the DTMF keypress should be sent.
Parameters:
button: Convenience shortcut for sending a single DTMF keypad
entry. Equivalent to ``buttons=[button]``. If both ``buttons``
and ``button`` are provided, ``buttons`` takes precedence.
buttons: Sequence of one or more DTMF keypad buttons to send. Use
:meth:`from_string` to build this from a string like ``"123#"``.
"""
button: KeypadEntry | None = None
buttons: list[KeypadEntry] | None = None
def __post_init__(self):
super().__post_init__()
if self.buttons is None and self.button is not None:
self.buttons = [self.button]
if not self.buttons:
raise ValueError(f"{self.__class__.__name__} requires `buttons` or `button` to be set")
def __str__(self):
return f"{self.name}(tone: {self.button})"
return f"{self.name}(buttons: {self.to_string()})"
@classmethod
def from_string(cls, buttons: str, **kwargs) -> "OutputDTMFFrame":
"""Build an ``OutputDTMFFrame`` from a string of DTMF characters.
Args:
buttons: A string like ``"123#"``. Each character must be a
valid :class:`~pipecat.audio.dtmf.types.KeypadEntry` value.
**kwargs: Additional keyword arguments forwarded to the frame
constructor.
Returns:
A frame of type ``cls`` with ``buttons`` populated as a list of
:class:`~pipecat.audio.dtmf.types.KeypadEntry`.
"""
return cls(buttons=[KeypadEntry(c) for c in buttons], **kwargs)
def to_string(self) -> str:
"""Return the frame's ``buttons`` as a dial string.
Returns:
A string such as ``"123#"`` formed by concatenating the values
of each :class:`~pipecat.audio.dtmf.types.KeypadEntry` in
``buttons``, or an empty string if ``buttons`` is not set.
"""
return "".join(b.value for b in self.buttons) if self.buttons else ""
#
@@ -878,30 +807,18 @@ class StartFrame(SystemFrame):
Parameters:
audio_in_sample_rate: Input audio sample rate in Hz.
audio_out_sample_rate: Output audio sample rate in Hz.
allow_interruptions: Whether to allow user interruptions.
.. deprecated:: 0.0.99
Use `LLMUserAggregator`'s new `user_mute_strategies` parameter instead.
enable_metrics: Whether to enable performance metrics collection.
enable_tracing: Whether to enable OpenTelemetry tracing.
enable_usage_metrics: Whether to enable usage metrics collection.
interruption_strategies: List of interruption handling strategies.
.. deprecated:: 0.0.99
Use `LLMUserAggregator`'s new `user_turn_strategies` parameter instead.
report_only_initial_ttfb: Whether to report only initial time-to-first-byte.
tracing_context: Pipeline-scoped tracing context for span hierarchy.
"""
audio_in_sample_rate: int = 16000
audio_out_sample_rate: int = 24000
allow_interruptions: bool = False
enable_metrics: bool = False
enable_tracing: bool = False
enable_usage_metrics: bool = False
interruption_strategies: List[BaseInterruptionStrategy] = field(default_factory=list)
report_only_initial_ttfb: bool = False
tracing_context: Optional["TracingContext"] = None
@@ -1010,16 +927,9 @@ class UserStartedSpeakingFrame(SystemFrame):
Emitted when the user turn starts, which usually means that some
transcriptions are already available.
Parameters:
emulated: Whether this event was emulated rather than detected by VAD.
.. deprecated:: 0.0.99
This field is deprecated and will be removed in a future version.
"""
emulated: bool = False
pass
@dataclass
@@ -1028,16 +938,9 @@ class UserStoppedSpeakingFrame(SystemFrame):
Emitted when the user turn ends. This usually coincides with the start of
the bot turn.
Parameters:
emulated: Whether this event was emulated rather than detected by VAD.
.. deprecated:: 0.0.99
This field is deprecated and will be removed in a future version.
"""
emulated: bool = False
pass
@dataclass
@@ -1072,56 +975,6 @@ class UserSpeakingFrame(SystemFrame):
pass
@dataclass
class EmulateUserStartedSpeakingFrame(SystemFrame):
"""Frame to emulate user started speaking behavior.
Emitted by internal processors upstream to emulate VAD behavior when a
user starts speaking.
.. deprecated:: 0.0.99
This frame is deprecated and will be removed in a future version.
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"EmulateUserStartedSpeakingFrame is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class EmulateUserStoppedSpeakingFrame(SystemFrame):
"""Frame to emulate user stopped speaking behavior.
Emitted by internal processors upstream to emulate VAD behavior when a
user stops speaking.
.. deprecated:: 0.0.99
This frame is deprecated and will be removed in a future version.
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"EmulateUserStoppedSpeakingFrame is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class VADUserStartedSpeakingFrame(SystemFrame):
"""Frame emitted when VAD definitively detects user started speaking.
@@ -1300,7 +1153,6 @@ class UserImageRequestFrame(SystemFrame):
function_name: Name of function that generated this request (if any).
tool_call_id: Tool call ID if generated by function call (if any).
result_callback: Optional callback to invoke when the image is retrieved.
context: [DEPRECATED] Optional context for the image request.
"""
user_id: str
@@ -1310,21 +1162,6 @@ class UserImageRequestFrame(SystemFrame):
function_name: Optional[str] = None
tool_call_id: Optional[str] = None
result_callback: Optional[Any] = None
context: Optional[Any] = None
def __post_init__(self):
super().__post_init__()
if self.context:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"`UserImageRequestFrame` field `context` is deprecated.",
DeprecationWarning,
stacklevel=2,
)
def __str__(self):
return f"{self.name}(user: {self.user_id}, text: {self.text}, append_to_context: {self.append_to_context}, {self.video_source})"
@@ -1435,7 +1272,13 @@ class AssistantImageRawFrame(OutputImageRawFrame):
@dataclass
class InputDTMFFrame(DTMFFrame, SystemFrame):
"""DTMF keypress input frame from transport."""
"""DTMF keypress input frame from transport.
Parameters:
button: The DTMF keypad entry that was pressed.
"""
button: KeypadEntry
def __str__(self):
return f"{self.name}(tone: {self.button.value})"
@@ -1445,12 +1288,52 @@ class InputDTMFFrame(DTMFFrame, SystemFrame):
class OutputDTMFUrgentFrame(DTMFFrame, SystemFrame):
"""DTMF keypress output frame for immediate sending.
A DTMF keypress output that will be sent right away. If your transport
supports multiple dial-out destinations, use the `transport_destination`
field to specify where the DTMF keypress should be sent.
Parameters:
button: Convenience shortcut for sending a single DTMF keypad
entry. Equivalent to ``buttons=[button]``. If both ``buttons``
and ``button`` are provided, ``buttons`` takes precedence.
buttons: Sequence of one or more DTMF keypad buttons to send. Use
:meth:`from_string` to build this from a string like ``"123#"``.
"""
pass
button: KeypadEntry | None = None
buttons: list[KeypadEntry] | None = None
def __post_init__(self):
super().__post_init__()
if self.buttons is None and self.button is not None:
self.buttons = [self.button]
if not self.buttons:
raise ValueError(f"{self.__class__.__name__} requires `buttons` or `button` to be set")
def __str__(self):
return f"{self.name}(buttons: {self.to_string()})"
@classmethod
def from_string(cls, buttons: str, **kwargs) -> "OutputDTMFUrgentFrame":
"""Build an ``OutputDTMFUrgentFrame`` from a string of DTMF characters.
Args:
buttons: A string like ``"123#"``. Each character must be a
valid :class:`~pipecat.audio.dtmf.types.KeypadEntry` value.
**kwargs: Additional keyword arguments forwarded to the frame
constructor.
Returns:
A frame of type ``cls`` with ``buttons`` populated as a list of
:class:`~pipecat.audio.dtmf.types.KeypadEntry`.
"""
return cls(buttons=[KeypadEntry(c) for c in buttons], **kwargs)
def to_string(self) -> str:
"""Return the frame's ``buttons`` as a dial string.
Returns:
A string such as ``"123#"`` formed by concatenating the values
of each :class:`~pipecat.audio.dtmf.types.KeypadEntry` in
``buttons``, or an empty string if ``buttons`` is not set.
"""
return "".join(b.value for b in self.buttons) if self.buttons else ""
@dataclass
@@ -1850,12 +1733,19 @@ class FunctionCallInProgressFrame(ControlFrame, UninterruptibleFrame):
tool_call_id: Unique identifier for this function call.
arguments: Arguments passed to the function.
cancel_on_interruption: Whether to cancel this call if interrupted.
When ``False`` the call is treated as asynchronous: the LLM
continues the conversation immediately without waiting for the
result, and the result is injected later via a developer message.
group_id: Identifier shared by all function calls originating from the
same LLM response batch. Used to determine when the last call in a
group completes so the LLM can be triggered exactly once.
"""
function_name: str
tool_call_id: str
arguments: Any
cancel_on_interruption: bool = False
group_id: Optional[str] = None
@dataclass

View File

@@ -1,109 +0,0 @@
#
# Copyright (c) 2024-2026, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Observer for measuring user-to-bot response latency.
.. deprecated:: 0.0.102
This module is deprecated. Use :class:`UserBotLatencyObserver` directly
with its ``on_latency_measured`` event handler instead.
"""
import time
import warnings
from statistics import mean
from loguru import logger
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
CancelFrame,
EndFrame,
VADUserStartedSpeakingFrame,
VADUserStoppedSpeakingFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.frame_processor import FrameDirection
class UserBotLatencyLogObserver(BaseObserver):
"""Observer that measures time between user stopping speech and bot starting speech.
This helps measure how quickly the AI services respond by tracking
conversation turn timing and logging latency metrics.
.. deprecated:: 0.0.102
This class is deprecated. Use :class:`UserBotLatencyObserver` directly
with its ``on_latency_measured`` event handler for custom logging.
"""
def __init__(self):
"""Initialize the latency observer.
Sets up tracking for processed frames and user speech timing
to calculate response latencies.
.. deprecated:: 0.0.102
This class is deprecated. Use :class:`UserBotLatencyObserver`
directly with its ``on_latency_measured`` event handler.
"""
warnings.warn(
"UserBotLatencyLogObserver is deprecated and will be removed in a future version. "
"Use UserBotLatencyObserver directly with its on_latency_measured event handler instead.",
DeprecationWarning,
stacklevel=2,
)
super().__init__()
self._user_bot_latency_processed_frames = set()
self._user_stopped_time = 0
self._latencies = []
async def on_push_frame(self, data: FramePushed):
"""Process frames to track speech timing and calculate latency.
Args:
data: Frame push event containing the frame and direction information.
"""
# Only process downstream frames
if data.direction != FrameDirection.DOWNSTREAM:
return
# Skip already processed frames
if data.frame.id in self._user_bot_latency_processed_frames:
return
self._user_bot_latency_processed_frames.add(data.frame.id)
if isinstance(data.frame, VADUserStartedSpeakingFrame):
self._user_stopped_time = 0
elif isinstance(data.frame, VADUserStoppedSpeakingFrame):
self._user_stopped_time = data.frame.timestamp - data.frame.stop_secs
elif isinstance(data.frame, (EndFrame, CancelFrame)):
self._log_summary()
elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time:
latency = time.time() - self._user_stopped_time
self._user_stopped_time = 0
self._latencies.append(latency)
self._log_latency(latency)
def _log_summary(self):
if not self._latencies:
return
avg_latency = mean(self._latencies)
min_latency = min(self._latencies)
max_latency = max(self._latencies)
logger.info(
f"⏱️ LATENCY FROM USER STOPPED SPEAKING TO BOT STARTED SPEAKING - Avg: {avg_latency:.3f}s, Min: {min_latency:.3f}s, Max: {max_latency:.3f}s"
)
def _log_latency(self, latency: float):
"""Log the latency.
Args:
latency: The latency to log.
"""
logger.debug(
f"⏱️ LATENCY FROM USER STOPPED SPEAKING TO BOT STARTED SPEAKING: {latency:.3f}s"
)

View File

@@ -90,7 +90,7 @@ class PipelineRunner(BaseObject):
await self._sig_task
if self._force_gc:
self._gc_collect()
await self._gc_collect()
logger.debug(f"Runner {self} finished running {task}")
@@ -136,8 +136,8 @@ class PipelineRunner(BaseObject):
logger.warning(f"Interruption detected. Cancelling runner {self}")
await self.cancel()
def _gc_collect(self):
async def _gc_collect(self):
"""Force garbage collection and log results."""
collected = gc.collect()
collected = await asyncio.to_thread(gc.collect)
logger.debug(f"Garbage collector: collected {collected} objects.")
logger.debug(f"Garbage collector: uncollectable objects {gc.garbage}")

Some files were not shown because too many files have changed in this diff Show More