Compare commits
222 Commits
filipi/asy
...
ac/daily-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6bf63a7f2f | ||
|
|
8d4feede23 | ||
|
|
b11a3bc43f | ||
|
|
f094ce80fb | ||
|
|
9fbe1bf2a3 | ||
|
|
d8b0e78bc8 | ||
|
|
675b7df408 | ||
|
|
30f39d7395 | ||
|
|
fe2ef9c712 | ||
|
|
173cf39aee | ||
|
|
457f55e99a | ||
|
|
f8318289d4 | ||
|
|
958d90819f | ||
|
|
403235eb48 | ||
|
|
698c2ba92e | ||
|
|
f013d5632b | ||
|
|
570849955c | ||
|
|
84b885682f | ||
|
|
989fb4deaa | ||
|
|
ab74605a26 | ||
|
|
49998d252b | ||
|
|
84566c1110 | ||
|
|
45aa95fa10 | ||
|
|
d1f7af0330 | ||
|
|
31b5a64382 | ||
|
|
d20013d7a6 | ||
|
|
804e3ea9ec | ||
|
|
a14d257cf2 | ||
|
|
a8660aabfe | ||
|
|
7dc763d512 | ||
|
|
36b15c92ef | ||
|
|
64ed0aae13 | ||
|
|
be81dac723 | ||
|
|
d942a713af | ||
|
|
e248c4c049 | ||
|
|
1d5dcf1698 | ||
|
|
f45a410f56 | ||
|
|
e38647151d | ||
|
|
1a02b5d61a | ||
|
|
4254c1f0e0 | ||
|
|
f91a113de7 | ||
|
|
e553bb010f | ||
|
|
245339e885 | ||
|
|
812cdc6822 | ||
|
|
153814ecc2 | ||
|
|
b1204cc430 | ||
|
|
c542167065 | ||
|
|
02116c58de | ||
|
|
dcd21e7ff4 | ||
|
|
5356f3028b | ||
|
|
cb2c1868b0 | ||
|
|
dac88c0a47 | ||
|
|
8e5fe8afda | ||
|
|
d07eebff20 | ||
|
|
ef4dcca4f1 | ||
|
|
fc3307bc63 | ||
|
|
da9a55a430 | ||
|
|
094d36904c | ||
|
|
746fadc2b5 | ||
|
|
8cce25d2d2 | ||
|
|
891f00cb5f | ||
|
|
1ca094dad7 | ||
|
|
346c585290 | ||
|
|
c134110399 | ||
|
|
f9117e6d4a | ||
|
|
360e4480e0 | ||
|
|
9b7e15c9bc | ||
|
|
00ea86fda8 | ||
|
|
5f75728207 | ||
|
|
9d274f0fb3 | ||
|
|
43ddbdf1ec | ||
|
|
565349d332 | ||
|
|
2dd1170229 | ||
|
|
5cf90cba98 | ||
|
|
981b7bdcb7 | ||
|
|
c4320e7f07 | ||
|
|
ea0be4d39c | ||
|
|
dca4e1090a | ||
|
|
ec574edd53 | ||
|
|
772fb57090 | ||
|
|
76601944c6 | ||
|
|
178985ec8a | ||
|
|
edc197d050 | ||
|
|
7ece8e3c4a | ||
|
|
7b45a56119 | ||
|
|
a544f885a3 | ||
|
|
375deac912 | ||
|
|
699ca38dc1 | ||
|
|
aeda60f761 | ||
|
|
b010dd58d2 | ||
|
|
225ea907d5 | ||
|
|
1443dfb070 | ||
|
|
4bef85e363 | ||
|
|
0acfb4dd49 | ||
|
|
8594401024 | ||
|
|
aa7a014518 | ||
|
|
27a8a973b1 | ||
|
|
8abda808ca | ||
|
|
7f3f23dcb9 | ||
|
|
be509e5647 | ||
|
|
9f0b18b03d | ||
|
|
6eccd16543 | ||
|
|
d8dc6bc7d0 | ||
|
|
d12a8529e2 | ||
|
|
aa061f7e2c | ||
|
|
e863293198 | ||
|
|
9c7d5a9de2 | ||
|
|
a451c42dc7 | ||
|
|
bc009d8f98 | ||
|
|
67ee802772 | ||
|
|
ceaa27ee6e | ||
|
|
42335e2ef0 | ||
|
|
7585864113 | ||
|
|
18852adc28 | ||
|
|
f11b6d7151 | ||
|
|
9df1e18b43 | ||
|
|
b8f9a21e0c | ||
|
|
c18d997ad8 | ||
|
|
56aaebe1b0 | ||
|
|
916af84974 | ||
|
|
3e911b5fa0 | ||
|
|
7c08779a2f | ||
|
|
988c08a5b6 | ||
|
|
7351298849 | ||
|
|
392134be46 | ||
|
|
9266e1e7ad | ||
|
|
e9eff4626f | ||
|
|
21aa50283e | ||
|
|
70469e3c0c | ||
|
|
6111df947e | ||
|
|
4eebfd65d9 | ||
|
|
c2358b273b | ||
|
|
3a10a528c0 | ||
|
|
f078b8b867 | ||
|
|
5490820338 | ||
|
|
10697636c9 | ||
|
|
e1638a9342 | ||
|
|
bfffefa95c | ||
|
|
fbb49ffc8d | ||
|
|
eace782752 | ||
|
|
b94071d37f | ||
|
|
796a10fe9c | ||
|
|
1ab07d312f | ||
|
|
8adb38f87c | ||
|
|
33f145d70a | ||
|
|
41e46ee69e | ||
|
|
60933b7a56 | ||
|
|
64e09d592e | ||
|
|
883de8ab08 | ||
|
|
793ed8f9e3 | ||
|
|
d8ea33e1a4 | ||
|
|
1d7404ef21 | ||
|
|
dc909e2713 | ||
|
|
e22f9f84bb | ||
|
|
7af72eee3e | ||
|
|
57068f1b38 | ||
|
|
bbb605accc | ||
|
|
929a0e33f4 | ||
|
|
3724ecd378 | ||
|
|
4c8734c5e1 | ||
|
|
283f6df205 | ||
|
|
a29be38f48 | ||
|
|
976c644f90 | ||
|
|
34aa37f395 | ||
|
|
380867a87a | ||
|
|
cc3af59db4 | ||
|
|
f93d13efff | ||
|
|
c28b7e8f26 | ||
|
|
d1a2dee7a1 | ||
|
|
da1a1a59a4 | ||
|
|
134790b17c | ||
|
|
e5aa3bbc20 | ||
|
|
3be0ea05ef | ||
|
|
0c59819682 | ||
|
|
5b67dcd9e7 | ||
|
|
d503383c23 | ||
|
|
fa30268b84 | ||
|
|
2a118084bd | ||
|
|
87e8ed109a | ||
|
|
a5e1bbf4a3 | ||
|
|
f8267f1ea6 | ||
|
|
74acb0b7d0 | ||
|
|
41e3afbc2f | ||
|
|
d4824ffe8a | ||
|
|
2426f80789 | ||
|
|
5ce46df599 | ||
|
|
a6013ba437 | ||
|
|
279ca5a87b | ||
|
|
c6f79592d8 | ||
|
|
e74e497b8d | ||
|
|
d245b79bba | ||
|
|
8a794424dd | ||
|
|
f4743a6c91 | ||
|
|
ba32a48510 | ||
|
|
a9cafa2a3b | ||
|
|
58b1b7249e | ||
|
|
db8e73e5ca | ||
|
|
170f6dfe8b | ||
|
|
c763abc4ae | ||
|
|
197d96fc49 | ||
|
|
c8e9bf77fd | ||
|
|
48b25962e2 | ||
|
|
5d093c9ad7 | ||
|
|
d93f63deb5 | ||
|
|
09a57972f5 | ||
|
|
f83d062df9 | ||
|
|
a2a42b8703 | ||
|
|
e60a72e2d4 | ||
|
|
83f4989a78 | ||
|
|
5d2b288274 | ||
|
|
52ece87ac9 | ||
|
|
bc4bbb1895 | ||
|
|
eb014fffc4 | ||
|
|
e74930b954 | ||
|
|
6ed4109da9 | ||
|
|
53f809b7d5 | ||
|
|
f6a3678f93 | ||
|
|
3af93ed257 | ||
|
|
f37bf989dd | ||
|
|
86a16d53bc | ||
|
|
fa982a05c0 | ||
|
|
419c7d4450 |
@@ -1,30 +0,0 @@
|
||||
# flyctl launch added from .gitignore
|
||||
**/.vscode
|
||||
**/env
|
||||
**/__pycache__
|
||||
**/*~
|
||||
**/venv
|
||||
#*#
|
||||
|
||||
# Distribution / packaging
|
||||
**/.Python
|
||||
**/build
|
||||
**/develop-eggs
|
||||
**/dist
|
||||
**/downloads
|
||||
**/eggs
|
||||
**/.eggs
|
||||
**/lib
|
||||
**/lib64
|
||||
**/parts
|
||||
**/sdist
|
||||
**/var
|
||||
**/wheels
|
||||
**/share/python-wheels
|
||||
**/*.egg-info
|
||||
**/.installed.cfg
|
||||
**/*.egg
|
||||
**/MANIFEST
|
||||
**/.DS_Store
|
||||
**/.env
|
||||
fly.toml
|
||||
2
.github/workflows/python-compatibility.yaml
vendored
2
.github/workflows/python-compatibility.yaml
vendored
@@ -14,7 +14,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ['3.10.19', '3.11.14', '3.12.12', '3.13.12']
|
||||
python-version: ['3.11.15', '3.12.13', '3.13.12', '3.14.3']
|
||||
|
||||
name: Python ${{ matrix.python-version }}
|
||||
steps:
|
||||
|
||||
@@ -11,7 +11,7 @@ build:
|
||||
jobs:
|
||||
post_install:
|
||||
- pip install uv
|
||||
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
|
||||
- UV_PROJECT_ENVIRONMENT=$READTHEDOCS_VIRTUALENV_PATH uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra mlx-whisper
|
||||
|
||||
sphinx:
|
||||
configuration: docs/api/conf.py
|
||||
|
||||
678
CHANGELOG.md
678
CHANGELOG.md
@@ -7,6 +7,684 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
<!-- towncrier release notes start -->
|
||||
|
||||
## [1.0.0] - 2026-04-14
|
||||
|
||||
Migration guide: https://docs.pipecat.ai/pipecat/migration/migration-1.0
|
||||
|
||||
### Added
|
||||
|
||||
- Updated LemonSlice transport:
|
||||
- Added `on_avatar_connected` and `on_avatar_disconnected` events triggered
|
||||
when the avatar joins and leaves the room.
|
||||
- Added `api_url` parameter to `LemonSliceNewSessionRequest` to allow
|
||||
overriding the LemonSlice API endpoint.
|
||||
- Added support for passing arbitrary named parameters to the LemonSlice
|
||||
API endpoint.
|
||||
(PR [#3995](https://github.com/pipecat-ai/pipecat/pull/3995))
|
||||
|
||||
- Added Inworld Realtime LLM service with WebSocket-based cascade STT/LLM/TTS,
|
||||
semantic VAD, function calling, and Router support.
|
||||
(PR [#4140](https://github.com/pipecat-ai/pipecat/pull/4140))
|
||||
|
||||
- ⚠️ Added WebSocket-based `OpenAIResponsesLLMService` as the new default for
|
||||
the OpenAI Responses API. It maintains a persistent connection to
|
||||
`wss://api.openai.com/v1/responses` and automatically uses
|
||||
`previous_response_id` to send only incremental context, falling back to full
|
||||
context on reconnection or cache miss. The previous HTTP-based implementation
|
||||
is now available as `OpenAIResponsesHttpLLMService`.
|
||||
(PR [#4141](https://github.com/pipecat-ai/pipecat/pull/4141))
|
||||
|
||||
- Added `group_parallel_tools` parameter to `LLMService` (default `True`). When
|
||||
`True`, all function calls from the same LLM response batch share a group ID
|
||||
and the LLM is triggered exactly once after the last call completes. Set to
|
||||
`False` to trigger inference independently for each function call result as
|
||||
it arrives.
|
||||
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
|
||||
|
||||
- Added async function call support to `register_function()` and
|
||||
`register_direct_function()` via `cancel_on_interruption=False`. When set to
|
||||
`False`, the LLM continues the conversation immediately without waiting for
|
||||
the function result. The result is injected back into the context as a
|
||||
`developer` message once available, triggering a new LLM inference at that
|
||||
point.
|
||||
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
|
||||
|
||||
- Added `enable_prompt_caching` setting to `AWSBedrockLLMService` for Bedrock
|
||||
ConverseStream prompt caching.
|
||||
(PR [#4219](https://github.com/pipecat-ai/pipecat/pull/4219))
|
||||
|
||||
- Added support for streaming intermediate results from async function calls.
|
||||
Call `result_callback` multiple times with
|
||||
`properties=FunctionCallResultProperties(is_final=False)` to push incremental
|
||||
updates, then call it once more (with `is_final=True`, the default) to
|
||||
deliver the final result. Only valid for functions registered with
|
||||
`cancel_on_interruption=False`.
|
||||
(PR [#4230](https://github.com/pipecat-ai/pipecat/pull/4230))
|
||||
|
||||
- Added `LLMMessagesTransformFrame` to facilitate programmatically editing
|
||||
context in a frame-based way.
|
||||
|
||||
The previous approach required the caller to directly grab a reference to
|
||||
the context object, grab a "snapshot" of its messages _at that point in
|
||||
time_, transform the messages, and then push an `LLMMessagesUpdateFrame` with
|
||||
the transformed messages. This approach can lead to problems: what if there
|
||||
had already been a change to the context queued in the pipeline? The
|
||||
transformed messages would simply overwrite it without consideration.
|
||||
(PR [#4231](https://github.com/pipecat-ai/pipecat/pull/4231))
|
||||
|
||||
- The development runner now exports a module-level `app` FastAPI instance
|
||||
(`from pipecat.runner.run import app`) so you can register custom routes
|
||||
before calling `main()`.
|
||||
(PR [#4234](https://github.com/pipecat-ai/pipecat/pull/4234))
|
||||
|
||||
- `ToolsSchema` now accepts `custom_tools` for OpenAI LLM services
|
||||
(`OpenAILLMService`, `OpenAIResponsesLLMService`,
|
||||
`OpenAIResponsesHttpLLMService`, and `OpenAIRealtimeLLMService`), letting you
|
||||
pass provider-specific tools like `tool_search` alongside standard function
|
||||
tools.
|
||||
(PR [#4248](https://github.com/pipecat-ai/pipecat/pull/4248))
|
||||
|
||||
- Added enhancements to `NvidiaTTSService`:
|
||||
|
||||
- Cross-sentence stitching: multiple sentences within an LLM turn are fed
|
||||
into a single `SynthesizeOnline` gRPC stream for seamless audio across
|
||||
sentence boundaries (requires Magpie TTS model v1.7.0+).
|
||||
- `custom_dictionary` and `encoding` parameters for IPA-based custom
|
||||
pronunciation and output audio encoding.
|
||||
- Metrics generation (`can_generate_metrics` returns true) and
|
||||
`stop_all_metrics()` when an audio context is interrupted.
|
||||
- gRPC error handling around synthesis config retrieval
|
||||
(`GetRivaSynthesisConfig`).
|
||||
(PR [#4249](https://github.com/pipecat-ai/pipecat/pull/4249))
|
||||
|
||||
- Added `MistralTTSService` for streaming text-to-speech using Mistral's
|
||||
Voxtral TTS API (`voxtral-mini-tts-2603`). Supports SSE-based audio streaming
|
||||
with automatic resampling from the API's native 24kHz to any requested sample
|
||||
rate. Requires the `mistral` optional extra (`pip install
|
||||
pipecat-ai[mistral]`).
|
||||
(PR [#4251](https://github.com/pipecat-ai/pipecat/pull/4251))
|
||||
|
||||
- Added `truncate_large_values` parameter to `LLMContext.get_messages()`. When
|
||||
`True`, returns compact deep copies of messages with binary data (base64
|
||||
images, audio) replaced by short placeholders and long string values in
|
||||
LLM-specific messages recursively truncated. Useful for serialization,
|
||||
logging, and debugging tools.
|
||||
(PR [#4272](https://github.com/pipecat-ai/pipecat/pull/4272))
|
||||
|
||||
- `CartesiaSTTService` now supports runtime settings updates (e.g. changing
|
||||
`language` or `model` via `STTUpdateSettingsFrame`). The service
|
||||
automatically reconnects with the new parameters. Previously, settings
|
||||
updates were silently ignored.
|
||||
(PR [#4282](https://github.com/pipecat-ai/pipecat/pull/4282))
|
||||
|
||||
- Added `pcm_32000` and `pcm_48000` sample rate support to ElevenLabs TTS
|
||||
services.
|
||||
(PR [#4293](https://github.com/pipecat-ai/pipecat/pull/4293))
|
||||
|
||||
- Added `enable_logging` parameter to `ElevenLabsHttpTTSService`. Set to
|
||||
`False` to enable zero retention mode (enterprise only).
|
||||
(PR [#4293](https://github.com/pipecat-ai/pipecat/pull/4293))
|
||||
|
||||
### Changed
|
||||
|
||||
- Updated `onnxruntime` from 1.23.2 to 1.24.3, adding support for Python 3.14.
|
||||
(PR [#3984](https://github.com/pipecat-ai/pipecat/pull/3984))
|
||||
|
||||
- MCPClient now requires async with MCPClient(...) as mcp: or explicit
|
||||
start()/close() calls to manage the connection lifecycle.
|
||||
(PR [#4034](https://github.com/pipecat-ai/pipecat/pull/4034))
|
||||
|
||||
- ⚠️ Updated `langchain` extra to require langchain 1.x (from 0.3.x),
|
||||
langchain-community 0.4.x (from 0.3.x), and langchain-openai 1.x (from
|
||||
0.3.x). If you pin these packages in your project, update your pins
|
||||
accordingly.
|
||||
(PR [#4192](https://github.com/pipecat-ai/pipecat/pull/4192))
|
||||
|
||||
- `WebsocketService` reconnection errors are now non-fatal. When a websocket
|
||||
service exhausts its reconnection attempts (either via exponential backoff or
|
||||
quick failure detection), it emits a non-fatal `ErrorFrame` instead of a
|
||||
fatal one. This allows application-level failover (e.g. `ServiceSwitcher`) to
|
||||
handle the failure instead of killing the entire pipeline.
|
||||
(PR [#4201](https://github.com/pipecat-ai/pipecat/pull/4201))
|
||||
|
||||
- Changed `GrokLLMService` default model from `grok-3-beta` to `grok-3`, now
|
||||
that the model is generally available.
|
||||
(PR [#4209](https://github.com/pipecat-ai/pipecat/pull/4209))
|
||||
|
||||
- `GoogleImageGenService` now defaults to `imagen-4.0-generate-001` (previously
|
||||
`imagen-3.0-generate-002`).
|
||||
(PR [#4213](https://github.com/pipecat-ai/pipecat/pull/4213))
|
||||
|
||||
- ⚠️ `BaseOpenAILLMService.get_chat_completions()` now accepts an `LLMContext`
|
||||
instead of `OpenAILLMInvocationParams`. If you override this method, update
|
||||
your signature accordingly.
|
||||
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
|
||||
|
||||
- When multiple function calls are returned in a single LLM response, by
|
||||
default (when `group_parallel_tools=True`) the LLM is now triggered exactly
|
||||
once after the last call in the batch completes, rather than waiting for all
|
||||
function calls.
|
||||
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
|
||||
|
||||
- ⚠️ `LLMService.function_call_timeout_secs` now defaults to `None` instead of
|
||||
`10.0`. Deferred function calls will run indefinitely unless a timeout is
|
||||
explicitly set at the service level or per-call. If you relied on the
|
||||
previous 10-second default, pass `function_call_timeout_secs=10.0`
|
||||
explicitly.
|
||||
(PR [#4224](https://github.com/pipecat-ai/pipecat/pull/4224))
|
||||
|
||||
- Updated `NvidiaTTSService`:
|
||||
|
||||
- Made `api_key` optional for local NIM deployments.
|
||||
- Voice, language, and quality can be updated without reconnecting the gRPC
|
||||
client; new values take effect on the next synthesis turn, not for the
|
||||
current turn's in-flight requests.
|
||||
- Replaced per-sentence synchronous `synthesize_online` calls with async
|
||||
queue-backed gRPC streaming.
|
||||
- Streaming now uses asyncio tasks with explicit gRPC cancellation on
|
||||
interruption and stale-response filtering when a stream is aborted or
|
||||
replaced.
|
||||
- Renamed Riva references to Nemotron Speech in docs and messages.
|
||||
- Disabled automatic TTS start frames at the service level
|
||||
(`push_start_frame=False`) and emit `TTSStartedFrame` when a stitched
|
||||
synthesis stream is started for a context.
|
||||
(PR [#4249](https://github.com/pipecat-ai/pipecat/pull/4249))
|
||||
|
||||
### Removed
|
||||
|
||||
- ⚠️ Removed `OpenPipeLLMService` and the `openpipe` extra. OpenPipe was
|
||||
acquired by CoreWeave and the package is no longer maintained. If you were
|
||||
using `openpipe` as an LLM provider, switch to the underlying provider
|
||||
directly (e.g. `openai`). The OpenPipe interface can still be used with
|
||||
`OpenAILLMService` by specifying a `base_url`.
|
||||
(PR [#4191](https://github.com/pipecat-ai/pipecat/pull/4191))
|
||||
|
||||
- ⚠️ Removed `NoisereduceFilter`. Use system-level noise reduction or a
|
||||
service-based alternative instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated `vad_enabled` and `vad_audio_passthrough` transport
|
||||
params.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated `camera_in_enabled`, `camera_in_is_live`,
|
||||
`camera_in_width`, `camera_in_height`, `camera_out_enabled`,
|
||||
`camera_out_is_live`, `camera_out_width`, `camera_out_height`, and
|
||||
`camera_out_color` transport params. Use the `video_in_*` and `video_out_*`
|
||||
equivalents instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `FrameProcessor.wait_for_task()`. Use `create_task()` and manage
|
||||
tasks with the built-in `TaskManager` instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated transport frames: `TransportMessageFrame`,
|
||||
`TransportMessageUrgentFrame`, `InputTransportMessageUrgentFrame`,
|
||||
`DailyTransportMessageFrame`, and `DailyTransportMessageUrgentFrame`. Use
|
||||
`OutputTransportMessageFrame`, `OutputTransportMessageUrgentFrame`,
|
||||
`InputTransportMessageFrame`, `DailyOutputTransportMessageFrame`, and
|
||||
`DailyOutputTransportMessageUrgentFrame` instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `create_default_resampler()` from `pipecat.audio.utils`.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `DailyRunner.configure_with_args()`. Use `PipelineRunner` with
|
||||
`RunnerArguments` instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated `on_pipeline_ended`, `on_pipeline_cancelled`, and
|
||||
`on_pipeline_stopped` events from `PipelineTask`. Use `on_pipeline_finished`
|
||||
instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed single-argument function call support from `LLMService`. Functions
|
||||
must use named parameters instead of a single `arguments` parameter.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `FalSmartTurnAnalyzer` and `LocalSmartTurnAnalyzer`.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `RTVIObserver.errors_enabled` parameter.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated RTVI models, frames, and processor methods including
|
||||
`RTVIConfig`, `RTVIServiceConfig`, `RTVIServiceOptionConfig`, various
|
||||
`RTVI*Data` models, `RTVIActionFrame`, and
|
||||
`RTVIProcessor.handle_function_call`/`handle_function_call_start`. Use the
|
||||
updated RTVI processor API instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated `KeypadEntryFrame` alias.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated interruption frames: `StartInterruptionFrame` and
|
||||
`BotInterruptionFrame`. Use `InterruptionFrame` and `InterruptionTaskFrame`
|
||||
instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `LLMService.request_image_frame()`. Push a `UserImageRequestFrame`
|
||||
instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `TTSService.say()`. Push a `TTSSpeakFrame` into the pipeline
|
||||
instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `KrispFilter`. The `krisp` extra has been removed from
|
||||
`pyproject.toml`.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `AudioBufferProcessor.user_continuous_stream` parameter. Use
|
||||
`user_audio_passthrough` instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed `LLMService.start_callback` parameter. Register an
|
||||
`on_llm_response_start` event handler instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated `observers` field from `PipelineParams`. Pass observers
|
||||
directly to `PipelineTask` constructor instead.
|
||||
(PR [#4204](https://github.com/pipecat-ai/pipecat/pull/4204))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.openai_realtime` package. Use
|
||||
`pipecat.services.openai.realtime` instead.
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.google.llm_vertex` module. Use
|
||||
`pipecat.services.google.vertex.llm` instead.
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `GoogleLLMOpenAIBetaService` from
|
||||
`pipecat.services.google.openai`. Use `GoogleLLMService` from
|
||||
`pipecat.services.google.llm` instead.
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `OpenAIRealtimeBetaLLMService` and
|
||||
`AzureRealtimeBetaLLMService`. Use `OpenAIRealtimeLLMService` and
|
||||
`AzureRealtimeLLMService` from `pipecat.services.openai.realtime` and
|
||||
`pipecat.services.azure.realtime` instead.
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.ai_services` module. Import from
|
||||
`pipecat.services.ai_service`, `pipecat.services.llm_service`,
|
||||
`pipecat.services.stt_service`, `pipecat.services.tts_service`, etc. instead.
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.gemini_multimodal_live` package. Use
|
||||
`pipecat.services.google.gemini_live` instead. Note that class names no
|
||||
longer include "Multimodal" (e.g. `GeminiMultimodalLiveLLMService` →
|
||||
`GeminiLiveLLMService`).
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.google.gemini_live.llm_vertex`
|
||||
module. Use `pipecat.services.google.gemini_live.vertex.llm` instead.
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.nim` package. Use
|
||||
`pipecat.services.nvidia.llm` instead (`NimLLMService` → `NvidiaLLMService`).
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.deepgram.stt_sagemaker` and
|
||||
`pipecat.services.deepgram.tts_sagemaker` modules. Use
|
||||
`pipecat.services.deepgram.sagemaker.stt` and
|
||||
`pipecat.services.deepgram.sagemaker.tts` instead.
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.aws_nova_sonic` package. Use
|
||||
`pipecat.services.aws.nova_sonic` instead.
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.services.riva` package. Use
|
||||
`pipecat.services.nvidia.stt` and `pipecat.services.nvidia.tts` instead
|
||||
(`RivaSTTService` → `NvidiaSTTService`, `RivaTTSService` →
|
||||
`NvidiaTTSService`).
|
||||
(PR [#4208](https://github.com/pipecat-ai/pipecat/pull/4208))
|
||||
|
||||
- ⚠️ Removed deprecated compatibility modules:
|
||||
`pipecat.services.openai_realtime_beta` (use
|
||||
`pipecat.services.openai.realtime`),
|
||||
`pipecat.services.openai_realtime.context`,
|
||||
`pipecat.services.openai_realtime.frames`,
|
||||
`pipecat.services.openai.realtime.context`,
|
||||
`pipecat.services.openai.realtime.frames`,
|
||||
`pipecat.services.gemini_multimodal_live` (use
|
||||
`pipecat.services.google.gemini_live`),
|
||||
`pipecat.services.aws_nova_sonic.context` (use
|
||||
`pipecat.services.aws.nova_sonic`), `pipecat.services.google.openai` and
|
||||
`pipecat.services.google.llm_openai` (use `pipecat.services.google.llm`).
|
||||
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
|
||||
|
||||
- ⚠️ Removed `VisionImageFrameAggregator` (from
|
||||
`pipecat.processors.aggregators.vision_image_frame`). Vision/image handling
|
||||
is now built into `LLMContext` (from
|
||||
`pipecat.processors.aggregators.llm_context`). See the `12*` examples for the
|
||||
recommended replacement pattern.
|
||||
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
|
||||
|
||||
- ⚠️ Removed `OpenAILLMContext`, `OpenAILLMContextFrame`, and
|
||||
`OpenAILLMContext.from_messages()`. Use `LLMContext` (from
|
||||
`pipecat.processors.aggregators.llm_context`) and `LLMContextFrame` (from
|
||||
`pipecat.frames.frames`) instead. All services now exclusively use the
|
||||
universal `LLMContext`.
|
||||
|
||||
From the developer's point of view, migrating will usually be a matter of
|
||||
going from this:
|
||||
|
||||
```python
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
```
|
||||
|
||||
To this:
|
||||
|
||||
```python
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
```
|
||||
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
|
||||
|
||||
- ⚠️ Removed deprecated frame types `LLMMessagesFrame` and
|
||||
`OpenAILLMContextAssistantTimestampFrame` from `pipecat.frames.frames`.
|
||||
Instead of `LLMMessagesFrame`, use `LLMContextFrame` with the new messages,
|
||||
or `LLMMessagesUpdateFrame` with `run_llm=True`.
|
||||
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
|
||||
|
||||
- ⚠️ Removed `GatedOpenAILLMContextAggregator` (from
|
||||
`pipecat.processors.aggregators.gated_open_ai_llm_context`). Use
|
||||
`GatedLLMContextAggregator` (from
|
||||
`pipecat.processors.aggregators.gated_llm_context`) instead.
|
||||
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
|
||||
|
||||
- ⚠️ Removed deprecated service-specific context and aggregator machinery,
|
||||
which was superseded by the universal `LLMContext` system.
|
||||
|
||||
Service-specific classes removed: `AnthropicLLMContext`,
|
||||
`AnthropicContextAggregatorPair`, `AWSBedrockLLMContext`,
|
||||
`AWSBedrockContextAggregatorPair`, `OpenAIContextAggregatorPair`, and their
|
||||
user/assistant aggregators. Also removed `create_context_aggregator()` from
|
||||
`LLMService`, `OpenAILLMService`, `AnthropicLLMService`, and
|
||||
`AWSBedrockLLMService`.
|
||||
|
||||
Base aggregator classes removed (from
|
||||
`pipecat.processors.aggregators.llm_response`): `BaseLLMResponseAggregator`,
|
||||
`LLMContextResponseAggregator`, `LLMUserContextAggregator`,
|
||||
`LLMAssistantContextAggregator`, `LLMUserResponseAggregator`,
|
||||
`LLMAssistantResponseAggregator`.
|
||||
|
||||
From the developer's point of view, migrating will usually be a matter of
|
||||
going from this:
|
||||
|
||||
```python
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
```
|
||||
|
||||
To this:
|
||||
|
||||
```python
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
```
|
||||
(PR [#4215](https://github.com/pipecat-ai/pipecat/pull/4215))
|
||||
|
||||
- ⚠️ Removed deprecated service parameters and shims that have been replaced by
|
||||
the `settings=Service.Settings(...)` pattern or direct `__init__` parameters:
|
||||
- `PollyTTSService` alias (use `AWSTTSService`)
|
||||
- `TTSService`: `text_aggregator`, `text_filter` init params
|
||||
- `AWSNovaSonicLLMService`: `send_transcription_frames` init param
|
||||
- `DeepgramSTTService`: `url` init param (use `base_url`)
|
||||
- `FishAudioTTSService`: `model` init param (use `reference_id` or
|
||||
`settings`)
|
||||
- `GladiaSTTService`: `language` and `confidence` from `GladiaInputParams`,
|
||||
`InputParams` class alias
|
||||
- `GeminiTTSService`: `api_key` init param
|
||||
- `GeminiLiveLLMService`: `base_url` init param (use `http_options`)
|
||||
- `GoogleVertexLLMService`: `InputParams` class with
|
||||
`location`/`project_id` fields (use direct init params); `project_id` is now
|
||||
required, `location` defaults to `"us-east4"`
|
||||
- `MiniMaxHttpTTSService`: `english_normalization` from `InputParams` (use
|
||||
`text_normalization`)
|
||||
- `SimliVideoService`: `simli_config` init param (use `api_key`/`face_id`),
|
||||
`use_turn_server` init param; `api_key` and `face_id` are now required
|
||||
- `AnthropicLLMService`: `enable_prompt_caching_beta` from `InputParams`
|
||||
(use `enable_prompt_caching`)
|
||||
(PR [#4220](https://github.com/pipecat-ai/pipecat/pull/4220))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.transports.services` and
|
||||
`pipecat.transports.network` module aliases. Update imports to use
|
||||
`pipecat.transports.daily.transport`, `pipecat.transports.livekit.transport`,
|
||||
`pipecat.transports.websocket.*`, `pipecat.transports.webrtc.*`, and
|
||||
`pipecat.transports.daily.utils` respectively.
|
||||
(PR [#4225](https://github.com/pipecat-ai/pipecat/pull/4225))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.sync` package. Use `pipecat.utils.sync`
|
||||
instead.
|
||||
(PR [#4225](https://github.com/pipecat-ai/pipecat/pull/4225))
|
||||
|
||||
- ⚠️ Removed deprecated `TranscriptionMessage`, `ThoughtTranscriptionMessage`,
|
||||
and `TranscriptionUpdateFrame` from `pipecat.frames.frames`.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `allow_interruptions` parameter from `PipelineParams`,
|
||||
`StartFrame`, and `FrameProcessor`. Interruptions are now always allowed by
|
||||
default. Use `LLMUserAggregator`'s `user_turn_strategies` /
|
||||
`user_mute_strategies` parameters to control interruption behavior.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `STTMuteFilter`, `STTMuteConfig`, and `STTMuteStrategy`
|
||||
from `pipecat.processors.filters.stt_mute_filter`. Use
|
||||
`pipecat.turns.user_mute` strategies with `LLMUserAggregator`'s
|
||||
`user_mute_strategies` parameter instead.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.processors.transcript_processor` module
|
||||
(`TranscriptProcessor`, `TranscriptProcessorConfig`). Use pipeline observers
|
||||
instead.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `EmulateUserStartedSpeakingFrame` and
|
||||
`EmulateUserStoppedSpeakingFrame` frames, and the `emulated` field from
|
||||
`UserStartedSpeakingFrame` / `UserStoppedSpeakingFrame`.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `interruption_strategies` parameter from
|
||||
`PipelineParams`, `StartFrame`, and `FrameProcessor`. Use
|
||||
`LLMUserAggregator`'s `user_turn_strategies` parameter instead.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.audio.interruptions` module
|
||||
(`BaseInterruptionStrategy`, `MinWordsInterruptionStrategy`). Use
|
||||
`pipecat.turns.user_start.MinWordsUserTurnStartStrategy` with
|
||||
`LLMUserAggregator`'s `user_turn_strategies` parameter instead.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `pipecat.utils.tracing.class_decorators` module. Use
|
||||
`pipecat.utils.tracing.service_decorators` instead.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `add_pattern_pair` method from `PatternPairAggregator`.
|
||||
Use `add_pattern` instead.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed deprecated `UserResponseAggregator` class from
|
||||
`pipecat.processors.aggregators.user_response`. Use `LLMUserAggregator`
|
||||
instead.
|
||||
(PR [#4228](https://github.com/pipecat-ai/pipecat/pull/4228))
|
||||
|
||||
- ⚠️ Removed `ExternalUserTurnStrategies` and the automatic fallback to it in
|
||||
`LLMUserAggregator` when a `SpeechControlParamsFrame` was received from the
|
||||
transport.
|
||||
(PR [#4229](https://github.com/pipecat-ai/pipecat/pull/4229))
|
||||
|
||||
- ⚠️ Removed `vad_analyzer` and `turn_analyzer` parameters from
|
||||
`TransportParams` and all transport input classes, along with all deprecated
|
||||
VAD/turn analysis logic in `BaseInputTransport`. VAD and turn detection are
|
||||
now handled entirely by `LLMUserAggregator`.
|
||||
(PR [#4229](https://github.com/pipecat-ai/pipecat/pull/4229))
|
||||
|
||||
- ⚠️ Removed deprecated `TranscriptionUserTurnStopStrategy` alias (deprecated
|
||||
in 0.0.102). Use `SpeechTimeoutUserTurnStopStrategy` instead.
|
||||
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
|
||||
|
||||
- ⚠️ Removed deprecated `vad_events` setting and `should_interrupt` parameter
|
||||
from `DeepgramSTTService` (deprecated in 0.0.99). Use Silero VAD for voice
|
||||
activity detection instead.
|
||||
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
|
||||
|
||||
- ⚠️ Removed deprecated `send_transcription_frames` parameter from
|
||||
`OpenAIRealtimeLLMService` (deprecated in 0.0.92). Transcription frames are
|
||||
always sent.
|
||||
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
|
||||
|
||||
- ⚠️ Removed deprecated `UserIdleProcessor` (deprecated in 0.0.100). Use
|
||||
`LLMUserAggregator` with the `user_idle_timeout` parameter instead.
|
||||
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
|
||||
|
||||
- ⚠️ Removed deprecated `UserBotLatencyLogObserver` (deprecated in 0.0.102).
|
||||
Use `UserBotLatencyObserver` with its `on_latency_measured` event handler
|
||||
instead.
|
||||
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
|
||||
|
||||
- ⚠️ Removed the `riva` install extra. Use `nvidia` instead (`pip install
|
||||
"pipecat-ai[nvidia]"`).
|
||||
(PR [#4235](https://github.com/pipecat-ai/pipecat/pull/4235))
|
||||
|
||||
- Removed the empty `remote-smart-turn` install extra (was already a no-op).
|
||||
(PR [#4235](https://github.com/pipecat-ai/pipecat/pull/4235))
|
||||
|
||||
- ⚠️ Removed `DeprecatedModuleProxy` and all service `__init__.py` re-export
|
||||
shims. Flat imports like `from pipecat.services.openai import
|
||||
OpenAILLMService` no longer work. Use the full submodule path instead: `from
|
||||
pipecat.services.openai.llm import OpenAILLMService`. This is already the
|
||||
established pattern across all examples and internal code.
|
||||
(PR [#4239](https://github.com/pipecat-ai/pipecat/pull/4239))
|
||||
|
||||
- ⚠️ Removed deprecated `PIPECAT_OBSERVER_FILES` environment variable support.
|
||||
Use `PIPECAT_SETUP_FILES` instead.
|
||||
(PR [#4267](https://github.com/pipecat-ai/pipecat/pull/4267))
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fixed `IdleFrameProcessor` where `asyncio.Event` was unconditionally cleared
|
||||
in a `finally` block instead of only on the success path.
|
||||
(PR [#3796](https://github.com/pipecat-ai/pipecat/pull/3796))
|
||||
|
||||
- Fixed MCPClient opening a new connection for every tool call instead of
|
||||
reusing the session.
|
||||
(PR [#4034](https://github.com/pipecat-ai/pipecat/pull/4034))
|
||||
|
||||
- GoogleLLMService now applies a low-latency thinking default
|
||||
(`thinking_level="minimal"`) for Gemini 3+ Flash models.
|
||||
(PR [#4067](https://github.com/pipecat-ai/pipecat/pull/4067))
|
||||
|
||||
- Fixed `WebsocketService` entering an infinite reconnection loop when a server
|
||||
accepts the WebSocket handshake but immediately closes the connection (e.g.
|
||||
invalid API key, close code 1008). The service now detects connections that
|
||||
fail repeatedly within seconds of being established and stops retrying after
|
||||
3 consecutive quick failures.
|
||||
(PR [#4201](https://github.com/pipecat-ai/pipecat/pull/4201))
|
||||
|
||||
- Fixed `InworldHttpTTSService` streaming responses crashing with
|
||||
`UnicodeDecodeError` when multi-byte UTF-8 characters were split across chunk
|
||||
boundaries. This caused TTS audio to cut off mid-sentence intermittently.
|
||||
(PR [#4202](https://github.com/pipecat-ai/pipecat/pull/4202))
|
||||
|
||||
- Fixed a crash (`JSONDecodeError`) when a user interruption occurs while the
|
||||
LLM is streaming function call arguments. Previously, the incomplete JSON
|
||||
arguments were passed directly to `json.loads()`, causing an unhandled
|
||||
exception. Affected services: OpenAI, Google (OpenAI-compatible), and
|
||||
SambaNova.
|
||||
(PR [#4203](https://github.com/pipecat-ai/pipecat/pull/4203))
|
||||
|
||||
- Fixed `BaseOutputTransport` discarding pending `UninterruptibleFrame` items
|
||||
(e.g. function-call context updates) when an interruption arrived. The audio
|
||||
task is now kept alive and only interruptible frames are drained when
|
||||
uninterruptible frames are present in the queue.
|
||||
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
|
||||
|
||||
- Fixed spurious LLM inference being triggered when a function call result
|
||||
arrived while the user was actively speaking. The context frame is now
|
||||
suppressed until the user stops speaking.
|
||||
(PR [#4217](https://github.com/pipecat-ai/pipecat/pull/4217))
|
||||
|
||||
- Fixed `CartesiaTTSService` failing with "Context has closed" errors when
|
||||
switching voice, model, or language via `TTSUpdateSettingsFrame`. The service
|
||||
now automatically flushes the current audio context and opens a fresh one
|
||||
when these settings change.
|
||||
(PR [#4220](https://github.com/pipecat-ai/pipecat/pull/4220))
|
||||
|
||||
- Fixed duplicate LLM replies that could occur when multiple async function
|
||||
call results arrived while an LLM request was already queued.
|
||||
(PR [#4230](https://github.com/pipecat-ai/pipecat/pull/4230))
|
||||
|
||||
- Fixed undefined `_warn_deprecated_param` calls in `OpenAIRealtimeLLMService`
|
||||
and `GrokRealtimeLLMService` for the deprecated `session_properties` init
|
||||
parameter.
|
||||
(PR [#4232](https://github.com/pipecat-ai/pipecat/pull/4232))
|
||||
|
||||
- Fixed Gemini Live bot hanging after a session resumption reconnect. Audio,
|
||||
video, and text input were silently dropped after reconnecting because the
|
||||
internal `_ready_for_realtime_input` flag was not being reset.
|
||||
(PR [#4242](https://github.com/pipecat-ai/pipecat/pull/4242))
|
||||
|
||||
- Fixed `VADController` getting stuck in the `SPEAKING` state when audio frames
|
||||
stop arriving mid-speech (e.g. user mutes mic). A new `audio_idle_timeout`
|
||||
parameter (default 1s, set to 0 to disable) forces a transition back to
|
||||
`QUIET` and emits `on_speech_stopped` when no audio is received while
|
||||
speaking.
|
||||
(PR [#4244](https://github.com/pipecat-ai/pipecat/pull/4244))
|
||||
|
||||
- Fixed `PipelineRunner._gc_collect()` blocking the event loop by running
|
||||
`gc.collect()` synchronously. Now offloaded via `asyncio.to_thread` to avoid
|
||||
stalling concurrent pipeline tasks.
|
||||
(PR [#4255](https://github.com/pipecat-ai/pipecat/pull/4255))
|
||||
|
||||
- Fixed `ElevenLabsTTSService` incorrectly enabling `auto_mode` when using
|
||||
`TextAggregationMode.TOKEN`. Auto mode disables server-side buffering and is
|
||||
designed for complete sentences — enabling it with token streaming degraded
|
||||
speech quality. The default is now derived automatically from the aggregation
|
||||
strategy: `auto_mode=True` for `SENTENCE`, `auto_mode=False` for `TOKEN`.
|
||||
Callers can still override by passing `auto_mode` explicitly.
|
||||
(PR [#4265](https://github.com/pipecat-ai/pipecat/pull/4265))
|
||||
|
||||
- Fixed `ValueError: write to closed file` during pipeline shutdown when
|
||||
observers were active. Observer proxy tasks are now cancelled before observer
|
||||
resources are cleaned up.
|
||||
(PR [#4267](https://github.com/pipecat-ai/pipecat/pull/4267))
|
||||
|
||||
- Fixed delayed turn completion when STT transcripts arrive after the p99
|
||||
timeout. Previously, a late transcript (beyond the p99 window) would fall
|
||||
through to the 5-second `user_turn_stop_timeout` fallback. Now the turn stop
|
||||
triggers immediately when the late transcript arrives.
|
||||
(PR [#4283](https://github.com/pipecat-ai/pipecat/pull/4283))
|
||||
|
||||
- Fixed `ElevenLabsTTSService` ignoring `enable_logging=False` and
|
||||
`enable_ssml_parsing=False`. The truthy check treated `False` the same as
|
||||
`None` (both skipped), and Python's `str(False)` produced `"False"` instead
|
||||
of the lowercase `"false"` expected by the API.
|
||||
(PR [#4293](https://github.com/pipecat-ai/pipecat/pull/4293))
|
||||
|
||||
- Fixed `on_assistant_turn_stopped` not resetting internal state when the LLM
|
||||
returned no text tokens. Added `interrupted` field to
|
||||
`AssistantTurnStoppedMessage` to indicate whether the assistant turn was
|
||||
interrupted.
|
||||
(PR [#4294](https://github.com/pipecat-ai/pipecat/pull/4294))
|
||||
|
||||
- Fixed `LLMContextSummarizer` failing with "No messages to summarize" when
|
||||
using `system_instruction` instead of a system-role message at the start of
|
||||
the context. The summarizer previously scanned the entire context for the
|
||||
first system message, which could match a mid-conversation injection (e.g.
|
||||
idle notifications) instead of the initial prompt, causing the summarization
|
||||
range to be empty.
|
||||
(PR [#4295](https://github.com/pipecat-ai/pipecat/pull/4295))
|
||||
|
||||
## [0.0.108] - 2026-03-27
|
||||
|
||||
### Added
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
# Changelog
|
||||
|
||||
All notable changes to the **<project name>** SDK will be documented in this file.
|
||||
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
Please make sure to add your changes to the appropriate categories:
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
|
||||
<!-- for new functionality -->
|
||||
|
||||
- n/a
|
||||
|
||||
### Changed
|
||||
|
||||
<!-- for changed functionality -->
|
||||
|
||||
- n/a
|
||||
|
||||
### Deprecated
|
||||
|
||||
<!-- for soon-to-be removed functionality -->
|
||||
|
||||
- n/a
|
||||
|
||||
### Removed
|
||||
|
||||
<!-- for removed functionality -->
|
||||
|
||||
- n/a
|
||||
|
||||
### Fixed
|
||||
|
||||
<!-- for fixed bugs -->
|
||||
|
||||
- n/a
|
||||
|
||||
### Performance
|
||||
|
||||
<!-- for performance-relevant changes -->
|
||||
|
||||
- n/a
|
||||
|
||||
### Security
|
||||
|
||||
<!-- for security-relevant changes -->
|
||||
|
||||
- n/a
|
||||
|
||||
### Other
|
||||
|
||||
<!-- for everything else -->
|
||||
|
||||
- n/a
|
||||
|
||||
## [0.1.0] - YYYY-MM-DD
|
||||
|
||||
Initial release.
|
||||
@@ -79,7 +79,7 @@ Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.yout
|
||||
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/simple-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/simple-chatbot/image.png" width="400" /></a>
|
||||
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/storytelling-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/storytelling-chatbot/image.png" width="400" /></a>
|
||||
<br/>
|
||||
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/translation-chatbot"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/translation-chatbot/image.png" width="400" /></a>
|
||||
<a href="https://github.com/pipecat-ai/pipecat-examples/tree/main/daily-multi-translation"><img src="https://raw.githubusercontent.com/pipecat-ai/pipecat-examples/main/daily-multi-translation/image.png" width="400" /></a>
|
||||
<a href="https://github.com/pipecat-ai/pipecat/blob/main/examples/vision/vision-moondream.py"><img src="https://github.com/pipecat-ai/pipecat/blob/main/examples/assets/moondream.png" width="400" /></a>
|
||||
</p>
|
||||
|
||||
@@ -149,8 +149,8 @@ You can get started with Pipecat running on your local machine, then move your a
|
||||
|
||||
### Prerequisites
|
||||
|
||||
**Minimum Python Version:** 3.10
|
||||
**Recommended Python Version:** 3.12
|
||||
**Minimum Python Version:** 3.11
|
||||
**Recommended Python Version:** >= 3.12
|
||||
|
||||
### Setup Steps
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Added WebSocket-based `OpenAIResponsesLLMService` as the new default for the OpenAI Responses API. It maintains a persistent connection to `wss://api.openai.com/v1/responses` and automatically uses `previous_response_id` to send only incremental context, falling back to full context on reconnection or cache miss. The previous HTTP-based implementation is now available as `OpenAIResponsesHttpLLMService`.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `OpenPipeLLMService` and the `openpipe` extra. OpenPipe was acquired by CoreWeave and the package is no longer maintained. If you were using `openpipe` as an LLM provider, switch to the underlying provider directly (e.g. `openai`). The OpenPipe interface can still be used with `OpenAILLMService` by specifying a `base_url`.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Updated `langchain` extra to require langchain 1.x (from 0.3.x), langchain-community 0.4.x (from 0.3.x), and langchain-openai 1.x (from 0.3.x). If you pin these packages in your project, update your pins accordingly.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed `InworldHttpTTSService` streaming responses crashing with `UnicodeDecodeError` when multi-byte UTF-8 characters were split across chunk boundaries. This caused TTS audio to cut off mid-sentence intermittently.
|
||||
@@ -1 +0,0 @@
|
||||
- Fixed a crash (`JSONDecodeError`) when a user interruption occurs while the LLM is streaming function call arguments. Previously, the incomplete JSON arguments were passed directly to `json.loads()`, causing an unhandled exception. Affected services: OpenAI, Google (OpenAI-compatible), and SambaNova.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `observers` field from `PipelineParams`. Pass observers directly to `PipelineTask` constructor instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `on_pipeline_ended`, `on_pipeline_cancelled`, and `on_pipeline_stopped` events from `PipelineTask`. Use `on_pipeline_finished` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `AudioBufferProcessor.user_continuous_stream` parameter. Use `user_audio_passthrough` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `camera_in_enabled`, `camera_in_is_live`, `camera_in_width`, `camera_in_height`, `camera_out_enabled`, `camera_out_is_live`, `camera_out_width`, `camera_out_height`, and `camera_out_color` transport params. Use the `video_in_*` and `video_out_*` equivalents instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `RTVIObserver.errors_enabled` parameter.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `vad_enabled` and `vad_audio_passthrough` transport params.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `TTSService.say()`. Push a `TTSSpeakFrame` into the pipeline instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `DailyRunner.configure_with_args()`. Use `PipelineRunner` with `RunnerArguments` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated RTVI models, frames, and processor methods including `RTVIConfig`, `RTVIServiceConfig`, `RTVIServiceOptionConfig`, various `RTVI*Data` models, `RTVIActionFrame`, and `RTVIProcessor.handle_function_call`/`handle_function_call_start`. Use the updated RTVI processor API instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `FrameProcessor.wait_for_task()`. Use `create_task()` and manage tasks with the built-in `TaskManager` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `KrispFilter`. The `krisp` extra has been removed from `pyproject.toml`.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `LLMService.request_image_frame()`. Push a `UserImageRequestFrame` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `create_default_resampler()` from `pipecat.audio.utils`.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `FalSmartTurnAnalyzer` and `LocalSmartTurnAnalyzer`.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated transport frames: `TransportMessageFrame`, `TransportMessageUrgentFrame`, `InputTransportMessageUrgentFrame`, `DailyTransportMessageFrame`, and `DailyTransportMessageUrgentFrame`. Use `OutputTransportMessageFrame`, `OutputTransportMessageUrgentFrame`, `InputTransportMessageFrame`, `DailyOutputTransportMessageFrame`, and `DailyOutputTransportMessageUrgentFrame` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `KeypadEntryFrame` alias.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated interruption frames: `StartInterruptionFrame` and `BotInterruptionFrame`. Use `InterruptionFrame` and `InterruptionTaskFrame` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `LLMService.start_callback` parameter. Register an `on_llm_response_start` event handler instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed single-argument function call support from `LLMService`. Functions must use named parameters instead of a single `arguments` parameter.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `NoisereduceFilter`. Use system-level noise reduction or a service-based alternative instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.riva` package. Use `pipecat.services.nvidia.stt` and `pipecat.services.nvidia.tts` instead (`RivaSTTService` → `NvidiaSTTService`, `RivaTTSService` → `NvidiaTTSService`).
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.nim` package. Use `pipecat.services.nvidia.llm` instead (`NimLLMService` → `NvidiaLLMService`).
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.gemini_multimodal_live` package. Use `pipecat.services.google.gemini_live` instead. Note that class names no longer include "Multimodal" (e.g. `GeminiMultimodalLiveLLMService` → `GeminiLiveLLMService`).
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.aws_nova_sonic` package. Use `pipecat.services.aws.nova_sonic` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.openai_realtime` package. Use `pipecat.services.openai.realtime` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `OpenAIRealtimeBetaLLMService` and `AzureRealtimeBetaLLMService`. Use `OpenAIRealtimeLLMService` and `AzureRealtimeLLMService` from `pipecat.services.openai.realtime` and `pipecat.services.azure.realtime` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.deepgram.stt_sagemaker` and `pipecat.services.deepgram.tts_sagemaker` modules. Use `pipecat.services.deepgram.sagemaker.stt` and `pipecat.services.deepgram.sagemaker.tts` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `GoogleLLMOpenAIBetaService` from `pipecat.services.google.openai`. Use `GoogleLLMService` from `pipecat.services.google.llm` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.google.llm_vertex` module. Use `pipecat.services.google.vertex.llm` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.google.gemini_live.llm_vertex` module. Use `pipecat.services.google.gemini_live.vertex.llm` instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated `pipecat.services.ai_services` module. Import from `pipecat.services.ai_service`, `pipecat.services.llm_service`, `pipecat.services.stt_service`, `pipecat.services.tts_service`, etc. instead.
|
||||
@@ -1 +0,0 @@
|
||||
- Changed `GrokLLMService` default model from `grok-3-beta` to `grok-3`, now that the model is generally available.
|
||||
@@ -1 +0,0 @@
|
||||
- `GoogleImageGenService` now defaults to `imagen-4.0-generate-001` (previously `imagen-3.0-generate-002`).
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ `BaseOpenAILLMService.get_chat_completions()` now accepts an `LLMContext` instead of `OpenAILLMInvocationParams`. If you override this method, update your signature accordingly.
|
||||
@@ -1,22 +0,0 @@
|
||||
- ⚠️ Removed deprecated service-specific context and aggregator machinery, which was superseded by the universal `LLMContext` system.
|
||||
|
||||
Service-specific classes removed: `AnthropicLLMContext`, `AnthropicContextAggregatorPair`, `AWSBedrockLLMContext`, `AWSBedrockContextAggregatorPair`, `OpenAIContextAggregatorPair`, and their user/assistant aggregators. Also removed `create_context_aggregator()` from `LLMService`, `OpenAILLMService`, `AnthropicLLMService`, and `AWSBedrockLLMService`.
|
||||
|
||||
Base aggregator classes removed (from `pipecat.processors.aggregators.llm_response`): `BaseLLMResponseAggregator`, `LLMContextResponseAggregator`, `LLMUserContextAggregator`, `LLMAssistantContextAggregator`, `LLMUserResponseAggregator`, `LLMAssistantResponseAggregator`.
|
||||
|
||||
From the developer's point of view, migrating will usually be a matter of going from this:
|
||||
|
||||
```python
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
```
|
||||
|
||||
To this:
|
||||
|
||||
```python
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
```
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated frame types `LLMMessagesFrame` and `OpenAILLMContextAssistantTimestampFrame` from `pipecat.frames.frames`. Instead of `LLMMessagesFrame`, use `LLMContextFrame` with the new messages, or `LLMMessagesUpdateFrame` with `run_llm=True`.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `GatedOpenAILLMContextAggregator` (from `pipecat.processors.aggregators.gated_open_ai_llm_context`). Use `GatedLLMContextAggregator` (from `pipecat.processors.aggregators.gated_llm_context`) instead.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed `VisionImageFrameAggregator` (from `pipecat.processors.aggregators.vision_image_frame`). Vision/image handling is now built into `LLMContext` (from `pipecat.processors.aggregators.llm_context`). See the `12*` examples for the recommended replacement pattern.
|
||||
@@ -1 +0,0 @@
|
||||
- ⚠️ Removed deprecated compatibility modules: `pipecat.services.openai_realtime_beta` (use `pipecat.services.openai.realtime`), `pipecat.services.openai_realtime.context`, `pipecat.services.openai_realtime.frames`, `pipecat.services.openai.realtime.context`, `pipecat.services.openai.realtime.frames`, `pipecat.services.gemini_multimodal_live` (use `pipecat.services.google.gemini_live`), `pipecat.services.aws_nova_sonic.context` (use `pipecat.services.aws.nova_sonic`), `pipecat.services.google.openai` and `pipecat.services.google.llm_openai` (use `pipecat.services.google.llm`).
|
||||
@@ -1,18 +0,0 @@
|
||||
- ⚠️ Removed `OpenAILLMContext`, `OpenAILLMContextFrame`, and `OpenAILLMContext.from_messages()`. Use `LLMContext` (from `pipecat.processors.aggregators.llm_context`) and `LLMContextFrame` (from `pipecat.frames.frames`) instead. All services now exclusively use the universal `LLMContext`.
|
||||
|
||||
From the developer's point of view, migrating will usually be a matter of going from this:
|
||||
|
||||
```python
|
||||
context = OpenAILLMContext(messages, tools)
|
||||
context_aggregator = llm.create_context_aggregator(context)
|
||||
```
|
||||
|
||||
To this:
|
||||
|
||||
```python
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
|
||||
|
||||
context = LLMContext(messages, tools)
|
||||
context_aggregator = LLMContextAggregatorPair(context)
|
||||
```
|
||||
1
changelog/4313.added.2.md
Normal file
1
changelog/4313.added.2.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `buttons` field to `OutputDTMFFrame` and `OutputDTMFUrgentFrame` for sending multi-key DTMF sequences as a `list[KeypadEntry]`. Use `OutputDTMFFrame.from_string("123#")` (or the equivalent on `OutputDTMFUrgentFrame`) to build one from a dial string, and `to_string()` to convert back.
|
||||
1
changelog/4313.added.3.md
Normal file
1
changelog/4313.added.3.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `DailyOutputDTMFFrame` and `DailyOutputDTMFUrgentFrame` frames. In addition to the inherited `buttons`, they accept `session_id`, `digit_duration_ms` and `method`, which are forwarded to Daily's `send_dtmf` as `sessionId`, `digitDurationMs` and `method`.
|
||||
1
changelog/4313.added.md
Normal file
1
changelog/4313.added.md
Normal file
@@ -0,0 +1 @@
|
||||
- Added `DailyTransport.send_dtmf()` to expose the Daily call client's DTMF sending capability, enabling applications to send tones during a call (e.g. IVR navigation).
|
||||
@@ -1,108 +1,60 @@
|
||||
# Pipecat Documentation
|
||||
# Pipecat API Documentation
|
||||
|
||||
This directory contains the source files for auto-generating Pipecat's server API reference documentation.
|
||||
|
||||
## Setup
|
||||
|
||||
1. Install documentation dependencies:
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
2. Make the build scripts executable:
|
||||
|
||||
```bash
|
||||
chmod +x build-docs.sh rtd-test.py
|
||||
```
|
||||
This directory contains the source files for auto-generating Pipecat's API reference documentation.
|
||||
|
||||
## Building Documentation
|
||||
|
||||
From this directory, you can build the documentation in several ways:
|
||||
|
||||
### Local Build
|
||||
From this directory:
|
||||
|
||||
```bash
|
||||
# Using the build script (automatically opens docs when done)
|
||||
./build-docs.sh
|
||||
# Build docs (warnings shown but don't fail the build)
|
||||
cd docs/api && uv run ./build-docs.sh
|
||||
|
||||
# Or directly with sphinx-build
|
||||
sphinx-build -b html . _build/html -W --keep-going
|
||||
# Build with strict mode (warnings treated as errors)
|
||||
cd docs/api && uv run ./build-docs.sh --strict
|
||||
```
|
||||
|
||||
### ReadTheDocs Test Build
|
||||
The build script will:
|
||||
|
||||
To test the documentation build process exactly as it would run on ReadTheDocs:
|
||||
|
||||
```bash
|
||||
./rtd-test.py
|
||||
```
|
||||
|
||||
This script:
|
||||
|
||||
- Creates a fresh virtual environment
|
||||
- Installs all dependencies as specified in requirements files
|
||||
- Handles conflicting dependencies (like grpcio versions for Riva)
|
||||
- Builds the documentation in an isolated environment
|
||||
- Provides detailed logging of the build process
|
||||
|
||||
Use this script to verify your documentation will build correctly on ReadTheDocs before pushing changes.
|
||||
|
||||
## Viewing Documentation
|
||||
|
||||
The built documentation will be available at `_build/html/index.html`. To open:
|
||||
|
||||
```bash
|
||||
# On MacOS
|
||||
open _build/html/index.html
|
||||
|
||||
# On Linux
|
||||
xdg-open _build/html/index.html
|
||||
|
||||
# On Windows
|
||||
start _build/html/index.html
|
||||
```
|
||||
1. Install documentation dependencies via `uv sync --group docs`
|
||||
2. Clean previous build output
|
||||
3. Run `sphinx-build` to generate HTML documentation
|
||||
4. Open the result in your browser (macOS)
|
||||
|
||||
## Directory Structure
|
||||
|
||||
```
|
||||
.
|
||||
├── api/ # Auto-generated API documentation
|
||||
├── _build/ # Built documentation
|
||||
├── _static/ # Static files (images, css, etc.)
|
||||
├── conf.py # Sphinx configuration
|
||||
├── api/ # Auto-generated API documentation (created during build)
|
||||
├── _build/ # Built documentation output
|
||||
├── conf.py # Sphinx configuration (mock imports, extensions, etc.)
|
||||
├── index.rst # Main documentation entry point
|
||||
├── requirements-base.txt # Base documentation dependencies
|
||||
├── requirements-riva.txt # Riva-specific dependencies
|
||||
├── build-docs.sh # Local build script
|
||||
└── rtd-test.py # ReadTheDocs test build script
|
||||
└── rtd-test.sh # ReadTheDocs test build script (uses pip, not uv)
|
||||
```
|
||||
|
||||
## Notes
|
||||
## How It Works
|
||||
|
||||
- Documentation is auto-generated from Python docstrings
|
||||
- Service modules are automatically detected and included
|
||||
- The build process matches our ReadTheDocs configuration
|
||||
- Warnings are treated as errors (-W flag) to maintain consistency
|
||||
- The --keep-going flag ensures all errors are reported
|
||||
- Dependencies are split into multiple requirements files to handle version conflicts
|
||||
- `conf.py` runs `sphinx-apidoc` during Sphinx's `setup()` phase to generate `.rst` files from Python source
|
||||
- Sphinx autodoc imports each module to extract docstrings
|
||||
- Modules with unavailable dependencies are listed in `autodoc_mock_imports` in `conf.py`
|
||||
- Napoleon extension converts Google-style docstrings to reStructuredText
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
If you encounter missing service modules:
|
||||
**Module not appearing in docs:**
|
||||
|
||||
1. Verify the service is installed with its extras: `pip install pipecat-ai[service-name]`
|
||||
2. Check the build logs for import errors
|
||||
3. Ensure the service module is properly initialized in the package
|
||||
4. Run `./rtd-test.py` to test in an isolated environment matching ReadTheDocs
|
||||
1. Check the build output for `autodoc: failed to import` warnings
|
||||
2. If the module has an unresolvable import dependency, add it to `autodoc_mock_imports` in `conf.py`
|
||||
3. Verify the module is importable: `uv run python -c "import pipecat.module.name"`
|
||||
|
||||
For dependency conflicts:
|
||||
**Duplicate object warnings:**
|
||||
|
||||
1. Check the requirements files for version specifications
|
||||
2. Use `rtd-test.py` to verify dependency resolution
|
||||
3. Consider adding service-specific requirements files if needed
|
||||
These come from re-export modules or Sphinx discovering the same class through multiple import paths. Usually cosmetic.
|
||||
|
||||
For more information:
|
||||
**Docstring formatting warnings:**
|
||||
|
||||
- [ReadTheDocs Configuration](.readthedocs.yaml)
|
||||
- [Sphinx Documentation](https://www.sphinx-doc.org/)
|
||||
Docstrings use reStructuredText, not Markdown. Common issues:
|
||||
- Use `Example::` with indented code blocks, not `` ```python ``
|
||||
- Ensure blank lines between directive content and subsequent sections
|
||||
- Use `Parameters:` (not `Attributes:`) for dataclass field documentation to avoid duplicate entries
|
||||
|
||||
@@ -1,8 +1,16 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Usage: ./build-docs.sh [--strict]
|
||||
# --strict: Treat warnings as errors (default: warnings only)
|
||||
|
||||
SPHINX_OPTS=""
|
||||
if [ "$1" = "--strict" ]; then
|
||||
SPHINX_OPTS="-W --keep-going"
|
||||
fi
|
||||
|
||||
# Build docs using uv
|
||||
echo "Installing dependencies with uv..."
|
||||
uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra riva --no-extra mlx-whisper
|
||||
uv sync --group docs --all-extras --no-extra gstreamer --no-extra local_smart_turn --no-extra moondream --no-extra mlx-whisper
|
||||
|
||||
# Check if sphinx-build is available
|
||||
if ! uv run sphinx-build --version &> /dev/null; then
|
||||
@@ -14,8 +22,7 @@ fi
|
||||
rm -rf _build
|
||||
|
||||
echo "Building documentation..."
|
||||
# Build docs matching ReadTheDocs configuration
|
||||
uv run sphinx-build -b html -d _build/doctrees . _build/html -W --keep-going
|
||||
uv run sphinx-build -b html -d _build/doctrees . _build/html $SPHINX_OPTS
|
||||
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "Documentation built successfully!"
|
||||
|
||||
@@ -4,6 +4,19 @@ import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# Fix Pydantic v2 + Sphinx autodoc incompatibility: ConfigDict(extra="allow") fails
|
||||
# during Sphinx's import because __pydantic_extra__ annotation on BaseModel resolves to
|
||||
# `Dict[str, Any] | None` whose get_origin() is Union, not dict. Patch the check to
|
||||
# accept Union-wrapped dict types (i.e., Optional[Dict[str, Any]]).
|
||||
import pydantic._internal._generate_schema as _pydantic_gs
|
||||
|
||||
_ORIG_DICT_TYPES = _pydantic_gs.DICT_TYPES
|
||||
# Expand the accepted types to include Union (Optional[Dict[str, Any]])
|
||||
import types
|
||||
import typing
|
||||
|
||||
_pydantic_gs.DICT_TYPES = [*_ORIG_DICT_TYPES, typing.Union, types.UnionType]
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
|
||||
logger = logging.getLogger("sphinx-build")
|
||||
@@ -76,16 +89,6 @@ autodoc_mock_imports = [
|
||||
"einops",
|
||||
"intel_extension_for_pytorch",
|
||||
"huggingface_hub",
|
||||
# riva dependencies
|
||||
"riva",
|
||||
"riva.client",
|
||||
"riva.client.Auth",
|
||||
"riva.client.ASRService",
|
||||
"riva.client.StreamingRecognitionConfig",
|
||||
"riva.client.RecognitionConfig",
|
||||
"riva.client.AudioEncoding",
|
||||
"riva.client.proto.riva_tts_pb2",
|
||||
"riva.client.SpeechSynthesisService",
|
||||
# MLX dependencies (Apple Silicon specific)
|
||||
"mlx",
|
||||
"mlx_whisper", # Note: might need underscore format too
|
||||
@@ -107,6 +110,8 @@ autodoc_mock_imports = [
|
||||
"fastapi.middleware",
|
||||
"fastapi.responses",
|
||||
"uvicorn",
|
||||
# Deepgram dependencies
|
||||
"deepgram",
|
||||
]
|
||||
|
||||
# HTML output settings
|
||||
@@ -133,6 +138,8 @@ def import_core_modules():
|
||||
"pipecat.runner",
|
||||
"pipecat.serializers",
|
||||
"pipecat.transcriptions",
|
||||
"pipecat.turns",
|
||||
"pipecat.extensions",
|
||||
"pipecat.utils",
|
||||
]
|
||||
|
||||
@@ -177,7 +184,6 @@ def setup(app):
|
||||
logger.info(f"Source directory: {source_dir}")
|
||||
|
||||
excludes = [
|
||||
str(project_root / "src/pipecat/pipeline/to_be_updated"),
|
||||
str(project_root / "src/pipecat/examples"),
|
||||
str(project_root / "src/pipecat/tests"),
|
||||
"**/test_*.py",
|
||||
|
||||
@@ -32,4 +32,5 @@ Quick Links
|
||||
Services <api/pipecat.services>
|
||||
Transcriptions <api/pipecat.transcriptions>
|
||||
Transports <api/pipecat.transports>
|
||||
Turns <api/pipecat.turns>
|
||||
Utils <api/pipecat.utils>
|
||||
|
||||
@@ -34,7 +34,7 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
load_dotenv(override=True)
|
||||
|
||||
OFFICE_SOUND_FILE = os.path.join(
|
||||
os.path.dirname(__file__), "assets", "office-ambience-24000-mono.mp3"
|
||||
os.path.dirname(__file__), "../assets", "office-ambience-24000-mono.mp3"
|
||||
)
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
|
||||
@@ -36,7 +36,7 @@ from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.google import GoogleLLMService
|
||||
from pipecat.services.google.llm import GoogleLLMService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
|
||||
@@ -45,7 +45,7 @@ from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.frames.frames import LLMRunFrame, TTSUpdateSettingsFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -54,6 +54,7 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.aggregators.llm_text_processor import LLMTextProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
@@ -100,39 +101,43 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
# Create pattern pair aggregator for voice switching
|
||||
pattern_aggregator = PatternPairAggregator()
|
||||
llm_text_aggregator = PatternPairAggregator()
|
||||
|
||||
# Add pattern for voice switching
|
||||
pattern_aggregator.add_pattern(
|
||||
llm_text_aggregator.add_pattern(
|
||||
type="voice",
|
||||
start_pattern="<voice>",
|
||||
end_pattern="</voice>",
|
||||
action=MatchAction.REMOVE, # Remove tags from final text
|
||||
action=MatchAction.AGGREGATE,
|
||||
)
|
||||
|
||||
# Register handler for voice switching
|
||||
async def on_voice_tag(match: PatternMatch):
|
||||
voice_name = match.text.strip().lower()
|
||||
if voice_name in VOICE_IDS:
|
||||
# First flush any existing audio to finish the current context
|
||||
await tts.flush_audio()
|
||||
# Then set the new voice
|
||||
await tts.set_voice(VOICE_IDS[voice_name])
|
||||
await llm_text_processor.push_frame(
|
||||
TTSUpdateSettingsFrame(
|
||||
delta=CartesiaTTSService.Settings(voice=VOICE_IDS[voice_name])
|
||||
)
|
||||
)
|
||||
logger.info(f"Switched to {voice_name} voice")
|
||||
else:
|
||||
logger.warning(f"Unknown voice: {voice_name}")
|
||||
|
||||
pattern_aggregator.on_pattern_match("voice", on_voice_tag)
|
||||
llm_text_aggregator.on_pattern_match("voice", on_voice_tag)
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
# Process LLM text through the pattern aggregator before TTS
|
||||
llm_text_processor = LLMTextProcessor(text_aggregator=llm_text_aggregator)
|
||||
|
||||
# Initialize TTS with narrator voice as default
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice=VOICE_IDS["narrator"],
|
||||
),
|
||||
text_aggregator=pattern_aggregator,
|
||||
skip_aggregator_types=["voice"], # Skip voice tags in TTS speech
|
||||
)
|
||||
|
||||
# System prompt for storytelling with voice switching
|
||||
@@ -204,7 +209,8 @@ Remember: Use narrator voice for EVERYTHING except the actual quoted dialogue.""
|
||||
stt,
|
||||
user_aggregator,
|
||||
llm,
|
||||
tts, # TTS with pattern aggregator
|
||||
llm_text_processor,
|
||||
tts,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
|
||||
@@ -0,0 +1,210 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Example: async function call with intermediate updates.
|
||||
|
||||
The ``track_current_location`` tool simulates a GPS tracker reporting the
|
||||
device's position during a road trip from San Francisco to San Diego. It
|
||||
sends two intermediate updates (via ``params.result_callback`` with
|
||||
``is_final=False``) as the vehicle passes through cities along the way, then
|
||||
delivers the final destination (via ``params.result_callback``). Each update
|
||||
returns the same structure with a different city:
|
||||
|
||||
Update 1 – {gps, city: "San Francisco"} ← trip start
|
||||
Update 2 – {gps, city: "Los Angeles"} ← passing through
|
||||
Final – {gps, city: "San Diego"} ← destination reached
|
||||
|
||||
Because the function is registered with ``cancel_on_interruption=False``, the
|
||||
LLM can keep talking while the trip is in progress; each position update
|
||||
arrives as a developer message so the LLM can narrate the journey to the user.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
FunctionCallResultProperties,
|
||||
LLMRunFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def track_current_location(params: FunctionCallParams):
|
||||
"""Simulate a GPS tracker reporting position during a road trip.
|
||||
|
||||
Step 1 – San Francisco (trip start) (update)
|
||||
Step 2 – Los Angeles (passing through) (update)
|
||||
Step 3 – San Diego (destination) (final result)
|
||||
"""
|
||||
|
||||
# First update: initial city estimate.
|
||||
gps = {"lat": 37.7310, "lng": -122.4527}
|
||||
await params.result_callback(
|
||||
{"gps": gps, "city": "San Francisco"},
|
||||
properties=FunctionCallResultProperties(is_final=False),
|
||||
)
|
||||
|
||||
# Second update: revised city estimate.
|
||||
await asyncio.sleep(10)
|
||||
gps = {"lat": 33.96003, "lng": -118.40639}
|
||||
await params.result_callback(
|
||||
{"gps": gps, "city": "Los Angeles"},
|
||||
properties=FunctionCallResultProperties(is_final=False),
|
||||
)
|
||||
|
||||
# Final result: confirmed city.
|
||||
await asyncio.sleep(10)
|
||||
gps = {"lat": 32.743569, "lng": -117.20466}
|
||||
await params.result_callback({"gps": gps, "city": "San Diego"})
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
enable_async_tool_cancellation=True,
|
||||
settings=AnthropicLLMService.Settings(
|
||||
system_instruction=(
|
||||
"You are a helpful assistant in a voice conversation. "
|
||||
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
|
||||
"formatting that can't be spoken. "
|
||||
"You have access to a function that starts tracking the user's location and "
|
||||
"provides regular updates on it. When you receive the final location, tell the user "
|
||||
"the destination has been reached."
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
# cancel_on_interruption=False makes this an async function call: the LLM
|
||||
# continues the conversation immediately and receives updates/result later.
|
||||
llm.register_function(
|
||||
"track_current_location",
|
||||
track_current_location,
|
||||
cancel_on_interruption=False,
|
||||
timeout_secs=30,
|
||||
)
|
||||
|
||||
@llm.event_handler("on_function_calls_cancelled")
|
||||
async def on_function_calls_cancelled(service, function_calls):
|
||||
for item in function_calls:
|
||||
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
|
||||
|
||||
location_function = FunctionSchema(
|
||||
name="track_current_location",
|
||||
description="Start tracking the user's current GPS location, reporting position updates until the user reaches their destination.",
|
||||
properties={},
|
||||
required=[],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[location_function])
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
user_aggregator,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
180
examples/function-calling/function-calling-anthropic-async.py
Normal file
180
examples/function-calling/function-calling-anthropic-async.py
Normal file
@@ -0,0 +1,180 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
# Simulate a long-running API call, so we can test async function calls (cancel_on_interruption=False).
|
||||
await asyncio.sleep(20)
|
||||
await params.result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
enable_async_tool_cancellation=True,
|
||||
settings=AnthropicLLMService.Settings(
|
||||
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
|
||||
),
|
||||
)
|
||||
|
||||
# You can also register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function(
|
||||
"get_current_weather",
|
||||
fetch_weather_from_api,
|
||||
cancel_on_interruption=False,
|
||||
timeout_secs=30,
|
||||
)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
@llm.event_handler("on_function_calls_cancelled")
|
||||
async def on_function_calls_cancelled(service, function_calls):
|
||||
for item in function_calls:
|
||||
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
user_aggregator, # User spoken responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses and tool context
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -0,0 +1,214 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Example: async function call with intermediate updates.
|
||||
|
||||
The ``track_current_location`` tool simulates a GPS tracker reporting the
|
||||
device's position during a road trip from San Francisco to San Diego. It
|
||||
sends two intermediate updates (via ``params.result_callback`` with
|
||||
``is_final=False``) as the vehicle passes through cities along the way, then
|
||||
delivers the final destination (via ``params.result_callback``). Each update
|
||||
returns the same structure with a different city:
|
||||
|
||||
Update 1 – {gps, city: "San Francisco"} ← trip start
|
||||
Update 2 – {gps, city: "Los Angeles"} ← passing through
|
||||
Final – {gps, city: "San Diego"} ← destination reached
|
||||
|
||||
Because the function is registered with ``cancel_on_interruption=False``, the
|
||||
LLM can keep talking while the trip is in progress; each position update
|
||||
arrives as a developer message so the LLM can narrate the journey to the user.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
FunctionCallResultProperties,
|
||||
LLMRunFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.google.llm import GoogleLLMService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def track_current_location(params: FunctionCallParams):
|
||||
"""Simulate a GPS tracker reporting position during a road trip.
|
||||
|
||||
Step 1 – San Francisco (trip start) (update)
|
||||
Step 2 – Los Angeles (passing through) (update)
|
||||
Step 3 – San Diego (destination) (final result)
|
||||
"""
|
||||
|
||||
# First update: initial city estimate.
|
||||
gps = {"lat": 37.7310, "lng": -122.4527}
|
||||
await params.result_callback(
|
||||
{"gps": gps, "city": "San Francisco"},
|
||||
properties=FunctionCallResultProperties(is_final=False),
|
||||
)
|
||||
|
||||
# Second update: revised city estimate.
|
||||
await asyncio.sleep(10)
|
||||
gps = {"lat": 33.96003, "lng": -118.40639}
|
||||
await params.result_callback(
|
||||
{"gps": gps, "city": "Los Angeles"},
|
||||
properties=FunctionCallResultProperties(is_final=False),
|
||||
)
|
||||
|
||||
# Final result: confirmed city.
|
||||
await asyncio.sleep(10)
|
||||
gps = {"lat": 32.743569, "lng": -117.20466}
|
||||
await params.result_callback({"gps": gps, "city": "San Diego"})
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
llm = GoogleLLMService(
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
enable_async_tool_cancellation=True,
|
||||
settings=GoogleLLMService.Settings(
|
||||
system_instruction=(
|
||||
"You are a helpful assistant in a voice conversation. "
|
||||
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
|
||||
"formatting that can't be spoken. "
|
||||
"You have access to a function that starts tracking the user's location and "
|
||||
"provides regular updates on it. When you receive the final location, tell the user "
|
||||
"the destination has been reached."
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
# cancel_on_interruption=False makes this an async function call: the LLM
|
||||
# continues the conversation immediately and receives updates/result later.
|
||||
llm.register_function(
|
||||
"track_current_location",
|
||||
track_current_location,
|
||||
cancel_on_interruption=False,
|
||||
timeout_secs=30,
|
||||
)
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
async def on_function_calls_started(service, function_calls):
|
||||
await tts.queue_frame(TTSSpeakFrame("Sure, tracking your location now."))
|
||||
|
||||
@llm.event_handler("on_function_calls_cancelled")
|
||||
async def on_function_calls_cancelled(service, function_calls):
|
||||
for item in function_calls:
|
||||
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
|
||||
|
||||
location_function = FunctionSchema(
|
||||
name="track_current_location",
|
||||
description="Start tracking the user's current GPS location, reporting position updates until the user reaches their destination.",
|
||||
properties={},
|
||||
required=[],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[location_function])
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
user_aggregator,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
256
examples/function-calling/function-calling-google-async.py
Normal file
256
examples/function-calling/function-calling-google-async.py
Normal file
@@ -0,0 +1,256 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame, UserImageRequestFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import (
|
||||
create_transport,
|
||||
get_transport_client_id,
|
||||
maybe_capture_participant_camera,
|
||||
)
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.google.llm import GoogleLLMService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def get_weather(params: FunctionCallParams):
|
||||
# Simulate a long-running API call, so we can test async function calls (cancel_on_interruption=False).
|
||||
await asyncio.sleep(20)
|
||||
location = params.arguments["location"]
|
||||
await params.result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
async def get_image(params: FunctionCallParams):
|
||||
"""Fetch the user image and push it to the LLM.
|
||||
|
||||
When called, this function pushes a UserImageRequestFrame upstream to the
|
||||
transport. As a result, the transport will request the user image and push a
|
||||
UserImageRawFrame downstream which will be added to the context by the LLM
|
||||
assistant aggregator. The result_callback will be invoked once the image is
|
||||
retrieved and processed.
|
||||
"""
|
||||
user_id = params.arguments["user_id"]
|
||||
question = params.arguments["question"]
|
||||
logger.debug(f"Requesting image with user_id={user_id}, question={question}")
|
||||
|
||||
# Request a user image frame and indicate that it should be added to the
|
||||
# context. Also associate it to the function call. Pass the result_callback
|
||||
# so it can be invoked when the image is actually retrieved.
|
||||
await params.llm.push_frame(
|
||||
UserImageRequestFrame(
|
||||
user_id=user_id,
|
||||
text=question,
|
||||
append_to_context=True,
|
||||
function_name=params.function_name,
|
||||
tool_call_id=params.tool_call_id,
|
||||
result_callback=params.result_callback,
|
||||
),
|
||||
FrameDirection.UPSTREAM,
|
||||
)
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_in_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_in_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
system_prompt = """\
|
||||
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
|
||||
|
||||
Your response will be turned into speech so use only simple words and punctuation.
|
||||
|
||||
You have access to three tools: get_weather, get_restaurant_recommendation, and get_image.
|
||||
|
||||
You can respond to questions about the weather using the get_weather tool.
|
||||
|
||||
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
|
||||
indicate you should use the get_image tool are:
|
||||
- What do you see?
|
||||
- What's in the video?
|
||||
- Can you describe the video?
|
||||
- Tell me about what you see.
|
||||
- Tell me something interesting about what you see.
|
||||
- What's happening in the video?
|
||||
"""
|
||||
|
||||
llm = GoogleLLMService(
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
enable_async_tool_cancellation=True,
|
||||
settings=GoogleLLMService.Settings(
|
||||
system_instruction=system_prompt,
|
||||
),
|
||||
)
|
||||
llm.register_function("get_weather", get_weather, cancel_on_interruption=False, timeout_secs=30)
|
||||
llm.register_function("get_image", get_image)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
async def on_function_calls_started(service, function_calls):
|
||||
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
|
||||
|
||||
@llm.event_handler("on_function_calls_cancelled")
|
||||
async def on_function_calls_cancelled(service, function_calls):
|
||||
for item in function_calls:
|
||||
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the user's location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
get_image_function = FunctionSchema(
|
||||
name="get_image",
|
||||
description="Called when the user requests a description of their camera feed",
|
||||
properties={
|
||||
"user_id": {
|
||||
"type": "string",
|
||||
"description": "The ID of the user to grab the image from",
|
||||
},
|
||||
"question": {
|
||||
"type": "string",
|
||||
"description": "The question that the user is asking about the image",
|
||||
},
|
||||
},
|
||||
required=["user_id", "question"],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[weather_function, get_image_function, restaurant_function])
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
user_aggregator,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
|
||||
await maybe_capture_participant_camera(transport, client)
|
||||
|
||||
client_id = get_transport_client_id(transport, client)
|
||||
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{
|
||||
"role": "developer",
|
||||
"content": f"Please introduce yourself to the user. Use '{client_id}' as the user ID during function calls.",
|
||||
}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -0,0 +1,214 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Example: async function call with intermediate updates.
|
||||
|
||||
The ``track_current_location`` tool simulates a GPS tracker reporting the
|
||||
device's position during a road trip from San Francisco to San Diego. It
|
||||
sends two intermediate updates (via ``params.result_callback`` with
|
||||
``is_final=False``) as the vehicle passes through cities along the way, then
|
||||
delivers the final destination (via ``params.result_callback``). Each update
|
||||
returns the same structure with a different city:
|
||||
|
||||
Update 1 – {gps, city: "San Francisco"} ← trip start
|
||||
Update 2 – {gps, city: "Los Angeles"} ← passing through
|
||||
Final – {gps, city: "San Diego"} ← destination reached
|
||||
|
||||
Because the function is registered with ``cancel_on_interruption=False``, the
|
||||
LLM can keep talking while the trip is in progress; each position update
|
||||
arrives as a developer message so the LLM can narrate the journey to the user.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
FunctionCallResultProperties,
|
||||
LLMRunFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def track_current_location(params: FunctionCallParams):
|
||||
"""Simulate a GPS tracker reporting position during a road trip.
|
||||
|
||||
Step 1 – San Francisco (trip start) (update)
|
||||
Step 2 – Los Angeles (passing through) (update)
|
||||
Step 3 – San Diego (destination) (final result)
|
||||
"""
|
||||
|
||||
# First update: initial city estimate.
|
||||
gps = {"lat": 37.7310, "lng": -122.4527}
|
||||
await params.result_callback(
|
||||
{"gps": gps, "city": "San Francisco"},
|
||||
properties=FunctionCallResultProperties(is_final=False),
|
||||
)
|
||||
|
||||
# Second update: revised city estimate.
|
||||
await asyncio.sleep(10)
|
||||
gps = {"lat": 33.96003, "lng": -118.40639}
|
||||
await params.result_callback(
|
||||
{"gps": gps, "city": "Los Angeles"},
|
||||
properties=FunctionCallResultProperties(is_final=False),
|
||||
)
|
||||
|
||||
# Final result: confirmed city.
|
||||
await asyncio.sleep(10)
|
||||
gps = {"lat": 32.743569, "lng": -117.20466}
|
||||
await params.result_callback({"gps": gps, "city": "San Diego"})
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
enable_async_tool_cancellation=True,
|
||||
settings=OpenAILLMService.Settings(
|
||||
system_instruction=(
|
||||
"You are a helpful assistant in a voice conversation. "
|
||||
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
|
||||
"formatting that can't be spoken. "
|
||||
"You have access to a function that starts tracking the user's location and "
|
||||
"provides regular updates on it. When you receive the final location, tell the user "
|
||||
"the destination has been reached."
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
# cancel_on_interruption=False makes this an async function call: the LLM
|
||||
# continues the conversation immediately and receives updates/result later.
|
||||
llm.register_function(
|
||||
"track_current_location",
|
||||
track_current_location,
|
||||
cancel_on_interruption=False,
|
||||
timeout_secs=30,
|
||||
)
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
async def on_function_calls_started(service, function_calls):
|
||||
await tts.queue_frame(TTSSpeakFrame("Sure, tracking your location now."))
|
||||
|
||||
@llm.event_handler("on_function_calls_cancelled")
|
||||
async def on_function_calls_cancelled(service, function_calls):
|
||||
for item in function_calls:
|
||||
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
|
||||
|
||||
location_function = FunctionSchema(
|
||||
name="track_current_location",
|
||||
description="Start tracking the user's current GPS location, reporting position updates until the user reaches their destination.",
|
||||
properties={},
|
||||
required=[],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[location_function])
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
user_aggregator,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
198
examples/function-calling/function-calling-openai-async.py
Normal file
198
examples/function-calling/function-calling-openai-async.py
Normal file
@@ -0,0 +1,198 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
LLMRunFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.openai.stt import OpenAISTTService
|
||||
from pipecat.services.openai.tts import OpenAITTSService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
# Simulate a long-running API call, so we can test async function calls.
|
||||
await asyncio.sleep(20)
|
||||
await params.result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = OpenAISTTService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
settings=OpenAISTTService.Settings(
|
||||
model="gpt-4o-transcribe",
|
||||
prompt="Expect words related weather, such as temperature and conditions. And restaurant names.",
|
||||
),
|
||||
)
|
||||
|
||||
tts = OpenAITTSService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
settings=OpenAITTSService.Settings(
|
||||
voice="ballad",
|
||||
),
|
||||
instructions="Please speak clearly and at a moderate pace.",
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
enable_async_tool_cancellation=True,
|
||||
settings=OpenAILLMService.Settings(
|
||||
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
|
||||
),
|
||||
)
|
||||
|
||||
# You can also register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function(
|
||||
"get_current_weather",
|
||||
fetch_weather_from_api,
|
||||
cancel_on_interruption=False,
|
||||
timeout_secs=30,
|
||||
)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
async def on_function_calls_started(service, function_calls):
|
||||
await tts.queue_frame(TTSSpeakFrame("Let me check on that."))
|
||||
|
||||
@llm.event_handler("on_function_calls_cancelled")
|
||||
async def on_function_calls_cancelled(service, function_calls):
|
||||
for item in function_calls:
|
||||
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the user's location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
user_aggregator,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -0,0 +1,211 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Example: async function call with intermediate updates.
|
||||
|
||||
The ``track_current_location`` tool simulates a GPS tracker reporting the
|
||||
device's position during a road trip from San Francisco to San Diego. It
|
||||
sends two intermediate updates (via ``params.result_callback`` with
|
||||
``is_final=False``) as the vehicle passes through cities along the way, then
|
||||
delivers the final destination (via ``params.result_callback``). Each update returns the same structure with a
|
||||
different city:
|
||||
|
||||
Update 1 – {gps, city: "San Francisco"} ← trip start
|
||||
Update 2 – {gps, city: "Los Angeles"} ← passing through
|
||||
Final – {gps, city: "San Diego"} ← destination reached
|
||||
|
||||
Because the function is registered with ``cancel_on_interruption=False``, the
|
||||
LLM can keep talking while the trip is in progress; each position update
|
||||
arrives as a developer message so the LLM can narrate the journey to the user.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
FunctionCallResultProperties,
|
||||
LLMRunFrame,
|
||||
TTSSpeakFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def track_current_location(params: FunctionCallParams):
|
||||
"""Simulate a GPS tracker reporting position during a road trip.
|
||||
|
||||
Step 1 – San Francisco (trip start) (update)
|
||||
Step 2 – Los Angeles (passing through) (update)
|
||||
Step 3 – San Diego (destination) (final result)
|
||||
"""
|
||||
|
||||
# First update: initial city estimate.
|
||||
gps = {"lat": 37.7310, "lng": -122.4527}
|
||||
await params.result_callback(
|
||||
{"gps": gps, "city": "San Francisco"},
|
||||
properties=FunctionCallResultProperties(is_final=False),
|
||||
)
|
||||
|
||||
# Second update: revised city estimate.
|
||||
await asyncio.sleep(10)
|
||||
gps = {"lat": 33.96003, "lng": -118.40639}
|
||||
await params.result_callback(
|
||||
{"gps": gps, "city": "Los Angeles"},
|
||||
properties=FunctionCallResultProperties(is_final=False),
|
||||
)
|
||||
|
||||
# Final result: confirmed city.
|
||||
await asyncio.sleep(10)
|
||||
gps = {"lat": 32.743569, "lng": -117.20466}
|
||||
await params.result_callback({"gps": gps, "city": "San Diego"})
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
llm = OpenAIResponsesLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
enable_async_tool_cancellation=True,
|
||||
settings=OpenAIResponsesLLMService.Settings(
|
||||
system_instruction=(
|
||||
"You are a helpful assistant in a voice conversation. "
|
||||
"Your responses will be spoken aloud, so avoid emojis, bullet points, or other "
|
||||
"formatting that can't be spoken. "
|
||||
"You have access to a function that starts tracking a moving device's location and "
|
||||
"provides regular updates on it. When you receive the final location, tell the user "
|
||||
"the destination has been reached and announce the final city."
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
# cancel_on_interruption=False makes this an async function call: the LLM
|
||||
# continues the conversation immediately and receives updates/result later.
|
||||
llm.register_function(
|
||||
"track_current_location",
|
||||
track_current_location,
|
||||
cancel_on_interruption=False,
|
||||
timeout_secs=30,
|
||||
)
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
async def on_function_calls_started(service, function_calls):
|
||||
await tts.queue_frame(TTSSpeakFrame("Sure, tracking your location now."))
|
||||
|
||||
@llm.event_handler("on_function_calls_cancelled")
|
||||
async def on_function_calls_cancelled(service, function_calls):
|
||||
for item in function_calls:
|
||||
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
|
||||
|
||||
location_function = FunctionSchema(
|
||||
name="track_current_location",
|
||||
description="Track the device's current GPS location during a road trip, reporting position updates as the vehicle moves through cities until it reaches the final destination.",
|
||||
properties={},
|
||||
required=[],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[location_function])
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
user_aggregator,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -0,0 +1,197 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame, TTSSpeakFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.llm_service import FunctionCallParams
|
||||
from pipecat.services.openai.responses.llm import OpenAIResponsesLLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
async def fetch_weather_from_api(params: FunctionCallParams):
|
||||
# Simulate a long-running API call, so we can test async function calls.
|
||||
await asyncio.sleep(20)
|
||||
await params.result_callback({"conditions": "nice", "temperature": "75"})
|
||||
|
||||
|
||||
async def fetch_restaurant_recommendation(params: FunctionCallParams):
|
||||
await params.result_callback({"name": "The Golden Dragon"})
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
llm = OpenAIResponsesLLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
enable_async_tool_cancellation=True,
|
||||
settings=OpenAIResponsesLLMService.Settings(
|
||||
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
|
||||
),
|
||||
)
|
||||
|
||||
# You can also register a function_name of None to get all functions
|
||||
# sent to the same callback with an additional function_name parameter.
|
||||
llm.register_function(
|
||||
"get_current_weather",
|
||||
fetch_weather_from_api,
|
||||
cancel_on_interruption=False,
|
||||
timeout_secs=30,
|
||||
)
|
||||
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
|
||||
|
||||
@llm.event_handler("on_connection_error")
|
||||
async def on_connection_error(service, error):
|
||||
logger.error(f"LLM connection error: {error}")
|
||||
|
||||
@llm.event_handler("on_function_calls_started")
|
||||
async def on_function_calls_started(service, function_calls):
|
||||
# Avoid appending this filler message to the LLM context — it would
|
||||
# alter the conversation history and prevent
|
||||
# OpenAIResponsesLLMService's previous_response_id optimization from
|
||||
# matching, forcing a full context resend.
|
||||
await tts.queue_frame(TTSSpeakFrame("Let me check on that.", append_to_context=False))
|
||||
|
||||
@llm.event_handler("on_function_calls_cancelled")
|
||||
async def on_function_calls_cancelled(service, function_calls):
|
||||
for item in function_calls:
|
||||
logger.info(f"Function call cancelled: {item.function_name} [{item.tool_call_id}]")
|
||||
|
||||
weather_function = FunctionSchema(
|
||||
name="get_current_weather",
|
||||
description="Get the current weather",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
"format": {
|
||||
"type": "string",
|
||||
"enum": ["celsius", "fahrenheit"],
|
||||
"description": "The temperature unit to use. Infer this from the user's location.",
|
||||
},
|
||||
},
|
||||
required=["location", "format"],
|
||||
)
|
||||
restaurant_function = FunctionSchema(
|
||||
name="get_restaurant_recommendation",
|
||||
description="Get a restaurant recommendation",
|
||||
properties={
|
||||
"location": {
|
||||
"type": "string",
|
||||
"description": "The city and state, e.g. San Francisco, CA",
|
||||
},
|
||||
},
|
||||
required=["location"],
|
||||
)
|
||||
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
stt,
|
||||
user_aggregator,
|
||||
llm,
|
||||
tts,
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -5,27 +5,17 @@
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from mcp import StdioServerParameters
|
||||
from mcp.client.session_group import StreamableHttpParameters
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
LLMRunFrame,
|
||||
URLImageRawFrame,
|
||||
)
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -34,7 +24,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
@@ -47,66 +36,16 @@ from pipecat.transports.daily.transport import DailyParams
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class UrlToImageProcessor(FrameProcessor):
|
||||
def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._aiohttp_session = aiohttp_session
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, FunctionCallResultFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
image_url = self.extract_url(frame.result)
|
||||
if image_url:
|
||||
await self.run_image_process(image_url)
|
||||
# sometimes we get multiple image urls- process 1 at a time
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def extract_url(self, text: str):
|
||||
try:
|
||||
data = json.loads(text)
|
||||
if "artObject" in data:
|
||||
return data["artObject"]["webImage"]["url"]
|
||||
if "artworks" in data and len(data["artworks"]):
|
||||
return data["artworks"][0]["webImage"]["url"]
|
||||
except (json.JSONDecodeError, KeyError, TypeError):
|
||||
pass
|
||||
|
||||
async def run_image_process(self, image_url: str):
|
||||
try:
|
||||
logger.debug(f"handling image from url: '{image_url}'")
|
||||
async with self._aiohttp_session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
image = image.convert("RGB")
|
||||
frame = URLImageRawFrame(
|
||||
url=image_url, image=image.tobytes(), size=image.size, format="RGB"
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
error_msg = f"Error handling image url {image_url}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
video_out_width=1024,
|
||||
video_out_height=1024,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
video_out_width=1024,
|
||||
video_out_height=1024,
|
||||
),
|
||||
}
|
||||
|
||||
@@ -114,85 +53,70 @@ transport_params = {
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
# Create an HTTP session for API calls
|
||||
async with aiohttp.ClientSession() as session:
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
system_prompt = f"""
|
||||
You are a helpful LLM in a voice call.
|
||||
Your goal is to demonstrate your capabilities in a succinct way.
|
||||
You have access to memory tools that let you store and recall information,
|
||||
and tools to answer questions about the user's GitHub repositories and account.
|
||||
Offer to remember things for the user, like their name, preferences, or anything they'd like.
|
||||
You can also recall things you've previously stored.
|
||||
You can also offer to answer users questions about their GitHub repositories and account.
|
||||
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
|
||||
Respond to what the user said in a creative and helpful way.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
settings=AnthropicLLMService.Settings(
|
||||
system_instruction=system_prompt,
|
||||
),
|
||||
)
|
||||
|
||||
async with (
|
||||
# https://github.com/modelcontextprotocol/servers/tree/main/src/memory
|
||||
MCPClient(
|
||||
server_params=StdioServerParameters(
|
||||
command=shutil.which("npx"),
|
||||
args=["-y", "@modelcontextprotocol/server-memory"],
|
||||
# env={"MEMORY_FILE_PATH": "/tmp/pipecat_memory.jsonl"}, # Optional: specify MEMORY_FILE_PATH
|
||||
),
|
||||
)
|
||||
|
||||
system_prompt = f"""
|
||||
You are a helpful LLM in a voice call.
|
||||
Your goal is to demonstrate your capabilities in a succinct way.
|
||||
You have access to tools to search the Rijksmuseum collection and the user's GitHub repositories and account.
|
||||
Offer, for example, to show a floral still life, use the `search_artwork` tool.
|
||||
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
|
||||
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
|
||||
You can also offer to answer users questions about their GitHub repositories and account.
|
||||
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
|
||||
Respond to what the user said in a creative and helpful way.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
settings=AnthropicLLMService.Settings(
|
||||
system_instruction=system_prompt,
|
||||
) as memory_mcp,
|
||||
# Github MCP docs: https://github.com/github/github-mcp-server
|
||||
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
|
||||
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
|
||||
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
|
||||
MCPClient(
|
||||
server_params=StreamableHttpParameters(
|
||||
url="https://api.githubcopilot.com/mcp/",
|
||||
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
|
||||
),
|
||||
)
|
||||
) as github_mcp,
|
||||
):
|
||||
memory_tools = await memory_mcp.register_tools(llm)
|
||||
github_tools = await github_mcp.register_tools(llm)
|
||||
|
||||
try:
|
||||
rijksmuseum_mcp = MCPClient(
|
||||
server_params=StdioServerParameters(
|
||||
command=shutil.which("npx"),
|
||||
# https://github.com/r-huijts/rijksmuseum-mcp
|
||||
args=["-y", "mcp-server-rijksmuseum"],
|
||||
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up rijksmuseum mcp")
|
||||
logger.exception("error trace:")
|
||||
try:
|
||||
# Github MCP docs: https://github.com/github/github-mcp-server
|
||||
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
|
||||
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
|
||||
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
|
||||
github_mcp = MCPClient(
|
||||
server_params=StreamableHttpParameters(
|
||||
url="https://api.githubcopilot.com/mcp/",
|
||||
headers={
|
||||
"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"
|
||||
},
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up mcp.run")
|
||||
logger.exception("error trace:")
|
||||
|
||||
rijksmuseum_tools = {}
|
||||
github_tools = {}
|
||||
try:
|
||||
rijksmuseum_tools = await rijksmuseum_mcp.register_tools(llm)
|
||||
github_tools = await github_mcp.register_tools(llm)
|
||||
except Exception as e:
|
||||
logger.error(f"error registering tools")
|
||||
logger.exception("error trace:")
|
||||
|
||||
all_standard_tools = rijksmuseum_tools.standard_tools + github_tools.standard_tools
|
||||
all_standard_tools = memory_tools.standard_tools + github_tools.standard_tools
|
||||
all_tools = ToolsSchema(standard_tools=all_standard_tools)
|
||||
|
||||
context = LLMContext(tools=all_tools)
|
||||
context = LLMContext(
|
||||
messages=[{"role": "user", "content": "Please introduce yourself."}],
|
||||
tools=all_tools,
|
||||
)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
mcp_image_processor = UrlToImageProcessor(aiohttp_session=session)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
@@ -201,7 +125,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
user_aggregator, # User spoken responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
mcp_image_processor, # URL image -> output
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses and tool context
|
||||
]
|
||||
@@ -239,10 +162,8 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("RIJKSMUSEUM_API_KEY") or not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
|
||||
logger.error(
|
||||
f"Please set `RIJKSMUSEUM_API_KEY` and `GITHUB_PERSONAL_ACCESS_TOKEN` environment variables. See https://github.com/r-huijts/rijksmuseum-mcp."
|
||||
)
|
||||
if not os.getenv("GITHUB_PERSONAL_ACCESS_TOKEN"):
|
||||
logger.error(f"Please set `GITHUB_PERSONAL_ACCESS_TOKEN` environment variable.")
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
@@ -4,26 +4,15 @@
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
import asyncio
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
|
||||
import aiohttp
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
from mcp import StdioServerParameters
|
||||
from PIL import Image
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import (
|
||||
Frame,
|
||||
FunctionCallResultFrame,
|
||||
LLMRunFrame,
|
||||
URLImageRawFrame,
|
||||
)
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
@@ -32,7 +21,6 @@ from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.anthropic.llm import AnthropicLLMService
|
||||
@@ -44,86 +32,16 @@ from pipecat.transports.daily.transport import DailyParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
class UrlToImageProcessor(FrameProcessor):
|
||||
def __init__(self, aiohttp_session: aiohttp.ClientSession, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._aiohttp_session = aiohttp_session
|
||||
|
||||
async def process_frame(self, frame: Frame, direction: FrameDirection):
|
||||
await super().process_frame(frame, direction)
|
||||
|
||||
if isinstance(frame, FunctionCallResultFrame):
|
||||
await self.push_frame(frame, direction)
|
||||
image_url = self.extract_url(frame.result)
|
||||
if image_url:
|
||||
await self.run_image_process(image_url)
|
||||
# sometimes we get multiple image urls- process 1 at a time
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
await self.push_frame(frame, direction)
|
||||
|
||||
def extract_url(self, text: str):
|
||||
try:
|
||||
data = json.loads(text)
|
||||
if "artObject" in data:
|
||||
return data["artObject"]["webImage"]["url"]
|
||||
if "artworks" in data and len(data["artworks"]):
|
||||
return data["artworks"][0]["webImage"]["url"]
|
||||
except (json.JSONDecodeError, KeyError, TypeError):
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
async def run_image_process(self, image_url: str):
|
||||
try:
|
||||
logger.debug(f"handling image from url: '{image_url}'")
|
||||
async with self._aiohttp_session.get(image_url) as response:
|
||||
image_stream = io.BytesIO(await response.content.read())
|
||||
image = Image.open(image_stream)
|
||||
image = image.convert("RGB")
|
||||
frame = URLImageRawFrame(
|
||||
url=image_url, image=image.tobytes(), size=image.size, format="RGB"
|
||||
)
|
||||
await self.push_frame(frame)
|
||||
except Exception as e:
|
||||
error_msg = f"Error handling image url {image_url}: {str(e)}"
|
||||
logger.error(error_msg)
|
||||
|
||||
|
||||
# full list of tools available from rijksmuseum MCP:
|
||||
# - get_artwork_details
|
||||
# - get_artwork_image
|
||||
# - get_user_sets
|
||||
# - get_user_set_details
|
||||
# - open_image_in_browser
|
||||
# - get_artist_timeline
|
||||
|
||||
mcp_tools_filter = ["get_artwork_details", "get_artwork_image", "open_image_in_browser"]
|
||||
|
||||
|
||||
def open_image_output_filter(output: str):
|
||||
pattern = r"Successfully opened image in browser: "
|
||||
text_to_print = re.sub(pattern, "", output)
|
||||
print(f"🖼️ link to high resolution artwork: {text_to_print}")
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
video_out_width=1024,
|
||||
video_out_height=1024,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
video_out_enabled=True,
|
||||
video_out_width=1024,
|
||||
video_out_height=1024,
|
||||
),
|
||||
}
|
||||
|
||||
@@ -131,63 +49,48 @@ transport_params = {
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
# Create an HTTP session for API calls
|
||||
async with aiohttp.ClientSession() as session:
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
tts = CartesiaTTSService(
|
||||
api_key=os.getenv("CARTESIA_API_KEY"),
|
||||
settings=CartesiaTTSService.Settings(
|
||||
voice="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
|
||||
),
|
||||
)
|
||||
|
||||
system_prompt = f"""
|
||||
You are a helpful LLM in a voice call.
|
||||
Your goal is to demonstrate your capabilities in a succinct way.
|
||||
You have access to memory tools that let you store and recall information.
|
||||
Offer to remember things for the user, like their name, preferences, or anything they'd like.
|
||||
You can also recall things you've previously stored.
|
||||
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
|
||||
Respond to what the user said in a creative and helpful way.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
settings=AnthropicLLMService.Settings(
|
||||
system_instruction=system_prompt,
|
||||
),
|
||||
)
|
||||
|
||||
# https://github.com/modelcontextprotocol/servers/tree/main/src/memory
|
||||
async with MCPClient(
|
||||
server_params=StdioServerParameters(
|
||||
command=shutil.which("npx"),
|
||||
args=["-y", "@modelcontextprotocol/server-memory"],
|
||||
# env={"MEMORY_FILE_PATH": "/tmp/pipecat_memory.jsonl"}, # Optional: specify MEMORY_FILE_PATH
|
||||
),
|
||||
) as mcp:
|
||||
tools = await mcp.register_tools(llm)
|
||||
|
||||
context = LLMContext(
|
||||
messages=[{"role": "user", "content": "Please introduce yourself."}],
|
||||
tools=tools,
|
||||
)
|
||||
|
||||
system_prompt = f"""
|
||||
You are a helpful LLM in a voice call.
|
||||
Your goal is to demonstrate your capabilities in a succinct way.
|
||||
You have access to tools to search the Rijksmuseum collection.
|
||||
Offer, for example, to show a floral still life, use the `search_artwork` tool.
|
||||
The tool may respond with a JSON object with an `artworks` array. Choose the art from that array.
|
||||
Once the tool has responded, tell the user the title and use the `open_image_in_browser` tool.
|
||||
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
|
||||
Respond to what the user said in a creative and helpful way.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
llm = AnthropicLLMService(
|
||||
api_key=os.getenv("ANTHROPIC_API_KEY"),
|
||||
settings=AnthropicLLMService.Settings(
|
||||
system_instruction=system_prompt,
|
||||
),
|
||||
)
|
||||
|
||||
try:
|
||||
mcp = MCPClient(
|
||||
server_params=StdioServerParameters(
|
||||
command=shutil.which("npx"),
|
||||
# https://github.com/r-huijts/rijksmuseum-mcp
|
||||
args=["-y", "mcp-server-rijksmuseum"],
|
||||
env={"RIJKSMUSEUM_API_KEY": os.getenv("RIJKSMUSEUM_API_KEY")},
|
||||
),
|
||||
# Optional
|
||||
tools_filter=mcp_tools_filter, # Optional
|
||||
tools_output_filters={"open_image_in_browser": open_image_output_filter},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up mcp")
|
||||
logger.exception("error trace:")
|
||||
|
||||
mcp_image = UrlToImageProcessor(aiohttp_session=session)
|
||||
|
||||
tools = {}
|
||||
try:
|
||||
tools = await mcp.register_tools(llm)
|
||||
except Exception as e:
|
||||
logger.error(f"error registering tools")
|
||||
logger.exception("error trace:")
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
@@ -200,7 +103,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
user_aggregator, # User spoken responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
mcp_image, # URL image -> output
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses and tool context
|
||||
]
|
||||
@@ -238,13 +140,6 @@ async def bot(runner_args: RunnerArguments):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not os.getenv("RIJKSMUSEUM_API_KEY"):
|
||||
logger.error(
|
||||
f"Please set RIJKSMUSEUM_API_KEY environment variable for this example. See https://github.com/r-huijts/rijksmuseum-mcp and https://www.rijksmuseum.nl/en/register?redirectUrl=https://www.https://www.rijksmuseum.nl/en/rijksstudio/my/profile"
|
||||
)
|
||||
import sys
|
||||
|
||||
sys.exit(1)
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
|
||||
@@ -63,28 +63,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
),
|
||||
)
|
||||
|
||||
try:
|
||||
# Github MCP docs: https://github.com/github/github-mcp-server
|
||||
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
|
||||
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
|
||||
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
|
||||
mcp = MCPClient(
|
||||
server_params=StreamableHttpParameters(
|
||||
url="https://api.githubcopilot.com/mcp/",
|
||||
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up mcp")
|
||||
logger.exception("error trace:")
|
||||
|
||||
tools = {}
|
||||
try:
|
||||
tools = await mcp.get_tools_schema()
|
||||
except Exception as e:
|
||||
logger.error(f"error registering tools")
|
||||
logger.exception("error trace:")
|
||||
|
||||
system = f"""
|
||||
You are a helpful LLM in a voice call.
|
||||
Your goal is to answer questions about the user's GitHub repositories and account.
|
||||
@@ -94,53 +72,65 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
llm = GeminiLiveLLMService(
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=system,
|
||||
tools=tools,
|
||||
)
|
||||
# Github MCP docs: https://github.com/github/github-mcp-server
|
||||
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
|
||||
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
|
||||
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
|
||||
async with MCPClient(
|
||||
server_params=StreamableHttpParameters(
|
||||
url="https://api.githubcopilot.com/mcp/",
|
||||
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
|
||||
)
|
||||
) as mcp:
|
||||
tools = await mcp.get_tools_schema()
|
||||
|
||||
await mcp.register_tools_schema(tools, llm)
|
||||
llm = GeminiLiveLLMService(
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=system,
|
||||
tools=tools,
|
||||
)
|
||||
|
||||
context = LLMContext([{"role": "developer", "content": "Please introduce yourself."}])
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
await mcp.register_tools_schema(tools, llm)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
user_aggregator, # User spoken responses
|
||||
llm, # LLM
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses and tool context
|
||||
]
|
||||
)
|
||||
context = LLMContext([{"role": "user", "content": "Please introduce yourself."}])
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
user_aggregator, # User spoken responses
|
||||
llm, # LLM
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses and tool context
|
||||
]
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
await runner.run(task)
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
|
||||
@@ -63,83 +63,78 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
),
|
||||
)
|
||||
|
||||
system_prompt = f"""
|
||||
You are a helpful LLM in a voice call.
|
||||
Your goal is to answer questions about the user's GitHub repositories and account.
|
||||
You have access to a number of tools provided by Github. Use any and all tools to help users.
|
||||
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
system_prompt = """\
|
||||
You are a helpful LLM in a voice call.
|
||||
Your goal is to answer questions about the user's GitHub repositories and account.
|
||||
You have access to a number of tools provided by Github. Use any and all tools to help users.
|
||||
Your output will be spoken aloud, so avoid special characters that can't easily be spoken, such as emojis or bullet points.
|
||||
Don't overexplain what you are doing.
|
||||
Just respond with short sentences when you are carrying out tool calls.
|
||||
"""
|
||||
|
||||
llm = GoogleLLMService(
|
||||
api_key=os.getenv("GOOGLE_API_KEY"),
|
||||
system_instruction=system_prompt,
|
||||
)
|
||||
|
||||
try:
|
||||
# Github MCP docs: https://github.com/github/github-mcp-server
|
||||
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
|
||||
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
|
||||
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
|
||||
mcp = MCPClient(
|
||||
server_params=StreamableHttpParameters(
|
||||
url="https://api.githubcopilot.com/mcp/",
|
||||
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
|
||||
)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"error setting up mcp")
|
||||
logger.exception("error trace:")
|
||||
|
||||
tools = {}
|
||||
try:
|
||||
tools = await mcp.register_tools(llm)
|
||||
except Exception as e:
|
||||
logger.error(f"error registering tools")
|
||||
logger.exception("error trace:")
|
||||
|
||||
context = LLMContext(tools=tools)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
user_aggregator, # User spoken responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses and tool context
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
settings=GoogleLLMService.Settings(
|
||||
system_instruction=system_prompt,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
# Github MCP docs: https://github.com/github/github-mcp-server
|
||||
# Enable Github Copilot on your GitHub account. Free tier is ok. (https://github.com/settings/copilot)
|
||||
# Generate a personal access token. It must be a Fine-grained token, classic tokens are not supported. (https://github.com/settings/personal-access-tokens)
|
||||
# Set permissions you want to use (eg. "all repositories", "profile: read/write", etc)
|
||||
async with MCPClient(
|
||||
server_params=StreamableHttpParameters(
|
||||
url="https://api.githubcopilot.com/mcp/",
|
||||
headers={"Authorization": f"Bearer {os.getenv('GITHUB_PERSONAL_ACCESS_TOKEN')}"},
|
||||
)
|
||||
) as mcp:
|
||||
tools = await mcp.register_tools(llm)
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
context = LLMContext(
|
||||
messages=[{"role": "user", "content": "Please introduce yourself."}],
|
||||
tools=tools,
|
||||
)
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
user_aggregator, # User spoken responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses and tool context
|
||||
]
|
||||
)
|
||||
|
||||
await runner.run(task)
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected: {client}")
|
||||
# Kick off the conversation.
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
|
||||
162
examples/realtime/realtime-inworld.py
Normal file
162
examples/realtime/realtime-inworld.py
Normal file
@@ -0,0 +1,162 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""
|
||||
Inworld Realtime Example
|
||||
|
||||
This example demonstrates using Inworld's Realtime API for real-time voice
|
||||
conversations. The Inworld Realtime API is OpenAI-compatible and operates
|
||||
as a cascade STT/LLM/TTS pipeline under the hood, with built-in semantic
|
||||
voice activity detection for turn management.
|
||||
|
||||
Features:
|
||||
- Real-time audio streaming with low latency
|
||||
- Built-in semantic VAD (voice activity detection)
|
||||
- Streaming user transcription
|
||||
- Text and audio input
|
||||
|
||||
Requirements:
|
||||
- INWORLD_API_KEY environment variable set
|
||||
- pip install pipecat-ai[inworld]
|
||||
|
||||
Usage:
|
||||
python realtime-inworld.py --transport webrtc
|
||||
python realtime-inworld.py --transport daily
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.observers.loggers.transcription_log_observer import (
|
||||
TranscriptionLogObserver,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
AssistantTurnStoppedMessage,
|
||||
LLMContextAggregatorPair,
|
||||
UserTurnStoppedMessage,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.inworld.realtime.llm import InworldRealtimeLLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
# --- Transport Configuration ---
|
||||
|
||||
# No local VAD needed — Inworld's server-side semantic VAD handles turn detection.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info("Starting Inworld Realtime bot")
|
||||
|
||||
# Create the Inworld Realtime LLM service.
|
||||
# Common params (llm_model, voice, tts_model, stt_model) are top-level.
|
||||
# For full control, use settings=InworldRealtimeLLMService.Settings(session_properties=...)
|
||||
#
|
||||
# llm_model can be any supported model or an Inworld Router.
|
||||
# See: https://docs.inworld.ai/router/introduction
|
||||
llm = InworldRealtimeLLMService(
|
||||
api_key=os.getenv("INWORLD_API_KEY"),
|
||||
llm_model="xai/grok-4-1-fast-non-reasoning",
|
||||
voice="Sarah",
|
||||
settings=InworldRealtimeLLMService.Settings(
|
||||
system_instruction="""You are a helpful and friendly AI assistant powered by Inworld.
|
||||
|
||||
Your voice and personality should be warm and engaging. Keep your responses
|
||||
concise and conversational since this is a voice interaction.
|
||||
|
||||
Always be helpful and proactive in offering assistance.""",
|
||||
),
|
||||
)
|
||||
|
||||
# Create context with initial message
|
||||
context = LLMContext(
|
||||
[{"role": "developer", "content": "Say hello and introduce yourself!"}],
|
||||
)
|
||||
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(context)
|
||||
|
||||
# Build the pipeline
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(),
|
||||
user_aggregator,
|
||||
llm, # Inworld Realtime (handles STT + LLM + TTS)
|
||||
transport.output(),
|
||||
assistant_aggregator,
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
observers=[TranscriptionLogObserver()],
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info("Client connected")
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
@user_aggregator.event_handler("on_user_turn_stopped")
|
||||
async def on_user_turn_stopped(aggregator, strategy, message: UserTurnStoppedMessage):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
logger.info(f"Transcript: {timestamp}user: {message.content}")
|
||||
|
||||
@assistant_aggregator.event_handler("on_assistant_turn_stopped")
|
||||
async def on_assistant_turn_stopped(aggregator, message: AssistantTurnStoppedMessage):
|
||||
timestamp = f"[{message.timestamp}] " if message.timestamp else ""
|
||||
logger.info(f"Transcript: {timestamp}assistant: {message.content}")
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -16,7 +16,7 @@ from pipecat.pipeline.task import PipelineTask
|
||||
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.gladia import GladiaSTTService
|
||||
from pipecat.services.gladia.stt import GladiaSTTService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
@@ -40,7 +40,7 @@ class TranscriptHandler:
|
||||
Maintains a list of conversation messages and outputs them either to a log
|
||||
or to a file as they are received. Each message includes its timestamp and role.
|
||||
|
||||
Attributes:
|
||||
Parameters:
|
||||
messages: List of all processed transcript messages
|
||||
output_file: Optional path to file where transcript is saved. If None, outputs to log only.
|
||||
"""
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
@@ -25,7 +25,6 @@ from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.whisper.stt import MLXModel, WhisperSTTServiceMLX
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
@@ -25,7 +25,6 @@ from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.cartesia.tts import CartesiaTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.services.whisper.stt import Model, WhisperSTTService
|
||||
from pipecat.transcriptions.language import Language
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
@@ -114,6 +114,14 @@ async def main():
|
||||
logger.info("Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
@transport.event_handler("on_avatar_connected")
|
||||
async def on_avatar_connected(transport, participant):
|
||||
logger.info("Avatar connected")
|
||||
|
||||
@transport.event_handler("on_avatar_disconnected")
|
||||
async def on_avatar_disconnected(transport, participant, reason):
|
||||
logger.info(f"Avatar disconnected. Reason: {reason}")
|
||||
|
||||
runner = PipelineRunner()
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
@@ -28,7 +28,6 @@ from pipecat.frames.frames import (
|
||||
Frame,
|
||||
OutputImageRawFrame,
|
||||
StartFrame,
|
||||
SystemFrame,
|
||||
)
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
|
||||
127
examples/voice/voice-mistral.py
Normal file
127
examples/voice/voice-mistral.py
Normal file
@@ -0,0 +1,127 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.silero import SileroVADAnalyzer
|
||||
from pipecat.frames.frames import LLMRunFrame
|
||||
from pipecat.pipeline.pipeline import Pipeline
|
||||
from pipecat.pipeline.runner import PipelineRunner
|
||||
from pipecat.pipeline.task import PipelineParams, PipelineTask
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext
|
||||
from pipecat.processors.aggregators.llm_response_universal import (
|
||||
LLMContextAggregatorPair,
|
||||
LLMUserAggregatorParams,
|
||||
)
|
||||
from pipecat.runner.types import RunnerArguments
|
||||
from pipecat.runner.utils import create_transport
|
||||
from pipecat.services.deepgram.stt import DeepgramSTTService
|
||||
from pipecat.services.mistral.tts import MistralTTSService
|
||||
from pipecat.services.openai.llm import OpenAILLMService
|
||||
from pipecat.transports.base_transport import BaseTransport, TransportParams
|
||||
from pipecat.transports.daily.transport import DailyParams
|
||||
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
|
||||
|
||||
load_dotenv(override=True)
|
||||
|
||||
|
||||
# We use lambdas to defer transport parameter creation until the transport
|
||||
# type is selected at runtime.
|
||||
transport_params = {
|
||||
"daily": lambda: DailyParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"twilio": lambda: FastAPIWebsocketParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
"webrtc": lambda: TransportParams(
|
||||
audio_in_enabled=True,
|
||||
audio_out_enabled=True,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
logger.info(f"Starting bot")
|
||||
|
||||
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
|
||||
|
||||
tts = MistralTTSService(
|
||||
api_key=os.getenv("MISTRAL_API_KEY"),
|
||||
settings=MistralTTSService.Settings(
|
||||
voice="c69964a6-ab8b-4f8a-9465-ec0925096ec8",
|
||||
),
|
||||
)
|
||||
|
||||
llm = OpenAILLMService(
|
||||
api_key=os.getenv("OPENAI_API_KEY"),
|
||||
settings=OpenAILLMService.Settings(
|
||||
system_instruction="You are a helpful assistant in a voice conversation. Your responses will be spoken aloud, so avoid emojis, bullet points, or other formatting that can't be spoken. Respond to what the user said in a creative, helpful, and brief way.",
|
||||
),
|
||||
)
|
||||
|
||||
context = LLMContext()
|
||||
user_aggregator, assistant_aggregator = LLMContextAggregatorPair(
|
||||
context,
|
||||
user_params=LLMUserAggregatorParams(vad_analyzer=SileroVADAnalyzer()),
|
||||
)
|
||||
|
||||
pipeline = Pipeline(
|
||||
[
|
||||
transport.input(), # Transport user input
|
||||
stt,
|
||||
user_aggregator, # User responses
|
||||
llm, # LLM
|
||||
tts, # TTS
|
||||
transport.output(), # Transport bot output
|
||||
assistant_aggregator, # Assistant spoken responses
|
||||
]
|
||||
)
|
||||
|
||||
task = PipelineTask(
|
||||
pipeline,
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
),
|
||||
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
|
||||
)
|
||||
|
||||
@transport.event_handler("on_client_connected")
|
||||
async def on_client_connected(transport, client):
|
||||
logger.info(f"Client connected")
|
||||
# Kick off the conversation.
|
||||
context.add_message(
|
||||
{"role": "developer", "content": "Please introduce yourself to the user."}
|
||||
)
|
||||
await task.queue_frames([LLMRunFrame()])
|
||||
|
||||
@transport.event_handler("on_client_disconnected")
|
||||
async def on_client_disconnected(transport, client):
|
||||
logger.info(f"Client disconnected")
|
||||
await task.cancel()
|
||||
|
||||
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
|
||||
|
||||
await runner.run(task)
|
||||
|
||||
|
||||
async def bot(runner_args: RunnerArguments):
|
||||
"""Main bot entry point compatible with Pipecat Cloud."""
|
||||
transport = await create_transport(runner_args, transport_params)
|
||||
await run_bot(transport, runner_args)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
from pipecat.runner.run import main
|
||||
|
||||
main()
|
||||
@@ -96,7 +96,6 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
|
||||
params=PipelineParams(
|
||||
enable_metrics=True,
|
||||
enable_usage_metrics=True,
|
||||
allow_interruptions=True,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ description = "An open source framework for voice (and multimodal) assistants"
|
||||
license = "BSD-2-Clause"
|
||||
license-files = ["LICENSE"]
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.10"
|
||||
requires-python = ">=3.11"
|
||||
keywords = ["webrtc", "audio", "video", "ai"]
|
||||
classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
@@ -41,7 +41,7 @@ dependencies = [
|
||||
# Required by LocalSmartTurnAnalyzerV3
|
||||
# Inlined here instead of using a self-referential extra for Poetry compatibility.
|
||||
"transformers>=4.48.0,<6",
|
||||
"onnxruntime~=1.23.2",
|
||||
"onnxruntime~=1.24.3",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
@@ -77,7 +77,7 @@ groq = [ "groq>=0.23.0,<2" ]
|
||||
gstreamer = [ "pygobject~=3.50.0" ]
|
||||
heygen = [ "livekit>=1.0.13,<2", "pipecat-ai[websockets-base]" ]
|
||||
hume = [ "hume>=0.11.2,<1" ]
|
||||
inworld = []
|
||||
inworld = [ "pipecat-ai[websockets-base]" ]
|
||||
koala = [ "pvkoala~=2.0.3" ]
|
||||
kokoro = [ "kokoro-onnx>=0.5.0,<1", "requests>=2.32.5,<3" ]
|
||||
langchain = [ "langchain>=1.2.13,<2", "langchain-community>=0.4.1,<1", "langchain-openai>=1.1.12,<2" ]
|
||||
@@ -88,7 +88,7 @@ local = [ "pyaudio~=0.2.14" ]
|
||||
local-smart-turn = [ "coremltools>=8.0", "transformers>=4.48.0,<6", "torch>=2.5.0,<3", "torchaudio>=2.5.0,<3" ]
|
||||
mcp = [ "mcp[cli]>=1.11.0,<2" ]
|
||||
mem0 = [ "mem0ai>=1.0.8,<2" ]
|
||||
mistral = []
|
||||
mistral = ["mistralai>=2.0.0,<3"]
|
||||
mlx-whisper = [ "mlx-whisper~=0.4.2" ]
|
||||
moondream = [ "accelerate~=1.10.0", "einops~=0.8.0", "pyvips[binary]~=3.0.0", "timm~=1.0.13", "transformers>=4.48.0,<6" ]
|
||||
nebius = []
|
||||
@@ -101,10 +101,8 @@ openrouter = []
|
||||
perplexity = []
|
||||
piper = [ "piper-tts>=1.3.0,<2", "requests>=2.32.5,<3" ]
|
||||
qwen = []
|
||||
remote-smart-turn = []
|
||||
resembleai = [ "pipecat-ai[websockets-base]" ]
|
||||
rime = [ "pipecat-ai[websockets-base]" ]
|
||||
riva = [ "pipecat-ai[nvidia]" ]
|
||||
runner = [ "python-dotenv>=1.0.0,<2.0.0", "uvicorn>=0.32.0,<1.0.0", "fastapi>=0.115.6,<1", "pipecat-ai-small-webrtc-prebuilt>=2.4.0"]
|
||||
sagemaker = ["aws_sdk_sagemaker_runtime_http2; python_version>='3.12'"]
|
||||
sambanova = []
|
||||
@@ -135,9 +133,9 @@ dev = [
|
||||
"pip-tools~=7.5.3",
|
||||
"pre-commit~=4.5.1",
|
||||
"pyright>=1.1.404,<1.2",
|
||||
"pytest~=8.4.1",
|
||||
"pytest-asyncio~=1.3.0",
|
||||
"pytest-aiohttp==1.1.0",
|
||||
"pytest>=9.0.0,<10",
|
||||
"pytest-asyncio>=1.0.0,<2",
|
||||
"pytest-aiohttp>=1.0.0,<2",
|
||||
"ruff>=0.12.11,<1",
|
||||
"setuptools~=78.1.1",
|
||||
"setuptools_scm~=8.3.1",
|
||||
@@ -211,7 +209,6 @@ ignore = [
|
||||
"**/__init__.py" = ["D104"]
|
||||
# Skip specific rules for generated protobuf files
|
||||
"**/*_pb2.py" = ["D"]
|
||||
"src/pipecat/services/__init__.py" = ["D"]
|
||||
|
||||
[tool.ruff.lint.pydocstyle]
|
||||
convention = "google"
|
||||
|
||||
@@ -171,6 +171,7 @@ class EvalRunner:
|
||||
async def save_audio(self, name: str, audio: bytes, sample_rate: int, num_channels: int):
|
||||
if len(audio) > 0:
|
||||
filename = self._recording_file_name(name)
|
||||
os.makedirs(os.path.dirname(filename), exist_ok=True)
|
||||
logger.debug(f"Saving {name} audio to {filename}")
|
||||
with io.BytesIO() as buffer:
|
||||
with wave.open(buffer, "wb") as wf:
|
||||
|
||||
@@ -10,11 +10,13 @@ This module provides the abstract base class for implementing LLM provider-speci
|
||||
adapters that handle tool format conversion and standardization.
|
||||
"""
|
||||
|
||||
import warnings
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Dict, Generic, List, Optional, TypeVar
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.processors.aggregators.llm_context import (
|
||||
LLMContext,
|
||||
@@ -48,6 +50,21 @@ class BaseLLMAdapter(ABC, Generic[TLLMInvocationParams]):
|
||||
def __init__(self):
|
||||
"""Initialize the adapter."""
|
||||
self._warned_system_instruction = False
|
||||
self._builtin_tools: Dict[str, FunctionSchema] = {}
|
||||
|
||||
@property
|
||||
def builtin_tools(self) -> Dict[str, FunctionSchema]:
|
||||
"""Built-in tools automatically merged into every inference request.
|
||||
|
||||
Keyed by tool name for O(1) lookup, insertion, and removal. The
|
||||
service injects tools here so they are sent transparently on every
|
||||
inference request without the user having to add them to their
|
||||
``ToolsSchema``.
|
||||
|
||||
Returns:
|
||||
Mutable dict mapping tool name to ``FunctionSchema``.
|
||||
"""
|
||||
return self._builtin_tools
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
@@ -108,20 +125,29 @@ class BaseLLMAdapter(ABC, Generic[TLLMInvocationParams]):
|
||||
"""
|
||||
return LLMSpecificMessage(llm=self.id_for_llm_specific_messages, message=message)
|
||||
|
||||
def get_messages(self, context: LLMContext) -> List[LLMContextMessage]:
|
||||
def get_messages(
|
||||
self, context: LLMContext, *, truncate_large_values: bool = False
|
||||
) -> List[LLMContextMessage]:
|
||||
"""Get messages from the LLM context, including standard and LLM-specific messages.
|
||||
|
||||
Args:
|
||||
context: The LLM context containing messages.
|
||||
truncate_large_values: If True, return deep copies of messages with
|
||||
large values replaced by short placeholders.
|
||||
|
||||
Returns:
|
||||
List of messages including standard and LLM-specific messages.
|
||||
"""
|
||||
return context.get_messages(self.id_for_llm_specific_messages)
|
||||
return context.get_messages(
|
||||
self.id_for_llm_specific_messages, truncate_large_values=truncate_large_values
|
||||
)
|
||||
|
||||
def from_standard_tools(self, tools: Any) -> List[Any] | NotGiven:
|
||||
"""Convert tools from standard format to provider format.
|
||||
|
||||
Built-in tools are automatically merged into the schema before conversion so that every
|
||||
inference request receives them without the user having to declare them explicitly.
|
||||
|
||||
Args:
|
||||
tools: Tools in standard format or provider-specific format.
|
||||
|
||||
@@ -129,8 +155,31 @@ class BaseLLMAdapter(ABC, Generic[TLLMInvocationParams]):
|
||||
List of tools converted to provider format, or original tools
|
||||
if not in standard format.
|
||||
"""
|
||||
if self._builtin_tools:
|
||||
if isinstance(tools, ToolsSchema):
|
||||
tools = ToolsSchema(
|
||||
standard_tools=tools.standard_tools + list(self._builtin_tools.values()),
|
||||
custom_tools=tools.custom_tools,
|
||||
)
|
||||
else:
|
||||
# User supplied tools in a legacy/provider-specific format.
|
||||
# Built-in tools cannot be safely merged, so they will not be injected.
|
||||
# Migrate to ToolsSchema to enable built-in tool support; use custom_tools
|
||||
# as an escape hatch for any provider-specific tools that don't fit the
|
||||
# standard schema.
|
||||
if tools is not None:
|
||||
warnings.warn(
|
||||
"Built-in tools (e.g. async tool cancellation) could not be injected "
|
||||
"because the supplied tools are not a ToolsSchema instance. "
|
||||
"Migrate to ToolsSchema to enable built-in tool support. "
|
||||
"Use ToolsSchema(custom_tools=...) as an escape hatch for any "
|
||||
"provider-specific tools that don't fit the standard schema.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
# Fall through and return the original tools unchanged.
|
||||
|
||||
if isinstance(tools, ToolsSchema):
|
||||
logger.debug(f"Retrieving the tools using the adapter: {type(self)}")
|
||||
return self.to_provider_tools_format(tools)
|
||||
# Fallback to return the same tools in case they are not in a standard format
|
||||
return tools
|
||||
|
||||
@@ -21,10 +21,12 @@ class AdapterType(Enum):
|
||||
"""Supported adapter types for custom tools.
|
||||
|
||||
Parameters:
|
||||
GEMINI: Google Gemini adapter - currently the only service supporting custom tools.
|
||||
GEMINI: Google Gemini adapter.
|
||||
OPENAI: OpenAI adapter (Chat Completions, Responses, and Realtime API).
|
||||
"""
|
||||
|
||||
GEMINI = "gemini" # that is the only service where we are able to add custom tools for now
|
||||
GEMINI = "gemini"
|
||||
OPENAI = "openai"
|
||||
|
||||
|
||||
class ToolsSchema:
|
||||
|
||||
@@ -16,7 +16,7 @@ from loguru import logger
|
||||
|
||||
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
|
||||
|
||||
|
||||
|
||||
@@ -19,7 +19,7 @@ from loguru import logger
|
||||
|
||||
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
|
||||
from pipecat.services.xai.realtime import events
|
||||
|
||||
@@ -27,7 +27,7 @@ from pipecat.services.xai.realtime import events
|
||||
class GrokRealtimeLLMInvocationParams(TypedDict):
|
||||
"""Context-based parameters for invoking Grok Realtime API.
|
||||
|
||||
Attributes:
|
||||
Parameters:
|
||||
system_instruction: System prompt/instructions for the session.
|
||||
messages: List of conversation items formatted for Grok Realtime.
|
||||
tools: List of tool definitions (function, web_search, x_search, file_search).
|
||||
@@ -77,7 +77,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
|
||||
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
|
||||
"""Get messages from context in a format safe for logging.
|
||||
|
||||
Removes or truncates sensitive data like audio content.
|
||||
Binary data (images, audio) is replaced with short placeholders.
|
||||
|
||||
Args:
|
||||
context: The LLM context containing messages.
|
||||
@@ -85,18 +85,7 @@ class GrokRealtimeLLMAdapter(BaseLLMAdapter):
|
||||
Returns:
|
||||
List of messages with sensitive data redacted.
|
||||
"""
|
||||
msgs = []
|
||||
for message in self.get_messages(context):
|
||||
msg = copy.deepcopy(message)
|
||||
if "content" in msg:
|
||||
if isinstance(msg["content"], list):
|
||||
for item in msg["content"]:
|
||||
if item.get("type") == "input_audio":
|
||||
item["audio"] = "..."
|
||||
if item.get("type") == "audio":
|
||||
item["audio"] = "..."
|
||||
msgs.append(msg)
|
||||
return msgs
|
||||
return self.get_messages(context, truncate_large_values=True)
|
||||
|
||||
@dataclass
|
||||
class ConvertedMessages:
|
||||
|
||||
244
src/pipecat/adapters/services/inworld_realtime_adapter.py
Normal file
244
src/pipecat/adapters/services/inworld_realtime_adapter.py
Normal file
@@ -0,0 +1,244 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Inworld Realtime LLM adapter for Pipecat.
|
||||
|
||||
Converts Pipecat's tool schemas and context into the format required by
|
||||
Inworld's Realtime API.
|
||||
"""
|
||||
|
||||
import copy
|
||||
import json
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, TypedDict
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
|
||||
from pipecat.adapters.schemas.function_schema import FunctionSchema
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage
|
||||
from pipecat.services.inworld.realtime import events
|
||||
|
||||
|
||||
class InworldRealtimeLLMInvocationParams(TypedDict):
|
||||
"""Context-based parameters for invoking Inworld Realtime API.
|
||||
|
||||
Attributes:
|
||||
system_instruction: System prompt/instructions for the session.
|
||||
messages: List of conversation items formatted for Inworld Realtime.
|
||||
tools: List of tool definitions.
|
||||
"""
|
||||
|
||||
system_instruction: Optional[str]
|
||||
messages: List[events.ConversationItem]
|
||||
tools: List[Dict[str, Any]]
|
||||
|
||||
|
||||
class InworldRealtimeLLMAdapter(BaseLLMAdapter):
|
||||
"""LLM adapter for Inworld Realtime API.
|
||||
|
||||
Converts Pipecat's universal context and tool schemas into the specific
|
||||
format required by Inworld's Realtime API.
|
||||
"""
|
||||
|
||||
@property
|
||||
def id_for_llm_specific_messages(self) -> str:
|
||||
"""Get the identifier used in LLMSpecificMessage instances for Inworld Realtime."""
|
||||
return "inworld-realtime"
|
||||
|
||||
def get_llm_invocation_params(
|
||||
self, context: LLMContext, *, system_instruction: Optional[str] = None
|
||||
) -> InworldRealtimeLLMInvocationParams:
|
||||
"""Get Inworld Realtime-specific LLM invocation parameters from a universal LLM context.
|
||||
|
||||
Args:
|
||||
context: The LLM context containing messages, tools, etc.
|
||||
system_instruction: Optional system instruction from service settings.
|
||||
|
||||
Returns:
|
||||
Dictionary of parameters for invoking Inworld's Realtime API.
|
||||
"""
|
||||
messages = self._from_universal_context_messages(self.get_messages(context))
|
||||
effective_system = self._resolve_system_instruction(
|
||||
messages.system_instruction,
|
||||
system_instruction,
|
||||
discard_context_system=True,
|
||||
)
|
||||
return {
|
||||
"system_instruction": effective_system,
|
||||
"messages": messages.messages,
|
||||
"tools": self.from_standard_tools(context.tools) or [],
|
||||
}
|
||||
|
||||
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
|
||||
"""Get messages from context in a format safe for logging.
|
||||
|
||||
Binary data (images, audio) is replaced with short placeholders.
|
||||
|
||||
Args:
|
||||
context: The LLM context containing messages.
|
||||
|
||||
Returns:
|
||||
List of messages with sensitive data redacted.
|
||||
"""
|
||||
return self.get_messages(context, truncate_large_values=True)
|
||||
|
||||
@dataclass
|
||||
class ConvertedMessages:
|
||||
"""Container for Inworld-formatted messages converted from universal context."""
|
||||
|
||||
messages: List[events.ConversationItem]
|
||||
system_instruction: Optional[str] = None
|
||||
|
||||
def _from_universal_context_messages(
|
||||
self, universal_context_messages: List[LLMContextMessage]
|
||||
) -> ConvertedMessages:
|
||||
"""Convert universal context messages to Inworld Realtime format.
|
||||
|
||||
Similar to OpenAI Realtime, we pack conversation history into a single
|
||||
user message since the realtime API doesn't support loading long histories.
|
||||
|
||||
Args:
|
||||
universal_context_messages: List of messages in universal format.
|
||||
|
||||
Returns:
|
||||
ConvertedMessages with Inworld-formatted messages and system instruction.
|
||||
"""
|
||||
if not universal_context_messages:
|
||||
return self.ConvertedMessages(messages=[])
|
||||
|
||||
messages = copy.deepcopy(universal_context_messages)
|
||||
system_instruction = None
|
||||
|
||||
# Extract system message as session instructions
|
||||
if messages[0].get("role") == "system":
|
||||
system = messages.pop(0)
|
||||
content = system.get("content")
|
||||
if isinstance(content, str):
|
||||
system_instruction = content
|
||||
elif isinstance(content, list):
|
||||
system_instruction = content[0].get("text")
|
||||
if not messages:
|
||||
return self.ConvertedMessages(messages=[], system_instruction=system_instruction)
|
||||
|
||||
# Convert any remaining "system"/"developer" messages to "user"
|
||||
for msg in messages:
|
||||
if msg.get("role") in ("system", "developer"):
|
||||
msg["role"] = "user"
|
||||
|
||||
# Single user message can be sent normally
|
||||
if len(messages) == 1 and messages[0].get("role") == "user":
|
||||
return self.ConvertedMessages(
|
||||
messages=[self._from_universal_context_message(messages[0])],
|
||||
system_instruction=system_instruction,
|
||||
)
|
||||
|
||||
# Pack multiple messages into a single user message
|
||||
intro_text = """
|
||||
This is a previously saved conversation. Please treat this conversation history as a
|
||||
starting point for the current conversation."""
|
||||
|
||||
trailing_text = """
|
||||
This is the end of the previously saved conversation. Please continue the conversation
|
||||
from here. If the last message is a user instruction or question, act on that instruction
|
||||
or answer the question. If the last message is an assistant response, simply say that you
|
||||
are ready to continue the conversation."""
|
||||
|
||||
return self.ConvertedMessages(
|
||||
messages=[
|
||||
events.ConversationItem(
|
||||
role="user",
|
||||
type="message",
|
||||
content=[
|
||||
events.ItemContent(
|
||||
type="input_text",
|
||||
text="\n\n".join(
|
||||
[
|
||||
intro_text,
|
||||
json.dumps(messages, indent=2),
|
||||
trailing_text,
|
||||
]
|
||||
),
|
||||
)
|
||||
],
|
||||
)
|
||||
],
|
||||
system_instruction=system_instruction,
|
||||
)
|
||||
|
||||
def _from_universal_context_message(
|
||||
self, message: LLMContextMessage
|
||||
) -> events.ConversationItem:
|
||||
"""Convert a single universal context message to Inworld format.
|
||||
|
||||
Args:
|
||||
message: Message in universal format.
|
||||
|
||||
Returns:
|
||||
ConversationItem formatted for Inworld Realtime API.
|
||||
"""
|
||||
if message.get("role") == "user":
|
||||
content = message.get("content")
|
||||
if isinstance(content, list):
|
||||
text_content = ""
|
||||
for c in content:
|
||||
if c.get("type") == "text":
|
||||
text_content += " " + c.get("text")
|
||||
else:
|
||||
logger.error(
|
||||
f"Unhandled content type in context message: {c.get('type')} - {message}"
|
||||
)
|
||||
content = text_content.strip()
|
||||
return events.ConversationItem(
|
||||
role="user",
|
||||
type="message",
|
||||
content=[events.ItemContent(type="input_text", text=content)],
|
||||
)
|
||||
|
||||
if message.get("role") == "assistant" and message.get("tool_calls"):
|
||||
tc = message.get("tool_calls")[0]
|
||||
return events.ConversationItem(
|
||||
type="function_call",
|
||||
call_id=tc["id"],
|
||||
name=tc["function"]["name"],
|
||||
arguments=tc["function"]["arguments"],
|
||||
)
|
||||
|
||||
logger.error(f"Unhandled message type in _from_universal_context_message: {message}")
|
||||
|
||||
@staticmethod
|
||||
def _to_inworld_function_format(function: FunctionSchema) -> Dict[str, Any]:
|
||||
"""Convert a function schema to Inworld Realtime function format.
|
||||
|
||||
Args:
|
||||
function: The function schema to convert.
|
||||
|
||||
Returns:
|
||||
Dictionary in Inworld Realtime function format.
|
||||
"""
|
||||
return {
|
||||
"type": "function",
|
||||
"name": function.name,
|
||||
"description": function.description,
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": function.properties,
|
||||
"required": function.required,
|
||||
},
|
||||
}
|
||||
|
||||
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[Dict[str, Any]]:
|
||||
"""Convert tool schemas to Inworld Realtime format.
|
||||
|
||||
Args:
|
||||
tools_schema: The tools schema containing functions to convert.
|
||||
|
||||
Returns:
|
||||
List of tool definitions in Inworld Realtime format.
|
||||
"""
|
||||
functions_schema = tools_schema.standard_tools
|
||||
return [self._to_inworld_function_format(func) for func in functions_schema]
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
"""OpenAI LLM adapter for Pipecat."""
|
||||
|
||||
import copy
|
||||
from typing import Any, Dict, List, Optional, TypedDict
|
||||
|
||||
from openai._types import NotGiven as OpenAINotGiven
|
||||
@@ -17,7 +16,7 @@ from openai.types.chat import (
|
||||
)
|
||||
|
||||
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
|
||||
from pipecat.processors.aggregators.llm_context import (
|
||||
LLMContext,
|
||||
LLMContextMessage,
|
||||
@@ -107,15 +106,19 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
|
||||
with ChatCompletion API.
|
||||
"""
|
||||
functions_schema = tools_schema.standard_tools
|
||||
return [
|
||||
formatted_standard_tools = [
|
||||
ChatCompletionToolParam(type="function", function=func.to_default_dict())
|
||||
for func in functions_schema
|
||||
]
|
||||
custom_openai_tools = []
|
||||
if tools_schema.custom_tools:
|
||||
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
|
||||
return formatted_standard_tools + custom_openai_tools
|
||||
|
||||
def get_messages_for_logging(self, context: LLMContext) -> List[Dict[str, Any]]:
|
||||
"""Get messages from a universal LLM context in a format ready for logging about OpenAI.
|
||||
|
||||
Removes or truncates sensitive data like image content for safe logging.
|
||||
Binary data (images, audio) is replaced with short placeholders.
|
||||
|
||||
Args:
|
||||
context: The LLM context containing messages.
|
||||
@@ -123,21 +126,7 @@ class OpenAILLMAdapter(BaseLLMAdapter[OpenAILLMInvocationParams]):
|
||||
Returns:
|
||||
List of messages in a format ready for logging about OpenAI.
|
||||
"""
|
||||
msgs = []
|
||||
for message in self.get_messages(context):
|
||||
msg = copy.deepcopy(message)
|
||||
if "content" in msg:
|
||||
if isinstance(msg["content"], list):
|
||||
for item in msg["content"]:
|
||||
if item["type"] == "image_url":
|
||||
if item["image_url"]["url"].startswith("data:image/"):
|
||||
item["image_url"]["url"] = "data:image/..."
|
||||
if item["type"] == "input_audio":
|
||||
item["input_audio"]["data"] = "..."
|
||||
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
|
||||
msg["data"] = "..."
|
||||
msgs.append(msg)
|
||||
return msgs
|
||||
return self.get_messages(context, truncate_large_values=True)
|
||||
|
||||
def _from_universal_context_messages(
|
||||
self,
|
||||
|
||||
@@ -71,7 +71,7 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
|
||||
def get_messages_for_logging(self, context) -> List[Dict[str, Any]]:
|
||||
"""Get messages from a universal LLM context in a format ready for logging about OpenAI Realtime.
|
||||
|
||||
Removes or truncates sensitive data like image content for safe logging.
|
||||
Binary data (images, audio) is replaced with short placeholders.
|
||||
|
||||
This is a placeholder until support for universal LLMContext machinery is added for OpenAI Realtime.
|
||||
|
||||
@@ -81,25 +81,7 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
|
||||
Returns:
|
||||
List of messages in a format ready for logging about OpenAI Realtime.
|
||||
"""
|
||||
# NOTE: this is the same as in OpenAIAdapter, as that's what it was
|
||||
# prior to a refactor. Worth noting that for OpenAI Realtime
|
||||
# specifically, not everything handled here is necessarily supported
|
||||
# (or supported yet).
|
||||
msgs = []
|
||||
for message in self.get_messages(context):
|
||||
msg = copy.deepcopy(message)
|
||||
if "content" in msg:
|
||||
if isinstance(msg["content"], list):
|
||||
for item in msg["content"]:
|
||||
if item["type"] == "image_url":
|
||||
if item["image_url"]["url"].startswith("data:image/"):
|
||||
item["image_url"]["url"] = "data:image/..."
|
||||
if item["type"] == "input_audio":
|
||||
item["input_audio"]["data"] = "..."
|
||||
if "mime_type" in msg and msg["mime_type"].startswith("image/"):
|
||||
msg["data"] = "..."
|
||||
msgs.append(msg)
|
||||
return msgs
|
||||
return self.get_messages(context, truncate_large_values=True)
|
||||
|
||||
@dataclass
|
||||
class ConvertedMessages:
|
||||
@@ -236,4 +218,10 @@ class OpenAIRealtimeLLMAdapter(BaseLLMAdapter):
|
||||
List of function definitions in OpenAI Realtime format.
|
||||
"""
|
||||
functions_schema = tools_schema.standard_tools
|
||||
return [self._to_openai_realtime_function_format(func) for func in functions_schema]
|
||||
formatted_standard_tools = [
|
||||
self._to_openai_realtime_function_format(func) for func in functions_schema
|
||||
]
|
||||
custom_openai_tools = []
|
||||
if tools_schema.custom_tools:
|
||||
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
|
||||
return formatted_standard_tools + custom_openai_tools
|
||||
|
||||
@@ -6,19 +6,17 @@
|
||||
|
||||
"""OpenAI Responses API adapter for Pipecat."""
|
||||
|
||||
import copy
|
||||
from typing import Any, Dict, List, Optional, TypedDict
|
||||
|
||||
from openai._types import NotGiven as OpenAINotGiven
|
||||
from openai.types.responses import FunctionToolParam, ResponseInputItemParam
|
||||
from openai.types.responses import FunctionToolParam, ResponseInputItemParam, ToolParam
|
||||
|
||||
from pipecat.adapters.base_llm_adapter import BaseLLMAdapter
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
|
||||
from pipecat.processors.aggregators.llm_context import (
|
||||
LLMContext,
|
||||
LLMContextMessage,
|
||||
LLMSpecificMessage,
|
||||
NotGiven,
|
||||
)
|
||||
|
||||
|
||||
@@ -26,7 +24,7 @@ class OpenAIResponsesLLMInvocationParams(TypedDict, total=False):
|
||||
"""Context-based parameters for invoking OpenAI Responses API."""
|
||||
|
||||
input: List[ResponseInputItemParam]
|
||||
tools: List[FunctionToolParam] | OpenAINotGiven
|
||||
tools: List[ToolParam] | OpenAINotGiven
|
||||
instructions: str
|
||||
|
||||
|
||||
@@ -107,7 +105,7 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
|
||||
|
||||
return params
|
||||
|
||||
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[FunctionToolParam]:
|
||||
def to_provider_tools_format(self, tools_schema: ToolsSchema) -> List[ToolParam]:
|
||||
"""Convert function schemas to Responses API function tool format.
|
||||
|
||||
Args:
|
||||
@@ -129,12 +127,15 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
|
||||
if "description" in d:
|
||||
tool["description"] = d["description"]
|
||||
result.append(tool)
|
||||
return result
|
||||
custom_openai_tools = []
|
||||
if tools_schema.custom_tools:
|
||||
custom_openai_tools = tools_schema.custom_tools.get(AdapterType.OPENAI, [])
|
||||
return result + custom_openai_tools
|
||||
|
||||
def get_messages_for_logging(self, context: LLMContext) -> List[Dict[str, Any]]:
|
||||
"""Get messages from context in a format ready for logging.
|
||||
|
||||
Removes or truncates sensitive data like image content for safe logging.
|
||||
Binary data (images, audio) is replaced with short placeholders.
|
||||
|
||||
Args:
|
||||
context: The LLM context containing messages.
|
||||
@@ -142,19 +143,7 @@ class OpenAIResponsesLLMAdapter(BaseLLMAdapter[OpenAIResponsesLLMInvocationParam
|
||||
Returns:
|
||||
List of messages in a format ready for logging.
|
||||
"""
|
||||
msgs = []
|
||||
for message in self.get_messages(context):
|
||||
msg = copy.deepcopy(message)
|
||||
if "content" in msg:
|
||||
if isinstance(msg["content"], list):
|
||||
for item in msg["content"]:
|
||||
if item.get("type") == "image_url":
|
||||
if item["image_url"]["url"].startswith("data:image/"):
|
||||
item["image_url"]["url"] = "data:image/..."
|
||||
if item.get("type") == "input_audio":
|
||||
item["input_audio"]["data"] = "..."
|
||||
msgs.append(msg)
|
||||
return msgs
|
||||
return self.get_messages(context, truncate_large_values=True)
|
||||
|
||||
def _convert_messages_to_input(
|
||||
self, messages: List[LLMContextMessage]
|
||||
|
||||
@@ -1,58 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Base interruption strategy for determining when users can interrupt bot speech."""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
|
||||
class BaseInterruptionStrategy(ABC):
|
||||
"""Base class for interruption strategies.
|
||||
|
||||
This is a base class for interruption strategies. Interruption strategies
|
||||
decide when the user can interrupt the bot while the bot is speaking. For
|
||||
example, there could be strategies based on audio volume or strategies based
|
||||
on the number of words the user spoke.
|
||||
"""
|
||||
|
||||
async def append_audio(self, audio: bytes, sample_rate: int):
|
||||
"""Append audio data to the strategy for analysis.
|
||||
|
||||
Not all strategies handle audio. Default implementation does nothing.
|
||||
|
||||
Args:
|
||||
audio: Raw audio bytes to append.
|
||||
sample_rate: Sample rate of the audio data in Hz.
|
||||
"""
|
||||
pass
|
||||
|
||||
async def append_text(self, text: str):
|
||||
"""Append text data to the strategy for analysis.
|
||||
|
||||
Not all strategies handle text. Default implementation does nothing.
|
||||
|
||||
Args:
|
||||
text: Text string to append for analysis.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def should_interrupt(self) -> bool:
|
||||
"""Determine if the user should interrupt the bot.
|
||||
|
||||
This is called when the user stops speaking and it's time to decide
|
||||
whether the user should interrupt the bot. The decision will be based on
|
||||
the aggregated audio and/or text.
|
||||
|
||||
Returns:
|
||||
True if the user should interrupt the bot, False otherwise.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def reset(self):
|
||||
"""Reset the current accumulated text and/or audio."""
|
||||
pass
|
||||
@@ -1,75 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Minimum words interruption strategy for word count-based interruptions."""
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
|
||||
|
||||
|
||||
class MinWordsInterruptionStrategy(BaseInterruptionStrategy):
|
||||
"""Interruption strategy based on minimum number of words spoken.
|
||||
|
||||
This is an interruption strategy based on a minimum number of words said
|
||||
by the user. That is, the strategy will be true if the user has said at
|
||||
least that amount of words.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
|
||||
This class is deprecated, use
|
||||
`pipecat.turns.user_start.MinWordsUserTurnStartStrategy` with `PipelineTask`'s
|
||||
new `user_turn_strategies` parameter instead.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, *, min_words: int):
|
||||
"""Initialize the minimum words interruption strategy.
|
||||
|
||||
Args:
|
||||
min_words: Minimum number of words required to trigger an interruption.
|
||||
"""
|
||||
super().__init__()
|
||||
self._min_words = min_words
|
||||
self._text = ""
|
||||
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"'pipecat.audio.interruptions' is deprecated. "
|
||||
"Use `pipecat.turns.user_start.MinWordsUserTurnStartStrategy` with `PipelineTask`'s "
|
||||
"new `user_turn_strategies` parameter instead.",
|
||||
DeprecationWarning,
|
||||
)
|
||||
|
||||
async def append_text(self, text: str):
|
||||
"""Append text for word count analysis.
|
||||
|
||||
Args:
|
||||
text: Text string to append to the accumulated text.
|
||||
|
||||
Note: Not all strategies need to handle text.
|
||||
"""
|
||||
self._text += text
|
||||
|
||||
async def should_interrupt(self) -> bool:
|
||||
"""Check if the minimum word count has been reached.
|
||||
|
||||
Returns:
|
||||
True if the user has spoken at least the minimum number of words.
|
||||
"""
|
||||
word_count = len(self._text.split())
|
||||
interrupt = word_count >= self._min_words
|
||||
logger.debug(
|
||||
f"should_interrupt={interrupt} num_spoken_words={word_count} min_words={self._min_words}"
|
||||
)
|
||||
return interrupt
|
||||
|
||||
async def reset(self):
|
||||
"""Reset the accumulated text for the next analysis cycle."""
|
||||
self._text = ""
|
||||
@@ -10,8 +10,11 @@ This module provides a controller that wraps a VADAnalyzer to track speech state
|
||||
and emit events when speech starts, stops, or is actively detected.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from typing import Type
|
||||
from typing import Optional, Type
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.audio.vad.vad_analyzer import VADAnalyzer, VADState
|
||||
from pipecat.frames.frames import (
|
||||
@@ -22,6 +25,7 @@ from pipecat.frames.frames import (
|
||||
VADParamsUpdateFrame,
|
||||
)
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
from pipecat.utils.asyncio.task_manager import BaseTaskManager
|
||||
from pipecat.utils.base_object import BaseObject
|
||||
|
||||
|
||||
@@ -35,7 +39,8 @@ class VADController(BaseObject):
|
||||
Event handlers available:
|
||||
|
||||
- on_speech_started: Called when speech begins.
|
||||
- on_speech_stopped: Called when speech ends.
|
||||
- on_speech_stopped: Called when speech ends, including forced stop when
|
||||
the audio stream goes idle (no frames received while speaking).
|
||||
- on_speech_activity: Called periodically while speech is detected.
|
||||
- on_push_frame: Called when the controller wants to push a frame.
|
||||
- on_broadcast_frame: Called when the controller wants to broadcast a frame.
|
||||
@@ -63,30 +68,62 @@ class VADController(BaseObject):
|
||||
...
|
||||
"""
|
||||
|
||||
def __init__(self, vad_analyzer: VADAnalyzer, *, speech_activity_period: float = 0.2):
|
||||
def __init__(
|
||||
self,
|
||||
vad_analyzer: VADAnalyzer,
|
||||
*,
|
||||
speech_activity_period: float = 0.2,
|
||||
audio_idle_timeout: float = 1.0,
|
||||
):
|
||||
"""Initialize the VAD controller.
|
||||
|
||||
Args:
|
||||
vad_analyzer: The `VADAnalyzer` instance for processing audio.
|
||||
speech_activity_period: Minimum interval in seconds between
|
||||
`on_speech_activity` events. Defaults to 0.2.
|
||||
audio_idle_timeout: Timeout in seconds to force speech stop
|
||||
when no audio frames are received while in SPEAKING state.
|
||||
This handles cases like mic mute mid-speech.
|
||||
Set to 0 to disable. Defaults to 1.0.
|
||||
"""
|
||||
super().__init__()
|
||||
self._vad_analyzer = vad_analyzer
|
||||
self._vad_state: VADState = VADState.QUIET
|
||||
|
||||
self._task_manager: Optional[BaseTaskManager] = None
|
||||
|
||||
# Last time a on_speech_activity was triggered.
|
||||
self._speech_activity_time = 0
|
||||
# How often a on_speech_activity event should be triggered (value should
|
||||
# be greater than the audio chunks to have any effect).
|
||||
self._speech_activity_period = speech_activity_period
|
||||
|
||||
# Audio idle detection: force speech stop when no audio arrives
|
||||
# while in SPEAKING state (e.g. user mutes mic mid-speech).
|
||||
self._last_audio_time: float = 0.0
|
||||
self._audio_idle_timeout = audio_idle_timeout
|
||||
self._audio_idle_task: Optional[asyncio.Task] = None
|
||||
|
||||
self._register_event_handler("on_speech_started", sync=True)
|
||||
self._register_event_handler("on_speech_stopped", sync=True)
|
||||
self._register_event_handler("on_speech_activity", sync=True)
|
||||
self._register_event_handler("on_push_frame", sync=True)
|
||||
self._register_event_handler("on_broadcast_frame", sync=True)
|
||||
|
||||
async def setup(self, task_manager: BaseTaskManager):
|
||||
"""Initialize the controller with the given task manager.
|
||||
|
||||
Args:
|
||||
task_manager: The task manager to be associated with this instance.
|
||||
"""
|
||||
self._task_manager = task_manager
|
||||
self._last_audio_time = time.monotonic()
|
||||
if self._audio_idle_timeout > 0 and not self._audio_idle_task:
|
||||
self._audio_idle_task = self._task_manager.create_task(
|
||||
self._audio_idle_handler(),
|
||||
f"{self}::_audio_idle_handler",
|
||||
)
|
||||
|
||||
async def process_frame(self, frame: Frame):
|
||||
"""Process a frame and handle VAD-related events.
|
||||
|
||||
@@ -116,6 +153,10 @@ class VADController(BaseObject):
|
||||
It waits for all currently executing event handler tasks to finish
|
||||
before returning.
|
||||
"""
|
||||
await super().cleanup()
|
||||
if self._audio_idle_task and self._task_manager:
|
||||
await self._task_manager.cancel_task(self._audio_idle_task)
|
||||
self._audio_idle_task = None
|
||||
if self._vad_analyzer:
|
||||
await self._vad_analyzer.cleanup()
|
||||
|
||||
@@ -128,6 +169,8 @@ class VADController(BaseObject):
|
||||
Args:
|
||||
frame: Audio frame to process.
|
||||
"""
|
||||
self._last_audio_time = time.monotonic()
|
||||
|
||||
self._vad_state = await self._handle_vad(frame.audio, self._vad_state)
|
||||
|
||||
if self._vad_state == VADState.SPEAKING:
|
||||
@@ -149,6 +192,29 @@ class VADController(BaseObject):
|
||||
vad_state = new_vad_state
|
||||
return vad_state
|
||||
|
||||
async def _audio_idle_handler(self):
|
||||
"""Monitor for an idle audio stream while in SPEAKING state.
|
||||
|
||||
When no audio frames arrive for `audio_idle_timeout` seconds
|
||||
(e.g. user mutes mic mid-speech), forces a transition to QUIET and
|
||||
emits `on_speech_stopped`.
|
||||
"""
|
||||
while True:
|
||||
deadline = self._last_audio_time + self._audio_idle_timeout
|
||||
remaining = deadline - time.monotonic()
|
||||
if remaining > 0:
|
||||
# Audio is still recent; sleep only for the remaining window.
|
||||
await asyncio.sleep(remaining)
|
||||
continue
|
||||
|
||||
if self._vad_state == VADState.SPEAKING:
|
||||
logger.warning(f"{self}: no audio received while speaking, forcing speech stop")
|
||||
self._vad_state = VADState.QUIET
|
||||
await self._call_event_handler("on_speech_stopped")
|
||||
|
||||
# Wait for the next potential idle window.
|
||||
await asyncio.sleep(self._audio_idle_timeout)
|
||||
|
||||
async def _maybe_speech_activity(self):
|
||||
"""Handle user speaking frame."""
|
||||
diff_time = time.time() - self._speech_activity_time
|
||||
|
||||
@@ -29,7 +29,6 @@ from typing import (
|
||||
|
||||
from pipecat.adapters.schemas.tools_schema import ToolsSchema
|
||||
from pipecat.audio.dtmf.types import KeypadEntry
|
||||
from pipecat.audio.interruptions.base_interruption_strategy import BaseInterruptionStrategy
|
||||
from pipecat.audio.turn.base_turn_analyzer import BaseTurnParams
|
||||
from pipecat.audio.vad.vad_analyzer import VADParams
|
||||
from pipecat.metrics.metrics import MetricsData
|
||||
@@ -39,7 +38,7 @@ from pipecat.utils.time import nanoseconds_to_str
|
||||
from pipecat.utils.utils import obj_count, obj_id
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, NotGiven
|
||||
from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage, NotGiven
|
||||
from pipecat.processors.frame_processor import FrameProcessor
|
||||
from pipecat.services.settings import ServiceSettings
|
||||
from pipecat.utils.context.llm_context_summarization import LLMContextSummaryConfig
|
||||
@@ -462,137 +461,6 @@ class LLMContextAssistantTimestampFrame(DataFrame):
|
||||
timestamp: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class TranscriptionMessage:
|
||||
"""A message in a conversation transcript.
|
||||
|
||||
A message in a conversation transcript containing the role and content.
|
||||
Messages are in standard format with roles normalized to user/assistant.
|
||||
|
||||
Parameters:
|
||||
role: The role of the message sender (user or assistant).
|
||||
content: The message content/text.
|
||||
user_id: Optional identifier for the user.
|
||||
timestamp: Optional timestamp when the message was created.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
`TranscriptionMessage` is deprecated and will be removed in a future version.
|
||||
Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.
|
||||
"""
|
||||
|
||||
role: Literal["user", "assistant"]
|
||||
content: str
|
||||
user_id: Optional[str] = None
|
||||
timestamp: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"TranscriptionMessage is deprecated and will be removed in a future version. "
|
||||
"Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ThoughtTranscriptionMessage:
|
||||
"""An LLM thought message in a conversation transcript.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
`ThoughtTranscriptionMessage` is deprecated and will be removed in a future version.
|
||||
Use `LLMAssistantAggregator`'s new events instead.
|
||||
"""
|
||||
|
||||
role: Literal["assistant"] = field(default="assistant", init=False)
|
||||
content: str
|
||||
timestamp: Optional[str] = None
|
||||
|
||||
def __post_init__(self):
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"ThoughtTranscriptionMessage is deprecated and will be removed in a future version. "
|
||||
"Use `LLMAssistantAggregator`'s new events instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class TranscriptionUpdateFrame(DataFrame):
|
||||
"""Frame containing new messages added to conversation transcript.
|
||||
|
||||
A frame containing new messages added to the conversation transcript.
|
||||
This frame is emitted when new messages are added to the conversation history,
|
||||
containing only the newly added messages rather than the full transcript.
|
||||
Messages have normalized roles (user/assistant) regardless of the LLM service used.
|
||||
Messages are always in the OpenAI standard message format, which supports both:
|
||||
|
||||
Examples:
|
||||
Simple format::
|
||||
|
||||
[
|
||||
{
|
||||
"role": "user",
|
||||
"content": "Hi, how are you?"
|
||||
},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "Great! And you?"
|
||||
}
|
||||
]
|
||||
|
||||
Content list format::
|
||||
|
||||
[
|
||||
{
|
||||
"role": "user",
|
||||
"content": [{"type": "text", "text": "Hi, how are you?"}]
|
||||
},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "Great! And you?"}]
|
||||
}
|
||||
]
|
||||
|
||||
OpenAI supports both formats. Anthropic and Google messages are converted to the
|
||||
content list format.
|
||||
|
||||
Parameters:
|
||||
messages: List of new transcript messages that were added.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
`TranscriptionUpdateFrame` is deprecated and will be removed in a future version.
|
||||
Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.
|
||||
"""
|
||||
|
||||
messages: List[TranscriptionMessage | ThoughtTranscriptionMessage]
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"TranscriptionUpdateFrame is deprecated and will be removed in a future version. "
|
||||
"Use `LLMUserAggregator`'s and `LLMAssistantAggregator`'s new events instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
pts = format_pts(self.pts)
|
||||
return f"{self.name}(pts: {pts}, messages: {len(self.messages)})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMContextFrame(Frame):
|
||||
"""Frame containing a universal LLM context.
|
||||
@@ -719,6 +587,23 @@ class LLMMessagesUpdateFrame(DataFrame):
|
||||
run_llm: Optional[bool] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMMessagesTransformFrame(DataFrame):
|
||||
"""Frame containing a transform function to modify the current context's LLM messages.
|
||||
|
||||
A frame containing a transform function that takes the context's current list
|
||||
of LLM messages and returns a modified list.
|
||||
|
||||
Parameters:
|
||||
transform: A function that takes a list of messages and returns a
|
||||
modified list.
|
||||
run_llm: Whether the context update should be sent to the LLM.
|
||||
"""
|
||||
|
||||
transform: Callable[[List["LLMContextMessage"]], List["LLMContextMessage"]]
|
||||
run_llm: Optional[bool] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class LLMSetToolsFrame(DataFrame):
|
||||
"""Frame containing tools for LLM function calling.
|
||||
@@ -778,10 +663,14 @@ class FunctionCallResultProperties:
|
||||
Parameters:
|
||||
run_llm: Whether to run the LLM after receiving this result.
|
||||
on_context_updated: Callback to execute when context is updated.
|
||||
is_final: Whether this is the final result for the function call. When
|
||||
``False`` the result is treated as an intermediate update. Defaults to ``True``.
|
||||
Only meaningful for async function calls (``cancel_on_interruption=False``).
|
||||
"""
|
||||
|
||||
run_llm: Optional[bool] = None
|
||||
on_context_updated: Optional[Callable[[], Awaitable[None]]] = None
|
||||
is_final: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -841,26 +730,66 @@ class OutputTransportMessageFrame(DataFrame):
|
||||
|
||||
@dataclass
|
||||
class DTMFFrame:
|
||||
"""Base class for DTMF (Dual-Tone Multi-Frequency) keypad frames.
|
||||
"""Marker base class for DTMF (Dual-Tone Multi-Frequency) keypad frames.
|
||||
|
||||
Parameters:
|
||||
button: The DTMF keypad entry that was pressed.
|
||||
Used only as a shared tag so that both input and output DTMF frames can
|
||||
be identified via ``isinstance(frame, DTMFFrame)``. The concrete frames
|
||||
define their own fields.
|
||||
"""
|
||||
|
||||
button: KeypadEntry
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class OutputDTMFFrame(DTMFFrame, DataFrame):
|
||||
"""DTMF keypress output frame for transport queuing.
|
||||
|
||||
A DTMF keypress output that will be queued. If your transport supports
|
||||
multiple dial-out destinations, use the `transport_destination` field to
|
||||
specify where the DTMF keypress should be sent.
|
||||
Parameters:
|
||||
button: Convenience shortcut for sending a single DTMF keypad
|
||||
entry. Equivalent to ``buttons=[button]``. If both ``buttons``
|
||||
and ``button`` are provided, ``buttons`` takes precedence.
|
||||
buttons: Sequence of one or more DTMF keypad buttons to send. Use
|
||||
:meth:`from_string` to build this from a string like ``"123#"``.
|
||||
"""
|
||||
|
||||
button: KeypadEntry | None = None
|
||||
buttons: list[KeypadEntry] | None = None
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
if self.buttons is None and self.button is not None:
|
||||
self.buttons = [self.button]
|
||||
if not self.buttons:
|
||||
raise ValueError(f"{self.__class__.__name__} requires `buttons` or `button` to be set")
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(tone: {self.button})"
|
||||
return f"{self.name}(buttons: {self.to_string()})"
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, buttons: str, **kwargs) -> "OutputDTMFFrame":
|
||||
"""Build an ``OutputDTMFFrame`` from a string of DTMF characters.
|
||||
|
||||
Args:
|
||||
buttons: A string like ``"123#"``. Each character must be a
|
||||
valid :class:`~pipecat.audio.dtmf.types.KeypadEntry` value.
|
||||
**kwargs: Additional keyword arguments forwarded to the frame
|
||||
constructor.
|
||||
|
||||
Returns:
|
||||
A frame of type ``cls`` with ``buttons`` populated as a list of
|
||||
:class:`~pipecat.audio.dtmf.types.KeypadEntry`.
|
||||
"""
|
||||
return cls(buttons=[KeypadEntry(c) for c in buttons], **kwargs)
|
||||
|
||||
def to_string(self) -> str:
|
||||
"""Return the frame's ``buttons`` as a dial string.
|
||||
|
||||
Returns:
|
||||
A string such as ``"123#"`` formed by concatenating the values
|
||||
of each :class:`~pipecat.audio.dtmf.types.KeypadEntry` in
|
||||
``buttons``, or an empty string if ``buttons`` is not set.
|
||||
"""
|
||||
return "".join(b.value for b in self.buttons) if self.buttons else ""
|
||||
|
||||
|
||||
#
|
||||
@@ -878,30 +807,18 @@ class StartFrame(SystemFrame):
|
||||
Parameters:
|
||||
audio_in_sample_rate: Input audio sample rate in Hz.
|
||||
audio_out_sample_rate: Output audio sample rate in Hz.
|
||||
allow_interruptions: Whether to allow user interruptions.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
Use `LLMUserAggregator`'s new `user_mute_strategies` parameter instead.
|
||||
|
||||
enable_metrics: Whether to enable performance metrics collection.
|
||||
enable_tracing: Whether to enable OpenTelemetry tracing.
|
||||
enable_usage_metrics: Whether to enable usage metrics collection.
|
||||
interruption_strategies: List of interruption handling strategies.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
Use `LLMUserAggregator`'s new `user_turn_strategies` parameter instead.
|
||||
|
||||
report_only_initial_ttfb: Whether to report only initial time-to-first-byte.
|
||||
tracing_context: Pipeline-scoped tracing context for span hierarchy.
|
||||
"""
|
||||
|
||||
audio_in_sample_rate: int = 16000
|
||||
audio_out_sample_rate: int = 24000
|
||||
allow_interruptions: bool = False
|
||||
enable_metrics: bool = False
|
||||
enable_tracing: bool = False
|
||||
enable_usage_metrics: bool = False
|
||||
interruption_strategies: List[BaseInterruptionStrategy] = field(default_factory=list)
|
||||
report_only_initial_ttfb: bool = False
|
||||
tracing_context: Optional["TracingContext"] = None
|
||||
|
||||
@@ -1010,16 +927,9 @@ class UserStartedSpeakingFrame(SystemFrame):
|
||||
|
||||
Emitted when the user turn starts, which usually means that some
|
||||
transcriptions are already available.
|
||||
|
||||
Parameters:
|
||||
emulated: Whether this event was emulated rather than detected by VAD.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
This field is deprecated and will be removed in a future version.
|
||||
|
||||
"""
|
||||
|
||||
emulated: bool = False
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -1028,16 +938,9 @@ class UserStoppedSpeakingFrame(SystemFrame):
|
||||
|
||||
Emitted when the user turn ends. This usually coincides with the start of
|
||||
the bot turn.
|
||||
|
||||
Parameters:
|
||||
emulated: Whether this event was emulated rather than detected by VAD.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
This field is deprecated and will be removed in a future version.
|
||||
|
||||
"""
|
||||
|
||||
emulated: bool = False
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -1072,56 +975,6 @@ class UserSpeakingFrame(SystemFrame):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmulateUserStartedSpeakingFrame(SystemFrame):
|
||||
"""Frame to emulate user started speaking behavior.
|
||||
|
||||
Emitted by internal processors upstream to emulate VAD behavior when a
|
||||
user starts speaking.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
This frame is deprecated and will be removed in a future version.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"EmulateUserStartedSpeakingFrame is deprecated and will be removed in a future version.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class EmulateUserStoppedSpeakingFrame(SystemFrame):
|
||||
"""Frame to emulate user stopped speaking behavior.
|
||||
|
||||
Emitted by internal processors upstream to emulate VAD behavior when a
|
||||
user stops speaking.
|
||||
|
||||
.. deprecated:: 0.0.99
|
||||
This frame is deprecated and will be removed in a future version.
|
||||
"""
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"EmulateUserStoppedSpeakingFrame is deprecated and will be removed in a future version.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VADUserStartedSpeakingFrame(SystemFrame):
|
||||
"""Frame emitted when VAD definitively detects user started speaking.
|
||||
@@ -1300,7 +1153,6 @@ class UserImageRequestFrame(SystemFrame):
|
||||
function_name: Name of function that generated this request (if any).
|
||||
tool_call_id: Tool call ID if generated by function call (if any).
|
||||
result_callback: Optional callback to invoke when the image is retrieved.
|
||||
context: [DEPRECATED] Optional context for the image request.
|
||||
"""
|
||||
|
||||
user_id: str
|
||||
@@ -1310,21 +1162,6 @@ class UserImageRequestFrame(SystemFrame):
|
||||
function_name: Optional[str] = None
|
||||
tool_call_id: Optional[str] = None
|
||||
result_callback: Optional[Any] = None
|
||||
context: Optional[Any] = None
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
|
||||
if self.context:
|
||||
import warnings
|
||||
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter("always")
|
||||
warnings.warn(
|
||||
"`UserImageRequestFrame` field `context` is deprecated.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(user: {self.user_id}, text: {self.text}, append_to_context: {self.append_to_context}, {self.video_source})"
|
||||
@@ -1435,7 +1272,13 @@ class AssistantImageRawFrame(OutputImageRawFrame):
|
||||
|
||||
@dataclass
|
||||
class InputDTMFFrame(DTMFFrame, SystemFrame):
|
||||
"""DTMF keypress input frame from transport."""
|
||||
"""DTMF keypress input frame from transport.
|
||||
|
||||
Parameters:
|
||||
button: The DTMF keypad entry that was pressed.
|
||||
"""
|
||||
|
||||
button: KeypadEntry
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(tone: {self.button.value})"
|
||||
@@ -1445,12 +1288,52 @@ class InputDTMFFrame(DTMFFrame, SystemFrame):
|
||||
class OutputDTMFUrgentFrame(DTMFFrame, SystemFrame):
|
||||
"""DTMF keypress output frame for immediate sending.
|
||||
|
||||
A DTMF keypress output that will be sent right away. If your transport
|
||||
supports multiple dial-out destinations, use the `transport_destination`
|
||||
field to specify where the DTMF keypress should be sent.
|
||||
Parameters:
|
||||
button: Convenience shortcut for sending a single DTMF keypad
|
||||
entry. Equivalent to ``buttons=[button]``. If both ``buttons``
|
||||
and ``button`` are provided, ``buttons`` takes precedence.
|
||||
buttons: Sequence of one or more DTMF keypad buttons to send. Use
|
||||
:meth:`from_string` to build this from a string like ``"123#"``.
|
||||
"""
|
||||
|
||||
pass
|
||||
button: KeypadEntry | None = None
|
||||
buttons: list[KeypadEntry] | None = None
|
||||
|
||||
def __post_init__(self):
|
||||
super().__post_init__()
|
||||
if self.buttons is None and self.button is not None:
|
||||
self.buttons = [self.button]
|
||||
if not self.buttons:
|
||||
raise ValueError(f"{self.__class__.__name__} requires `buttons` or `button` to be set")
|
||||
|
||||
def __str__(self):
|
||||
return f"{self.name}(buttons: {self.to_string()})"
|
||||
|
||||
@classmethod
|
||||
def from_string(cls, buttons: str, **kwargs) -> "OutputDTMFUrgentFrame":
|
||||
"""Build an ``OutputDTMFUrgentFrame`` from a string of DTMF characters.
|
||||
|
||||
Args:
|
||||
buttons: A string like ``"123#"``. Each character must be a
|
||||
valid :class:`~pipecat.audio.dtmf.types.KeypadEntry` value.
|
||||
**kwargs: Additional keyword arguments forwarded to the frame
|
||||
constructor.
|
||||
|
||||
Returns:
|
||||
A frame of type ``cls`` with ``buttons`` populated as a list of
|
||||
:class:`~pipecat.audio.dtmf.types.KeypadEntry`.
|
||||
"""
|
||||
return cls(buttons=[KeypadEntry(c) for c in buttons], **kwargs)
|
||||
|
||||
def to_string(self) -> str:
|
||||
"""Return the frame's ``buttons`` as a dial string.
|
||||
|
||||
Returns:
|
||||
A string such as ``"123#"`` formed by concatenating the values
|
||||
of each :class:`~pipecat.audio.dtmf.types.KeypadEntry` in
|
||||
``buttons``, or an empty string if ``buttons`` is not set.
|
||||
"""
|
||||
return "".join(b.value for b in self.buttons) if self.buttons else ""
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -1850,12 +1733,19 @@ class FunctionCallInProgressFrame(ControlFrame, UninterruptibleFrame):
|
||||
tool_call_id: Unique identifier for this function call.
|
||||
arguments: Arguments passed to the function.
|
||||
cancel_on_interruption: Whether to cancel this call if interrupted.
|
||||
When ``False`` the call is treated as asynchronous: the LLM
|
||||
continues the conversation immediately without waiting for the
|
||||
result, and the result is injected later via a developer message.
|
||||
group_id: Identifier shared by all function calls originating from the
|
||||
same LLM response batch. Used to determine when the last call in a
|
||||
group completes so the LLM can be triggered exactly once.
|
||||
"""
|
||||
|
||||
function_name: str
|
||||
tool_call_id: str
|
||||
arguments: Any
|
||||
cancel_on_interruption: bool = False
|
||||
group_id: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -1,109 +0,0 @@
|
||||
#
|
||||
# Copyright (c) 2024-2026, Daily
|
||||
#
|
||||
# SPDX-License-Identifier: BSD 2-Clause License
|
||||
#
|
||||
|
||||
"""Observer for measuring user-to-bot response latency.
|
||||
|
||||
.. deprecated:: 0.0.102
|
||||
This module is deprecated. Use :class:`UserBotLatencyObserver` directly
|
||||
with its ``on_latency_measured`` event handler instead.
|
||||
"""
|
||||
|
||||
import time
|
||||
import warnings
|
||||
from statistics import mean
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from pipecat.frames.frames import (
|
||||
BotStartedSpeakingFrame,
|
||||
CancelFrame,
|
||||
EndFrame,
|
||||
VADUserStartedSpeakingFrame,
|
||||
VADUserStoppedSpeakingFrame,
|
||||
)
|
||||
from pipecat.observers.base_observer import BaseObserver, FramePushed
|
||||
from pipecat.processors.frame_processor import FrameDirection
|
||||
|
||||
|
||||
class UserBotLatencyLogObserver(BaseObserver):
|
||||
"""Observer that measures time between user stopping speech and bot starting speech.
|
||||
|
||||
This helps measure how quickly the AI services respond by tracking
|
||||
conversation turn timing and logging latency metrics.
|
||||
|
||||
.. deprecated:: 0.0.102
|
||||
This class is deprecated. Use :class:`UserBotLatencyObserver` directly
|
||||
with its ``on_latency_measured`` event handler for custom logging.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize the latency observer.
|
||||
|
||||
Sets up tracking for processed frames and user speech timing
|
||||
to calculate response latencies.
|
||||
|
||||
.. deprecated:: 0.0.102
|
||||
This class is deprecated. Use :class:`UserBotLatencyObserver`
|
||||
directly with its ``on_latency_measured`` event handler.
|
||||
"""
|
||||
warnings.warn(
|
||||
"UserBotLatencyLogObserver is deprecated and will be removed in a future version. "
|
||||
"Use UserBotLatencyObserver directly with its on_latency_measured event handler instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
super().__init__()
|
||||
self._user_bot_latency_processed_frames = set()
|
||||
self._user_stopped_time = 0
|
||||
self._latencies = []
|
||||
|
||||
async def on_push_frame(self, data: FramePushed):
|
||||
"""Process frames to track speech timing and calculate latency.
|
||||
|
||||
Args:
|
||||
data: Frame push event containing the frame and direction information.
|
||||
"""
|
||||
# Only process downstream frames
|
||||
if data.direction != FrameDirection.DOWNSTREAM:
|
||||
return
|
||||
|
||||
# Skip already processed frames
|
||||
if data.frame.id in self._user_bot_latency_processed_frames:
|
||||
return
|
||||
|
||||
self._user_bot_latency_processed_frames.add(data.frame.id)
|
||||
|
||||
if isinstance(data.frame, VADUserStartedSpeakingFrame):
|
||||
self._user_stopped_time = 0
|
||||
elif isinstance(data.frame, VADUserStoppedSpeakingFrame):
|
||||
self._user_stopped_time = data.frame.timestamp - data.frame.stop_secs
|
||||
elif isinstance(data.frame, (EndFrame, CancelFrame)):
|
||||
self._log_summary()
|
||||
elif isinstance(data.frame, BotStartedSpeakingFrame) and self._user_stopped_time:
|
||||
latency = time.time() - self._user_stopped_time
|
||||
self._user_stopped_time = 0
|
||||
self._latencies.append(latency)
|
||||
self._log_latency(latency)
|
||||
|
||||
def _log_summary(self):
|
||||
if not self._latencies:
|
||||
return
|
||||
avg_latency = mean(self._latencies)
|
||||
min_latency = min(self._latencies)
|
||||
max_latency = max(self._latencies)
|
||||
logger.info(
|
||||
f"⏱️ LATENCY FROM USER STOPPED SPEAKING TO BOT STARTED SPEAKING - Avg: {avg_latency:.3f}s, Min: {min_latency:.3f}s, Max: {max_latency:.3f}s"
|
||||
)
|
||||
|
||||
def _log_latency(self, latency: float):
|
||||
"""Log the latency.
|
||||
|
||||
Args:
|
||||
latency: The latency to log.
|
||||
"""
|
||||
logger.debug(
|
||||
f"⏱️ LATENCY FROM USER STOPPED SPEAKING TO BOT STARTED SPEAKING: {latency:.3f}s"
|
||||
)
|
||||
@@ -90,7 +90,7 @@ class PipelineRunner(BaseObject):
|
||||
await self._sig_task
|
||||
|
||||
if self._force_gc:
|
||||
self._gc_collect()
|
||||
await self._gc_collect()
|
||||
|
||||
logger.debug(f"Runner {self} finished running {task}")
|
||||
|
||||
@@ -136,8 +136,8 @@ class PipelineRunner(BaseObject):
|
||||
logger.warning(f"Interruption detected. Cancelling runner {self}")
|
||||
await self.cancel()
|
||||
|
||||
def _gc_collect(self):
|
||||
async def _gc_collect(self):
|
||||
"""Force garbage collection and log results."""
|
||||
collected = gc.collect()
|
||||
collected = await asyncio.to_thread(gc.collect)
|
||||
logger.debug(f"Garbage collector: collected {collected} objects.")
|
||||
logger.debug(f"Garbage collector: uncollectable objects {gc.garbage}")
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user