Compare commits

..

171 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
fdaa4e476e Merge pull request #2830 from pipecat-ai/aleix/pipecat-0.0.90
update CHANGELOG for 0.0.90
2025-10-10 10:22:26 -07:00
Aleix Conchillo Flaqué
502e7e42a7 update CHANGELOG for 0.0.90 2025-10-10 10:20:19 -07:00
kompfner
2ab3d4fb42 Merge pull request #2834 from pipecat-ai/pk/update-vertex-disclaimer
Update a Google Vertex disclaimer for accuracy
2025-10-10 13:19:51 -04:00
Paul Kompfner
55014bdd77 Update a Google Vertex disclaimer for accuracy 2025-10-10 13:18:03 -04:00
kompfner
334796bd65 Merge pull request #2833 from pipecat-ai/pk/vertex-non-optional-location
`location` should not be optional when using Google Vertex.
2025-10-10 13:02:40 -04:00
Paul Kompfner
1c25b6fb72 location should not be optional when using Google Vertex.
Also, update `GoogleVertexLLMService` initialization pattern in the example file.
2025-10-10 12:58:45 -04:00
Mark Backman
91b29de7ca Merge pull request #2832 from pipecat-ai/mb/docs-fixes-0.0.90
Docs fixes for 0.0.90 release
2025-10-10 12:46:40 -04:00
Mark Backman
21d610cd30 Docs fixes for 0.0.90 release 2025-10-10 12:43:31 -04:00
Mark Backman
f7fe673ad1 Merge pull request #2831 from pipecat-ai/mb/update-evals
Update release evals for OpenAI Realtime, Gemini Live Vertex; shorten…
2025-10-10 12:34:27 -04:00
Mark Backman
4b415721e2 Update release evals for OpenAI Realtime, Gemini Live Vertex; shorten 26 foundational names 2025-10-10 12:26:23 -04:00
kompfner
8d2a98e0e7 Merge pull request #2829 from pipecat-ai/pk/fix-gemini-live-deprecation-messages
Fix deprecation messages pointing users to the new import paths for G…
2025-10-10 10:42:15 -04:00
Paul Kompfner
523e890c8c Fix deprecation messages pointing users to the new import paths for Gemini Live 2025-10-10 10:30:38 -04:00
kompfner
3c748fe772 Merge pull request #2823 from pipecat-ai/pk/vertex-init-args-fixup
Move `location` and `project_id` out of `InputParams` in `GoogleVerte…
2025-10-10 10:18:51 -04:00
kompfner
d293cee372 Merge pull request #2822 from pipecat-ai/pk/make-pause-processing-frames-more-robust
Make `pause_processing_frames()` and `pause_processing_system_frames(…
2025-10-10 10:16:27 -04:00
Paul Kompfner
8b62a96878 Improve how we're deprecating location and project_id in GoogleVertexLLMService, allowing user code to (correctly) continue referring to GoogleVertexLLMService.InputParams.
Also fix the slightly wrong (but so far harmless) pattern of initializing `OpenAILLMService.InputParams()` in the `GoogleVertexLLMService` if `params` wasn't provided—we should be letting the superclass decide what to do if the argument isn't specified.
2025-10-10 10:12:00 -04:00
Mark Backman
0c102ce70b Merge pull request #2826 from pipecat-ai/mb/deprecate-livekit-frame-serializer
Deprecate LivekitFrameSerializer
2025-10-10 10:01:45 -04:00
Mark Backman
3894d2a4b9 Deprecate LivekitFrameSerializer 2025-10-10 09:51:57 -04:00
Aleix Conchillo Flaqué
1f6b61c0db Merge pull request #2828 from pipecat-ai/aleix/gemini-live-gemini-to-llm
google: rename google.gemini_live.gemini to google.gemini_live.llm
2025-10-10 06:42:51 -07:00
Aleix Conchillo Flaqué
8ee28b37cd google: rename google.gemini_live.vertext to google.gemini_live.llm_vertex 2025-10-10 06:41:19 -07:00
Filipi da Silva Fuchter
e85e7e4d84 Merge pull request #2773 from pipecat-ai/filipi/krisp_viva
Added audio filter `KrispVivaFilter` using the Krisp VIVA SDK.
2025-10-10 09:51:15 -03:00
Filipi Fuchter
1b3afb5511 Added audio filter KrispVivaFilter using the Krisp VIVA SDK 2025-10-10 09:44:47 -03:00
Aleix Conchillo Flaqué
7cec013666 google: rename google.gemini_live.gemini to google.gemini_live.llm 2025-10-09 22:20:09 -07:00
Aleix Conchillo Flaqué
86127167fb Merge pull request #2827 from pipecat-ai/aleix/openai-realtime-move
move openai_realtime to openai.realtime
2025-10-09 22:18:04 -07:00
Aleix Conchillo Flaqué
9935a68018 examples(19b): fix deprecations 2025-10-09 22:14:52 -07:00
Aleix Conchillo Flaqué
5679dde70f ai_service: use openai.realtime.events instead of openai_realtime_beta.events 2025-10-09 22:14:46 -07:00
Aleix Conchillo Flaqué
d81b0f6368 update CHANGELOG with openai_realtime deprecation 2025-10-09 22:14:46 -07:00
Aleix Conchillo Flaqué
9698b008da deprecate openai_realtime 2025-10-09 22:14:46 -07:00
Aleix Conchillo Flaqué
7b05c9283b move openai.realtime.azure to azure.realtime.llm 2025-10-09 22:14:46 -07:00
Aleix Conchillo Flaqué
303dd2ec35 move openai.realtime.openai to openai.realtime.llm 2025-10-09 22:14:46 -07:00
Aleix Conchillo Flaqué
aa6e81648a move openai_realtime to openai.realtime 2025-10-09 22:14:46 -07:00
Aleix Conchillo Flaqué
1a87870ef3 Merge pull request #2825 from pipecat-ai/aleix/aws-nova-sonic-move
move aws_nova_sonic to aws.nova_sonic
2025-10-09 18:37:46 -07:00
Aleix Conchillo Flaqué
aac4ce2d12 update CHANGELOG with aws_nova_sonic deprecation 2025-10-09 18:32:26 -07:00
Aleix Conchillo Flaqué
2a79b2c853 aws: deprecate aws_nova_sonic 2025-10-09 17:44:29 -07:00
Aleix Conchillo Flaqué
15bf5b1533 aws: move aws_nova_sonic to aws.nova_sonic 2025-10-09 17:35:47 -07:00
Aleix Conchillo Flaqué
cdc86db8ce update CHANGELOG with GoogleVertexLLMService token fix 2025-10-09 16:58:22 -07:00
Aleix Conchillo Flaqué
9d2ad750b5 Merge pull request #2779 from LucasStringPay/patch-1
Ignore None value for 'completion_tokens' or similar for Gemini
2025-10-09 16:55:33 -07:00
Aleix Conchillo Flaqué
19ceb1a48f Merge pull request #2817 from pipecat-ai/aleix/runner-download-folder
runner: add --folder argument to allow file downloads
2025-10-09 16:55:17 -07:00
Aleix Conchillo Flaqué
59217eae38 runner: add --folder argument to allow file downloads 2025-10-09 16:49:51 -07:00
Aleix Conchillo Flaqué
bea0aee835 Merge pull request #2824 from pipecat-ai/aleix/gemini-under-google
google: move gemini_live inside google service
2025-10-09 16:40:15 -07:00
Aleix Conchillo Flaqué
aeace9b9be google: move gemini_live inside google service 2025-10-09 16:06:42 -07:00
Paul Kompfner
2994640f47 Move location and project_id out of InputParams in GoogleVertexLLMService, making them top-level init args instead. We do this for two reasons:
- Conceptually, these args comprise project-level setup, akin to credentials, whereas everything in `InputParams` is concerned with model configuration
- Providing a `project_id` when initializing `GoogleVertexLLMService` should not be optional, but prior to the change in this commit it was (erroneously) treated as optional by dint of `InputParams` being optional

This improvement was discussed [in this comment](https://github.com/pipecat-ai/pipecat/pull/2795#discussion_r2408279142).
2025-10-09 16:53:21 -04:00
Paul Kompfner
10069719e4 Make pause_processing_frames() and pause_processing_system_frames() more robust in FrameProcessor.
To understand this fix, let's look exclusively at `pause_processing_frames()` (`pause_processing_system_frames()` works the same way).

`pause_processing_frames()` works by setting a `__should_block_frames` flag, which is then read each time through the loop in the long-running `__process_frame_task_handler`. if `__should_block_frames` is `True`, it pauses processing frames until it's resumed.

Prior to this fix, the check for `__should_block_frames` was before `await self.__process_queue.get()`. The problem is that a lot of the time spent in the loop is waiting for a frame from the process queue. So if `pause_processing_frames()` is set at any time other than within `process_frame()` itself, it actually won't have an effect by the next frame, only on the frame *after* the next, which is later than intended.

Because thus far in the Pipecat codebase we've only ever called `pause_processing_frames()` and `pause_processing_system_frames()` from within `process_frame()`, this change should have no behavioral effect. But it will be helpful if we ever need to call it from anywhere else. I noticed this issue while developing a feature that did exactly that (though I later abandoned that code).
2025-10-09 15:57:31 -04:00
kompfner
046b76df60 Merge pull request #2820 from pipecat-ai/pk/gemini-live-vertex-support
Support Gemini Live + Vertex AI
2025-10-09 11:53:41 -04:00
Paul Kompfner
f2d9063984 Renames: remove "multimodal" from Gemini Live types 2025-10-09 10:58:36 -04:00
Paul Kompfner
99f008e927 Make a note in our examples that there's an issue with Gemini Live + Vertex around specifying a modality other than AUDIO 2025-10-08 21:03:07 -04:00
Paul Kompfner
2699f0c2a6 Fix tool calls when using Gemini Live + Vertex AI 2025-10-08 21:03:07 -04:00
Paul Kompfner
0b6dd98000 Make a note in our examples that there's an issue with Gemini Live + Vertex around using "google_search" alongside other tools 2025-10-08 21:03:07 -04:00
Paul Kompfner
a14fb20d15 Fix Gemini Live w/Vertex AI not being able to handle an empty list provided for "function_declarations" 2025-10-08 21:03:07 -04:00
Paul Kompfner
728361a6a7 Add GeminiVertexMultimodalLiveLLMService 2025-10-08 21:03:01 -04:00
kompfner
106db69e8e Merge pull request #2816 from pipecat-ai/pk/gemini-live-await-ongoing-response-after-endframe
Implement ending `GeminiMultimodalLiveLLMService` gracefully (i.e. af…
2025-10-08 17:20:14 -04:00
Paul Kompfner
cf90071926 Format fix 2025-10-08 17:19:46 -04:00
Paul Kompfner
deaeb75a1f Fix changelog after rebase (and add a missing item) 2025-10-08 17:16:31 -04:00
Paul Kompfner
a666327d70 Implement ending GeminiMultimodalLiveLLMService gracefully (i.e. after the bot is finished) 2025-10-08 17:13:04 -04:00
kompfner
13a0522546 Merge pull request #2804 from pipecat-ai/pk/gemini-live-session-resumption
Add (relatively spartan) reconnection logic to `GeminiMultimodalLiveLLMService`
2025-10-08 17:10:45 -04:00
Paul Kompfner
7da37a0d1f Pull _connection_established_threshold and _max_consecutive_failures into file-level constants 2025-10-08 17:04:05 -04:00
Paul Kompfner
7efb22a323 Add (relatively spartan) reconnection logic to GeminiMultimodalLiveLLMService, leveraging the Gemini Live session resumption mechanism 2025-10-08 16:53:21 -04:00
kompfner
8084e2f909 Merge pull request #2776 from pipecat-ai/pk/gemini-live-gen-ai-library
Gemini Live service uses the `genai` library rather than WebSockets directly
2025-10-08 16:50:16 -04:00
Paul Kompfner
86127c6a6e Add to the changelog the GeminiMultimodalLiveLLMService update to use google-genai 2025-10-08 16:46:41 -04:00
Paul Kompfner
402e019ae2 Make a bit of code clearer 2025-10-08 16:45:55 -04:00
Paul Kompfner
f09e4e238b Fix some mishandling of enum values 2025-10-08 16:45:55 -04:00
Paul Kompfner
2921162b3b Add deprecation warning around importing StartSensitivity and EndSensitivity from pipecat.services.gemini_multimodal_live.events 2025-10-08 16:45:55 -04:00
Paul Kompfner
ac1582c906 Let users directly use google-genai types rather than aliased re-exported types 2025-10-08 16:45:55 -04:00
Paul Kompfner
e4b01a5844 Bumping deprecation version of GeminiMultimodalLiveLLMService's base_url arg 2025-10-08 16:45:55 -04:00
Paul Kompfner
fa663abbbc Add CHANGELOG entry for new GeminiMultimodalLiveLLMService configuration options 2025-10-08 16:45:55 -04:00
Paul Kompfner
d19e6111c3 Bumping deprecation version of GeminiMultimodalLiveLLMService's base_url arg 2025-10-08 16:45:55 -04:00
Paul Kompfner
8a6d504a7e Add enable_affective_dialog and proactivity settings to GeminiMultimodalLiveLLMService 2025-10-08 16:45:55 -04:00
Paul Kompfner
43915937f2 Update how we import and re-export some types in GeminiMultimodalLiveLLMService 2025-10-08 16:45:55 -04:00
Paul Kompfner
48e92a22fe Add thinking settings to GeminiMultimodalLiveLLMService 2025-10-08 16:45:55 -04:00
Paul Kompfner
566af6b0b8 Minor comment improvement 2025-10-08 16:45:55 -04:00
Paul Kompfner
12e7613d5f Deprecate the base_url argument to GeminiMultimodalLiveLLMService.
It expected a WebSocket URL, but we're no longer (directly) using WebSockets to talk to Gemini. Instead of trying to (potentially erroneously) map a given custom WebSocket URL to an `HttpOptions` object (the new preferred way of customizing requests made by the Gemini API client), we're simply deprecating `base_url` and pointing users to the `http_options` argument instead.
2025-10-08 16:45:55 -04:00
Paul Kompfner
04a68f2c57 Fix tracing in GeminiMultimodalLiveLLMService 2025-10-08 16:45:55 -04:00
Paul Kompfner
9b4ca12f49 Revert to only supporting providing a single modality - looks like specifying a list of modalities results in an API error.
Also, fix some missing `await`s in error handling.
2025-10-08 16:45:55 -04:00
Paul Kompfner
453ce715a6 Add some error handling to GeminiMultimodalLiveLLMService 2025-10-08 16:45:55 -04:00
Paul Kompfner
d87b6189ba Update GeminiMultimodalLiveLLMService to use the google-genai library, which is the new recommended approach for interfacing with Gemini Live. 2025-10-08 16:45:55 -04:00
Mark Backman
8293347b77 Merge pull request #2814 from pipecat-ai/mb/openai-service-tier
Add service_tier to BaseOpenAILLMService
2025-10-08 16:44:27 -04:00
Mark Backman
c85a3f0b94 Add service_tier to BaseOpenAILLMService 2025-10-08 16:33:36 -04:00
Aleix Conchillo Flaqué
233fb25e6c Merge pull request #2810 from pipecat-ai/aleix/on-pipeline-error
PipelineTask: add on_pipeline_error event
2025-10-08 11:26:46 -07:00
Aleix Conchillo Flaqué
080978daa6 Merge pull request #2808 from pipecat-ai/aleix/readme-pipecat-tv
README: add Pipecat TV reference
2025-10-08 11:26:17 -07:00
Aleix Conchillo Flaqué
62b7c3d3b2 PipelineTask: add on_pipeline_error event 2025-10-07 18:36:38 -07:00
Mark Backman
4b2379cba8 Merge pull request #2798 from ivaaan/hume-rtvi
Hume add RTVI
2025-10-07 21:20:50 -04:00
Aleix Conchillo Flaqué
92087bdfa8 update CHANGELOG 2025-10-07 18:08:18 -07:00
Aleix Conchillo Flaqué
617919ac09 Merge pull request #2809 from pipecat-ai/aleix/revert-interruption-strategies-ordering
revert interruption strategies ordering
2025-10-07 18:07:07 -07:00
Aleix Conchillo Flaqué
0669daec3d update CHANGELOG for 0.0.89 2025-10-07 17:44:10 -07:00
Aleix Conchillo Flaqué
7c15a8c800 Revert "fix context order when using interruption strategies"
This reverts commit de8ee96927.
2025-10-07 17:42:35 -07:00
Aleix Conchillo Flaqué
066b77fba0 README: add Pipecat TV reference 2025-10-07 15:01:28 -07:00
Aleix Conchillo Flaqué
d9aef5f916 some last release CHANGELOG updates 2025-10-07 14:29:27 -07:00
Aleix Conchillo Flaqué
91ae3f8a9b Merge pull request #2807 from pipecat-ai/aleix/pipecat-0.0.88
update CHANGELOG for 0.0.88
2025-10-07 14:16:05 -07:00
Aleix Conchillo Flaqué
36da623352 update CHANGELOG for 0.0.88 2025-10-07 14:12:12 -07:00
Filipi da Silva Fuchter
31b9087ea6 Merge pull request #2805 from pipecat-ai/filipi/allowing_update_smallwebrtc_properties
Allowing to update smallwebrtc and whatsapp properties.
2025-10-07 17:57:26 -03:00
Mark Backman
1851fed22e Merge pull request #2806 from pipecat-ai/mb/deprecate-play-ht
Deprecate PlayHT TTS services
2025-10-07 16:44:53 -04:00
Mark Backman
eddce460da Deprecate PlayHT TTS services 2025-10-07 16:40:01 -04:00
Filipi Fuchter
da4f30cb6d Allowing to update smallwebrtc and whatsapp properties. 2025-10-07 17:28:14 -03:00
Mark Backman
250cf2d8f1 Merge pull request #2803 from pipecat-ai/mb/fix-11labs-stt-deprecation
Remove deprecation warning for ElevenLabsSTTService
2025-10-07 13:04:12 -04:00
Mark Backman
7bbdb4f991 Remove deprecation warning for ElevenLabsSTTService 2025-10-07 12:32:32 -04:00
Mark Backman
051c4782fb Merge pull request #2802 from pipecat-ai/mb/fix-aws-nova-sonic
Fix AWS Nova Sonic authentication
2025-10-07 10:46:03 -04:00
Mark Backman
b1ccec74b2 Fix AWS Nova Sonic authentication 2025-10-07 09:48:18 -04:00
Filipi da Silva Fuchter
92bf0d9eda Merge pull request #2794 from pipecat-ai/filipi/verifying_whatsapp_signature
Verifying WhatsApp signature to ensure the webhook request is from WhatsApp.
2025-10-07 08:57:47 -03:00
Aleix Conchillo Flaqué
f985550441 Merge pull request #2796 from pipecat-ai/aleix/fix-interruption-strategies-context-order
fix context order when using interruption strategies
2025-10-06 22:46:31 -07:00
Aleix Conchillo Flaqué
de8ee96927 fix context order when using interruption strategies 2025-10-06 22:43:01 -07:00
Aleix Conchillo Flaqué
2576d0f340 Merge pull request #2792 from pipecat-ai/aleix/google-nano-banana
GoogleLLMService: added support for image generation
2025-10-06 22:42:14 -07:00
ivaaan
f38f4711ac wip 2025-10-06 20:24:27 -07:00
ivaaan
c2f3ddd329 add RTVI to Hume 2025-10-06 19:41:31 -07:00
ivaaan
73ffe96228 add RTVI to Hume 2025-10-06 19:37:05 -07:00
Aleix Conchillo Flaqué
bd13a80da7 pyproject: update google dependencies 2025-10-06 17:38:08 -07:00
Aleix Conchillo Flaqué
312959f97e GoogleLLMService: update default model to gemini-2.5-flash 2025-10-06 17:38:08 -07:00
Aleix Conchillo Flaqué
fe168e3c68 GoogleLLMService: added support for image generation 2025-10-06 17:38:08 -07:00
Filipi Fuchter
28929a47f7 Verifying WhatsApp signature to ensure the webhook request is from WhatsApp. 2025-10-06 16:16:59 -03:00
Mark Backman
03f5defbc3 Merge pull request #2793 from pipecat-ai/mb/fix-flux-deprecation
Fix: Resolve Flux deprecation warning
2025-10-06 12:07:27 -04:00
Mark Backman
b216648315 Fix: Resolve Flux deprecation warning 2025-10-06 09:55:02 -04:00
Mark Backman
084b133a01 Merge pull request #2790 from pipecat-ai/add-security-md
Add SECURITY.md
2025-10-06 09:45:02 -04:00
Mark Backman
e589876176 Merge pull request #2786 from pipecat-ai/mb/nltk-download-error
Catch PermissionError when NLTK data can't be downloaded
2025-10-06 09:27:22 -04:00
Vanessa Pyne
a826313bf9 Add SECURITY.md 2025-10-05 13:24:47 -05:00
Mark Backman
49f44aa7c8 Catch PermissionError when NLTK data can't be downloaded 2025-10-04 08:41:32 -04:00
Mark Backman
64ceef9cf0 Merge pull request #2783 from pipecat-ai/mb/community-integrations-submission
Update to Community Integrations submission process
2025-10-03 12:41:13 -04:00
Mark Backman
cd6567c1f1 Update to Community Integrations submission process 2025-10-03 12:15:48 -04:00
Mark Backman
ac67ca1555 Merge pull request #2778 from pipecat-ai/mb/hume-cleanup
Tidying up the Hume example and service
2025-10-03 11:09:18 -04:00
mattie ruth backman
8d38994756 Transports now send InputTransportMessageFrames (not Urgent Frames) 2025-10-03 09:47:44 -04:00
LucasStringPay
607e3040d4 Ignore None 'completion_tokens' or similar
Similar as 144ea36c81 , reported in https://github.com/pipecat-ai/pipecat/issues/2207
2025-10-02 15:16:11 -07:00
Mark Backman
60604a9449 Tidying up the Hume example and service 2025-10-02 17:34:40 -04:00
Aleix Conchillo Flaqué
4abe4a6253 Merge pull request #2777 from pipecat-ai/aleix/readme-mention-tail
README: add tail terminal dashboard
2025-10-02 14:31:26 -07:00
Aleix Conchillo Flaqué
4c054af17b README: remove setup editor instructions 2025-10-02 14:30:31 -07:00
Aleix Conchillo Flaqué
dcba940d42 README: add tail terminal dashboard 2025-10-02 14:27:55 -07:00
Mark Backman
ad2adb0c58 Merge pull request #2518 from zgreathouse/hume-tts-service
Hume tts service
2025-10-02 17:26:39 -04:00
ivaaan
76923010b5 upd Hume version to 2 2025-10-02 13:57:07 -07:00
ivaaan
1b511557b2 upd evals 2025-10-02 13:48:30 -07:00
ivaaan
fdadb12933 upd Changelog 2025-10-02 13:46:22 -07:00
ivaaan
f1bbb7ba22 Regenerate uv.lock after resolving merge conflicts 2025-10-02 13:44:07 -07:00
ivaaan
c1492c5275 fixes based on markbackman review 2025-10-02 13:38:36 -07:00
ivaaan
4ffdabcfde add Hume example, small fixes 2025-10-02 13:38:36 -07:00
zach
b489de2fc3 adds hume tts service 2025-10-02 13:38:05 -07:00
zach
d9656cbb1a add hume sdk for hume tts service 2025-10-02 13:38:05 -07:00
zach
05fb223985 Add hume to .env.example 2025-10-02 13:34:37 -07:00
Mark Backman
62a5f07ad2 Merge pull request #2701 from pipecat-ai/mb/third-party-integrations
Add a third-party integrations guide
2025-10-02 15:59:38 -04:00
Mark Backman
b669e3a481 Update name to Community Integrations and streamline guide 2025-10-02 15:54:04 -04:00
Mark Backman
99f1041a47 More review fixes 2025-10-02 14:48:12 -04:00
Mark Backman
37b1345bfa Changes from review feedback 2025-10-02 14:48:12 -04:00
Mark Backman
8994ac17eb Add a third-party integrations guide 2025-10-02 14:48:12 -04:00
Mark Backman
63bc825008 Merge pull request #2771 from pipecat-ai/mb/update-publish-workflows
Updates to publish workflows
2025-10-02 12:35:43 -04:00
Mark Backman
e7ffde1c4c Merge pull request #2774 from pipecat-ai/mb/docs-fixes-0.0.87
Fix: Resolve docstring build issues before 0.0.87 release
2025-10-02 12:34:27 -04:00
Mark Backman
1c88565725 Merge pull request #2772 from pipecat-ai/mb/fix-openai-realtime-import
Fix: Change import for OpenAIRealtimeLLMContext in OpenAIRealtimeLLMS…
2025-10-02 12:34:16 -04:00
Aleix Conchillo Flaqué
07a6c2fb0e Merge pull request #2775 from pipecat-ai/aleix/pipecat-0.0.87
update CHANGELOG for 0.0.87
2025-10-02 09:12:41 -07:00
Aleix Conchillo Flaqué
e99f3bf75a update CHANGELOG for 0.0.87 2025-10-02 09:11:30 -07:00
Mark Backman
f09d780413 Fix: Resolve docstring build issues before 0.0.87 release 2025-10-02 10:09:25 -04:00
Mark Backman
e370d23374 Fix: Change import for OpenAIRealtimeLLMContext in OpenAIRealtimeLLMService 2025-10-02 09:39:44 -04:00
Mark Backman
b68ec14146 Updates to publish workflows 2025-10-02 08:25:35 -04:00
Filipi da Silva Fuchter
c567fd71b1 Merge pull request #2747 from pipecat-ai/filipi/whatsapp_runner
Creating the whatsapp routes inside the runner.
2025-10-01 21:21:34 -03:00
Filipi da Silva Fuchter
2ca1b2d6f8 Merge pull request #2612 from pipecat-ai/filipi/deepgram_flux
Integrating the new Deepgram model (Flux) with Pipecat
2025-10-01 21:20:47 -03:00
Mark Backman
04041a9a9a Merge pull request #2757 from pipecat-ai/hush/retryTimeout
Fix AWS Bedrock timeout exception handling
2025-10-01 19:08:09 -04:00
Aleix Conchillo Flaqué
6c498dc70f Merge pull request #2745 from pipecat-ai/aleix/transport-message-frames-deprecations
transport message frames deprecations
2025-10-01 16:05:55 -07:00
James Hush
32b07c1720 Fix AWS Bedrock timeout exception handling
- Use ReadTimeoutError and asyncio.TimeoutError which are the actual exceptions thrown by boto3
2025-10-01 19:04:35 -04:00
Aleix Conchillo Flaqué
ad507ce23d FrameLogger: it's fine to print transport messages 2025-10-01 16:00:42 -07:00
Aleix Conchillo Flaqué
be562cedfc DailyTransport: deprecate DailyTransportMessage(Urgent)Frame 2025-10-01 16:00:42 -07:00
Aleix Conchillo Flaqué
089e703e1f LiveKitTransport: deprecate LiveKitTransportMessage(Urgent)Frame 2025-10-01 16:00:42 -07:00
Aleix Conchillo Flaqué
4dc1e15a99 frames: use OutputTransportMessage(Urgent)Frame instead of TransportMessage(Urgent)Frame 2025-10-01 16:00:42 -07:00
Aleix Conchillo Flaqué
c7dc2e886f frames: use InputTransportMessageFrame instead of InputTransportMessageUrgentFrame
By default, input frames are already urgent.
2025-10-01 15:30:45 -07:00
Filipi Fuchter
11bc4ea854 Adding deepgram flux to release evals. 2025-10-01 19:24:58 -03:00
Mark Backman
029d76033d Merge pull request #2765 from pipecat-ai/mb/remove-daily-logging-04a
Remove DailyLogLevel from 04a example
2025-10-01 17:52:33 -04:00
Aleix Conchillo Flaqué
924d7dea9a Merge pull request #2766 from pipecat-ai/aleix/rtvi-properly-deprecate-errors-enabled
RTVIParams: properly deprecate errors_enabled
2025-10-01 14:49:12 -07:00
Aleix Conchillo Flaqué
244e94f3ce RTVIParams: properly deprecate errors_enabled 2025-10-01 14:30:41 -07:00
Mark Backman
af1f51d49e Remove DailyLogLevel from 04a example 2025-10-01 17:06:35 -04:00
Filipi da Silva Fuchter
9ba3c168b8 Merge pull request #2756 from pipecat-ai/filipi/esp32
SDP munging fixes.
2025-10-01 16:05:47 -03:00
Filipi Fuchter
e6ee8f7a16 New example using DeepgramFluxSTTService. 2025-10-01 15:43:25 -03:00
Filipi Fuchter
2ea2bd99e0 Deepgram Flux speech-to-text service implementation. 2025-10-01 15:43:09 -03:00
Filipi Fuchter
0c2ced7c52 Created WebsocketSTTService base class. 2025-10-01 15:42:56 -03:00
Filipi Fuchter
fb160646b8 Fixing the SDP munging to keep it working on Chrome. 2025-10-01 14:18:39 -03:00
Filipi da Silva Fuchter
89fed57af2 Merge pull request #2748 from pipecat-ai/filipi/remove_smallwebrtc_queue
Removing the message queue inside the SmallWebRTCConnection.
2025-10-01 08:07:47 -03:00
Filipi Fuchter
032032df65 Only remove ESP32 ICE candidates if host is defined. 2025-09-29 15:42:23 -03:00
Filipi Fuchter
a5595b82ea removing the message queue inside the SmallWebRTCConnection. 2025-09-26 11:02:17 -03:00
Filipi Fuchter
4d1915eb41 Fixing ruff format. 2025-09-26 10:49:52 -03:00
Filipi Fuchter
b3a84fc772 Refactoring how we are handling the lifespan inside the runner. 2025-09-26 10:47:04 -03:00
Filipi Fuchter
403d22e62c Creating the whatsapp routes inside the runner. 2025-09-26 10:28:19 -03:00
116 changed files with 9686 additions and 6290 deletions

View File

@@ -5,25 +5,25 @@ on:
inputs:
gitref:
type: string
description: "what git tag to build (e.g. v0.0.74)"
description: 'what git tag to build (e.g. v0.0.74)'
required: true
jobs:
build:
name: "Build and upload wheels"
name: 'Build and upload wheels'
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.gitref }}
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: "latest"
version: 'latest'
- name: Set up Python
run: uv python install 3.10
run: uv python install 3.12
- name: Install development dependencies
run: uv sync --group dev
- name: Build project
@@ -35,9 +35,9 @@ jobs:
path: ./dist
publish-to-pypi:
name: "Publish to PyPI"
name: 'Publish to PyPI'
runs-on: ubuntu-latest
needs: [ build ]
needs: [build]
environment:
name: pypi
url: https://pypi.org/p/pipecat-ai
@@ -56,12 +56,12 @@ jobs:
print-hash: true
publish-to-test-pypi:
name: "Publish to Test PyPI"
name: 'Publish to Test PyPI'
runs-on: ubuntu-latest
needs: [ build ]
needs: [build]
environment:
name: testpypi
url: https://pypi.org/p/pipecat-ai
url: https://test.pypi.org/p/pipecat-ai
permissions:
id-token: write
steps:
@@ -70,7 +70,7 @@ jobs:
with:
name: wheels
path: ./dist
- name: Publish to PyPI
- name: Publish to Test PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
verbose: true

View File

@@ -4,7 +4,7 @@ on: workflow_dispatch
jobs:
build:
name: "Build and upload wheels"
name: 'Build and upload wheels'
runs-on: ubuntu-latest
steps:
- name: Checkout repo
@@ -15,9 +15,9 @@ jobs:
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
version: "latest"
version: 'latest'
- name: Set up Python
run: uv python install 3.10
run: uv python install 3.12
- name: Install development dependencies
run: uv sync --group dev
- name: Build project
@@ -29,12 +29,12 @@ jobs:
path: ./dist
publish-to-test-pypi:
name: "Publish to Test PyPI"
name: 'Publish to Test PyPI'
runs-on: ubuntu-latest
needs: [build]
environment:
name: testpypi
url: https://pypi.org/p/pipecat-ai
url: https://test.pypi.org/p/pipecat-ai
permissions:
id-token: write
steps:
@@ -43,7 +43,7 @@ jobs:
with:
name: wheels
path: ./dist
- name: Publish to PyPI
- name: Publish to Test PyPI
uses: pypa/gh-action-pypi-publish@release/v1
with:
verbose: true

View File

@@ -5,10 +5,126 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.90] - 2025-10-10
### Added
- Added audio filter `KrispVivaFilter` using the Krisp VIVA SDK.
- Added `--folder` argument to the runner, allowing files saved in that folder
to be downloaded from `http://HOST:PORT/file/FILE`.
- Added `GeminiLiveVertexLLMService`, for accessing Gemini Live via Google
Vertex AI.
- Added some new configuration options to `GeminiLiveLLMService`:
- `thinking`
- `enable_affective_dialog`
- `proactivity`
Note that these new configuration options require using a newer model than
the default, like "gemini-2.5-flash-native-audio-preview-09-2025". The last
two require specifying `http_options=HttpOptions(api_version="v1alpha")`.
- Added `on_pipeline_error` event to `PipelineTask`. This event will get fired
when an `ErrorFrame` is pushed (use `FrameProcessor.push_error()`).
```python
@task.event_handler("on_pipeline_error")
async def on_pipeline_error(task: PipelineTask, frame: ErrorFrame):
...
```
- Added a `service_tier` `InputParam` to the `BaseOpenAILLMService`. This
parameter can influence the latency of the response. For example `"priority"`
will result in faster completions, but in exchange for a higher price.
### Changed
- Updated `GeminiLiveLLMService` to use the `google-genai` library rather than
use WebSockets directly.
### Deprecated
- `LivekitFrameSerializer` is now deprecated. Use `LiveKitTransport` instead.
- `pipecat.service.openai_realtime` is now deprecated, use
`pipecat.services.openai.realtime` instead or
`pipecat.services.azure.realtime` for Azure Realtime.
- `pipecat.service.aws_nova_sonic` is now deprecated, use
`pipecat.services.aws.nova_sonic` instead.
- `GeminiMultimodalLiveLLMService` is now deprecated, use
`GeminiLiveLLMService`.
### Fixed
- Fixed a `GoogleVertexLLMService` issue that would generate an error if no
token information was returned.
- `GeminiLiveLLMService` will now end gracefully (i.e. after the bot has
finished) upon receiving an `EndFrame`.
- `GeminiLiveLLMService` will try to seamlessly reconnect when it loses its
connection.
## [0.0.89] - 2025-10-07
### Fixed
- Reverted a change introduced in 0.0.88 that was causing pipelines to be frozen
when using interruption strategies and processors that block interruption
frames (e.g. `STTMuteFilter`).
## [0.0.88] - 2025-10-07
### Added
- Added support for Nano Banana models to `GoogleLLMService`. For example, you
can now use the `gemini-2.5-flash-image` model to generate images.
- Added `HumeTTSService` for text-to-speech synthesis using Hume AI's expressive
voice models. Provides high-quality, emotionally expressive speech synthesis
with support for various voice models. Includes example in
`examples/foundational/07ad-interruptible-hume.py`. Use with:
`uv pip install pipecat-ai[hume]`.
### Changed
- Updated default `GoogleLLMService` model to `gemini-2.5-flash`.
### Deprecated
- PlayHT is shutting down their API on December 31st, 2025. As a result,
`PlayHTTTSService` and `PlayHTHttpTTSService` are deprecated and will be
removed in a future version.
### Fixed
- Fixed an issue with `AWSNovaSonicLLMService` where the client wouldn't
connect due to a breaking change in the AWS dependency chain.
- `PermissionError` is now caught if NLTK's `punkt_tab` can't be downloaded.
- Fixed an issue that would cause wrong user/assistant context ordering when
using interruption strategies.
- Fixed RTVI incoming message handling, broken in 0.0.87.
## [0.0.87] - 2025-10-02
### Added
- Added `WebsocketSTTService` base class for websocket-based STT services.
Combines STT functionality with websocket connectivity, providing automatic
error handling and reconnection capabilities with exponential backoff.
- Added `DeepgramFluxSTTService` for real-time speech recognition using
Deepgram's Flux WebSocket API. Flux understands conversational flow and
automatically handles turn-taking.
- Added RTVI messages for user/bot audio levels and system logs.
- Include OpenAI-based LLM services cached tokens to `MetricsFrame`.
@@ -20,6 +136,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Deprecated
- `DailyTransportMessageFrame` and `DailyTransportMessageUrgentFrame` are
deprecated, use `DailyOutputTransportMessageFrame` and
`DailyOutputTransportMessageUrgentFrame` respectively instead.
- `LiveKitTransportMessageFrame` and `LiveKitTransportMessageUrgentFrame` are
deprecated, use `LiveKitOutputTransportMessageFrame` and
`LiveKitOutputTransportMessageUrgentFrame` respectively instead.
- `TransportMessageFrame` and `TransportMessageUrgentFrame` are deprecated, use
`OutputTransportMessageFrame` and `OutputTransportMessageUrgentFrame`
respectively instead.
- `InputTransportMessageUrgentFrame` is deprecated, use
`InputTransportMessageFrame` instead.
- `DailyUpdateRemoteParticipantsFrame` is deprecated and will be removed in a
future version. Instead, create your own custom frame and handle it in the
`@transport.output().event_handler("on_after_push_frame")` event handler or a
@@ -27,6 +158,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Fixed
- Fixed an issue in `AWSBedrockLLMService` where timeout exceptions weren't
being detected.
- Fixed a `PipelineTask` issue that could prevent the application to exit if
`task.cancel()` was called when the task was already finished.
@@ -1353,7 +1487,7 @@ quality and critical bugs impacting `ParallelPipelines` functionality.**
- Added `session_token` parameter to `AWSNovaSonicLLMService`.
- Added Gemini Multimodal Live File API for uploading, fetching, listing, and
deleting files. See `26f-gemini-multimodal-live-files-api.py` for example usage.
deleting files. See `26f-gemini-live-files-api.py` for example usage.
### Changed
@@ -3359,7 +3493,7 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
- Added the new modalities option and helper function to set Gemini output
modalities.
- Added `examples/foundational/26d-gemini-multimodal-live-text.py` which is
- Added `examples/foundational/26d-gemini-live-text.py` which is
using Gemini as TEXT modality and using another TTS provider for TTS process.
### Changed
@@ -3546,9 +3680,9 @@ stt = DeepgramSTTService(..., live_options=LiveOptions(model="nova-2-general"))
- Added new foundational examples for `GeminiMultimodalLiveLLMService`:
- `26-gemini-multimodal-live.py`
- `26a-gemini-multimodal-live-transcription.py`
- `26b-gemini-multimodal-live-video.py`
- `26c-gemini-multimodal-live-video.py`
- `26a-gemini-live-transcription.py`
- `26b-gemini-live-video.py`
- `26c-gemini-live-video.py`
- Added `SimliVideoService`. This is an integration for Simli AI avatars.
(see https://www.simli.com)

336
COMMUNITY_INTEGRATIONS.md Normal file
View File

@@ -0,0 +1,336 @@
# Community Integrations Guide
Pipecat welcomes community-maintained integrations! As our ecosystem grows, we've established a process for any developer to create and maintain their own service integrations while ensuring discoverability for the Pipecat community.
## Overview
**What we support:** Community-maintained integrations that live in separate repositories and are maintained by their authors.
**What we don't do:** The Pipecat team does not code review, test, or maintain community integrations. We provide guidance and list approved integrations for discoverability.
**Why this approach:** This allows the community to move quickly while keeping the Pipecat core team focused on maintaining the framework itself.
## Submitting your Integration
To be listed as an official community integration, follow these steps:
### Step 1: Build Your Integration
Create your integration following the patterns and examples shown in the "Integration Patterns and Examples" section below.
### Step 2: Set Up Your Repository
Your repository must contain these components:
- **Source code** - Complete implementation following Pipecat patterns
- **Foundational example** - Single file example showing basic usage (see [Pipecat examples](https://github.com/pipecat-ai/pipecat/tree/main/examples/foundational))
- **README.md** - Must include:
- Introduction and explanation of your integration
- Installation instructions
- Usage instructions with Pipecat Pipeline
- How to run your example
- Pipecat version compatibility (e.g., "Tested with Pipecat v0.0.86")
- Company attribution: If you work for the company providing the service, please mention this in your README. This helps build confidence that the integration will be actively maintained.
- **LICENSE** - Permissive license (BSD-2 like Pipecat, or equivalent open source terms)
- **Code documentation** - Source code with docstrings (we recommend following [Pipecat's docstring conventions](https://github.com/pipecat-ai/pipecat/blob/main/CONTRIBUTING.md#docstring-conventions))
- **Changelog** - Maintain a changelog for version updates
### Step 3: Join Discord
Join our Discord: https://discord.gg/pipecat
### Step 4: Submit for Listing
Submit a pull request to add your integration to our [Community Integrations documentation page](https://docs.pipecat.ai/server/services/community-integrations).
**To submit:**
1. Fork the [Pipecat docs repository](https://github.com/pipecat-ai/docs)
2. Edit the file `server/services/community-integrations.mdx`
3. Add your integration to the appropriate service category table with:
- Service name
- Link to your repository
- Maintainer GitHub username(s)
4. Include a link to your demo video (approx 30-60 seconds) in your PR description showing:
- Core functionality of your integration
- Handling of an interruption (if applicable to service type)
5. Submit your pull request
Once your PR is submitted, post in the `#community-integrations` Discord channel to let us know.
## Integration Patterns and Examples
### STT (Speech-to-Text) Services
#### Websocket-based Services
**Base class:** `STTService`
**Examples:**
- [DeepgramSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/deepgram/stt.py)
- [SpeechmaticsSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/speechmatics/stt.py)
#### File-based Services
**Base class:** `SegmentedSTTService`
**Examples:**
- [RivaSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/riva/stt.py)
- [FalSTTService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/fal/stt.py)
#### Key requirements:
- STT services should push `InterimTranscriptionFrames` and `TranscriptionFrames`
- If confidence values are available, filter for values >50% confidence
### LLM (Large Language Model) Services
#### OpenAI-Compatible Services
**Base class:** `OpenAILLMService`
**Examples:**
- [AzureLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/azure/llm.py)
- [GrokLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/grok/llm.py) - Shows overriding the base class where needed
#### Non-OpenAI Compatible Services
**Requires:** Full implementation
**Examples:**
- [AnthropicLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/anthropic/llm.py)
- [GoogleLLMService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/llm.py)
#### Key requirements:
- **Frame sequence:** Output must follow this frame sequence pattern:
- `LLMFullResponseStartFrame` - Signals the start of an LLM response
- `LLMTextFrame` - Contains LLM content, typically streamed as tokens
- `LLMFullResponseEndFrame` - Signals the end of an LLM response
- **Context aggregation:** Implement context aggregation to collect user and assistant content:
- Aggregators come in pairs with a `user()` instance and `assistant()` instance
- Context must adhere to the `LLMContext` universal format
- Aggregators should handle adding messages, function calls, and images to the context
### TTS (Text-to-Speech) Services
#### AudioContextWordTTSService
**Use for:** Websocket-based services supporting word/timestamp alignment
**Example:**
- [CartesiaTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/cartesia/tts.py)
#### InterruptibleTTSService
**Use for:** Websocket-based services without word/timestamp alignment, requiring disconnection on interruption
**Example:**
- [SarvamTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/sarvam/tts.py)
#### WordTTSService
**Use for:** HTTP-based services supporting word/timestamp alignment
**Example:**
- [ElevenLabsHttpTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/elevenlabs/tts.py)
#### TTSService
**Use for:** HTTP-based services without word/timestamp alignment
**Example:**
- [GoogleHttpTTSService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/tts.py)
#### Key requirements:
- For websocket services, use asyncio WebSocket implementation (required for v13+ support)
- Handle idle service timeouts with keepalives
- TTSServices push both audio (`TTSRawAudioFrame`) and text (`TTSTextFrame`) frames
### Telephony Serializers
Pipecat supports telephony provider integration using websocket connections to exchange MediaStreams. These services use a FrameSerializer to serialize and deserialize inputs from the FastAPIWebsocketTransport.
**Examples:**
- [Twilio](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/serializers/twilio.py)
- [Telnyx](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/serializers/telnyx.py)
#### Key requirements:
- Include hang-up functionality using the provider's native API, ideally using `aiohttp`
- Support DTMF (dual-tone multi-frequency) events if the provider supports them:
- Deserialize DTMF events from the provider's protocol to `InputDTMFFrame`
- Use `KeypadEntry` enum for valid keypad entries (0-9, \*, #, A-D)
- Handle invalid DTMF digits gracefully by returning `None`
### Image Generation Services
**Base class:** `ImageGenService`
**Examples:**
- [FalImageGenService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/fal/image.py)
- [GoogleImageGenService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/google/image.py)
#### Key requirements:
- Must implement `run_image_gen` method returning an `AsyncGenerator`
### Vision Services
Vision services process images and provide analysis such as descriptions, object detection, or visual question answering.
**Base class:** `VisionService`
**Example:**
- [MoondreamVisionService](https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/services/moondream/vision.py)
#### Key requirements:
- Must implement `run_vision` method that takes an `LLMContext` and returns an `AsyncGenerator[Frame, None]`
- The method processes the latest image in the context and yields frames with analysis results
- Typically yields `TextFrame` objects containing descriptions or answers
## Implementation Guidelines
### Naming Conventions
- **STT:** `VendorSTTService`
- **LLM:** `VendorLLMService`
- **TTS:**
- Websocket: `VendorTTSService`
- HTTP: `VendorHttpTTSService`
- **Image:** `VendorImageGenService`
- **Vision:** `VendorVisionService`
- **Telephony:** `VendorFrameSerializer`
### Metrics Support
Enable metrics in your service:
```python
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as this service supports metrics.
"""
return True
```
### Dynamic Settings Updates
STT, LLM, and TTS services support `ServiceUpdateSettingsFrame` for dynamic configuration changes. The base STTService has an `_update_settings()` method that handles settings, and the private `_settings` `Dict` is used to store settings and provide access to the subclass.
```python
async def set_language(self, language: Language):
"""Set the recognition language and reconnect.
Args:
language: The language to use for speech recognition.
"""
logger.info(f"Switching STT language to: [{language}]")
self._settings["language"] = language
await self._disconnect()
await self._connect()
```
Note that, in this example, Deepgram requires the websocket connection be disconnected and reconnected to reinitialize the service with the new value. Consider if your service requires reconnection.
### Sample Rate Handling
Sample rates are set via PipelineParams and passed to each frame processor at initialization. The pattern is to _not_ set the sample rate value in the constructor of a given service. Instead, use the `start()` method to initialize sample rates from the frame:
```python
async def start(self, frame: StartFrame):
"""Start the service."""
await super().start(frame)
self._settings["output_format"]["sample_rate"] = self.sample_rate
await self._connect()
```
Note that `self.sample_rate` is a `@property` set in the TTSService base class, which provides access to the private sample rate value obtained from the StartFrame.
### Tracing Decorators
Use Pipecat's tracing decorators:
- **STT:** `@traced_stt` - decorate a function that handles `transcript`, `is_final`, `language` as args
- **LLM:** `@traced_llm` - decorate the `_process_context()` method
- **TTS:** `@traced_tts` - decorate the `run_tts()` method
## Best Practices
### Packaging and Distribution
- Use [uv](https://docs.astral.sh/uv/) for packaging (encouraged)
- Consider releasing to PyPI for easier installation
- Follow semantic versioning principles
- Maintain a changelog
### HTTP Communication
For REST-based communication, use aiohttp. Pipecat includes this as a required dependency, so using it prevents adding an additional dependency to your integration.
### Error Handling
- Wrap API calls in appropriate try/catch blocks
- Handle rate limits and network failures gracefully
- Provide meaningful error messages
- When errors occur, raise exceptions AND push `ErrorFrame`s to notify the pipeline:
```python
from pipecat.frames.frames import ErrorFrame
try:
# Your API call
result = await self._make_api_call()
except Exception as e:
# Push error frame to pipeline
await self.push_error(ErrorFrame(error=f"{self} error: {e}"))
# Raise or handle as appropriate
raise
```
### Testing
- Your foundational example serves as a valuable integration-level test
- Unit tests are nice to have. As the Pipecat teams provides better guidance, we will encourage unit testing more
## Disclaimer
Community integrations are community-maintained and not officially supported by the Pipecat team. Users should evaluate these integrations independently. The Pipecat team reserves the right to remove listings that become unmaintained or problematic.
## Staying Up to Date
Pipecat evolves rapidly to support the latest AI technologies and patterns. While we strive to minimize breaking changes, they do occur as the framework matures.
**We strongly recommend:**
- Join our Discord at https://discord.gg/pipecat and monitor the `#announcements` channel for release notifications
- Follow our changelog: https://github.com/pipecat-ai/pipecat/blob/main/CHANGELOG.md
- Test your integration against new Pipecat releases promptly
- Update your README with the last tested Pipecat version
This helps ensure your integration remains compatible and your users have clear expectations about version support.
## Questions?
Join our Discord community at https://discord.gg/pipecat and post in the `#community-integrations` channel for guidance and support.
For additional questions, you can also reach out to us at pipecat-ai@daily.co.

View File

@@ -1,5 +1,9 @@
## Contributing to Pipecat
**Want to add a new service integration?**
We encourage community-maintained integrations! Please see our [Community Integration Guide](COMMUNITY_INTEGRATIONS.md) for the process and requirements.
**Want to contribute to Pipecat core?**
We welcome contributions of all kinds! Your help is appreciated. Follow these steps to get involved:
1. **Fork this repository**: Start by forking the Pipecat Documentation repository to your GitHub account.

112
README.md
View File

@@ -19,10 +19,6 @@
- **Business Agents** customer intake, support bots, guided flows
- **Complex Dialog Systems** design logic with structured conversations
🧭 Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
🔍 Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
## 🧠 Why Pipecat?
- **Voice-first**: Integrates speech recognition, text-to-speech, and conversation handling
@@ -30,40 +26,34 @@
- **Composable Pipelines**: Build complex behavior from modular components
- **Real-Time**: Ultra-low latency interaction with different transports (e.g. WebSockets or WebRTC)
## 📱 Client SDKs
## 🌐 Pipecat Ecosystem
You can connect to Pipecat from any platform using our official SDKs:
### 📱 Client SDKs
<table>
<tr>
<td>
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/javascript/javascript-original.svg" width="40" height="40" alt="JavaScript"/>
<a href="https://docs.pipecat.ai/client/js/introduction">JavaScript</a>
</td>
<td>
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/react/react-original.svg" width="40" height="40" alt="React"/>
<a href="https://docs.pipecat.ai/client/react/introduction">React</a>
</td>
<td>
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/react/react-original.svg" width="40" height="40" alt="React Native"/>
<a href="https://docs.pipecat.ai/client/react-native/introduction">React Native</a>
</td>
</tr>
<tr>
<td>
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/swift/swift-original.svg" width="40" height="40" alt="Swift"/>
<a href="https://docs.pipecat.ai/client/ios/introduction">Swift</a>
</td>
<td>
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/kotlin/kotlin-original.svg" width="40" height="40" alt="Kotlin"/>
<a href="https://docs.pipecat.ai/client/android/introduction">Kotlin</a>
</td>
<td>
<img src="https://cdn.jsdelivr.net/gh/devicons/devicon/icons/cplusplus/cplusplus-original.svg" width="40" height="40" alt="JavaScript"/>
<a href="https://docs.pipecat.ai/client/c++/introduction">C++</a>
</td>
</tr>
</table>
Building client applications? You can connect to Pipecat from any platform using our official SDKs:
<a href="https://docs.pipecat.ai/client/js/introduction">JavaScript</a> | <a href="https://docs.pipecat.ai/client/react/introduction">React</a> | <a href="https://docs.pipecat.ai/client/react-native/introduction">React Native</a> |
<a href="https://docs.pipecat.ai/client/ios/introduction">Swift</a> | <a href="https://docs.pipecat.ai/client/android/introduction">Kotlin</a> | <a href="https://docs.pipecat.ai/client/c++/introduction">C++</a> | <a href="https://github.com/pipecat-ai/pipecat-esp32">ESP32</a>
### 🧭 Structured conversations
Looking to build structured conversations? Check out [Pipecat Flows](https://github.com/pipecat-ai/pipecat-flows) for managing complex conversational states and transitions.
### 🪄 Beautiful UIs
Want to build beautiful and engaging experiences? Checkout the [Voice UI Kit](https://github.com/pipecat-ai/voice-ui-kit), a collection of components, hooks and templates for building voice AI applications quickly.
### 🔍 Debugging
Looking for help debugging your pipeline and processors? Check out [Whisker](https://github.com/pipecat-ai/whisker), a real-time Pipecat debugger.
### 🖥️ Terminal
Love terminal applications? Check out [Tail](https://github.com/pipecat-ai/tail), a terminal dashboard for Pipecat.
### 📺️ Pipecat TV Channel
Catch new features, interviews, and how-tos on our [Pipecat TV](https://www.youtube.com/playlist?list=PLzU2zoMTQIHjqC3v4q2XVSR3hGSzwKFwH) channel.
## 🎬 See it in action
@@ -81,7 +71,7 @@ You can connect to Pipecat from any platform using our official SDKs:
| ------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Speech-to-Text | [AssemblyAI](https://docs.pipecat.ai/server/services/stt/assemblyai), [AWS](https://docs.pipecat.ai/server/services/stt/aws), [Azure](https://docs.pipecat.ai/server/services/stt/azure), [Cartesia](https://docs.pipecat.ai/server/services/stt/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/stt/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/stt/elevenlabs), [Fal Wizper](https://docs.pipecat.ai/server/services/stt/fal), [Gladia](https://docs.pipecat.ai/server/services/stt/gladia), [Google](https://docs.pipecat.ai/server/services/stt/google), [Groq (Whisper)](https://docs.pipecat.ai/server/services/stt/groq), [NVIDIA Riva](https://docs.pipecat.ai/server/services/stt/riva), [OpenAI (Whisper)](https://docs.pipecat.ai/server/services/stt/openai), [SambaNova (Whisper)](https://docs.pipecat.ai/server/services/stt/sambanova), [Soniox](https://docs.pipecat.ai/server/services/stt/soniox), [Speechmatics](https://docs.pipecat.ai/server/services/stt/speechmatics), [Ultravox](https://docs.pipecat.ai/server/services/stt/ultravox), [Whisper](https://docs.pipecat.ai/server/services/stt/whisper) |
| LLMs | [Anthropic](https://docs.pipecat.ai/server/services/llm/anthropic), [AWS](https://docs.pipecat.ai/server/services/llm/aws), [Azure](https://docs.pipecat.ai/server/services/llm/azure), [Cerebras](https://docs.pipecat.ai/server/services/llm/cerebras), [DeepSeek](https://docs.pipecat.ai/server/services/llm/deepseek), [Fireworks AI](https://docs.pipecat.ai/server/services/llm/fireworks), [Gemini](https://docs.pipecat.ai/server/services/llm/gemini), [Grok](https://docs.pipecat.ai/server/services/llm/grok), [Groq](https://docs.pipecat.ai/server/services/llm/groq), [Mistral](https://docs.pipecat.ai/server/services/llm/mistral), [NVIDIA NIM](https://docs.pipecat.ai/server/services/llm/nim), [Ollama](https://docs.pipecat.ai/server/services/llm/ollama), [OpenAI](https://docs.pipecat.ai/server/services/llm/openai), [OpenRouter](https://docs.pipecat.ai/server/services/llm/openrouter), [Perplexity](https://docs.pipecat.ai/server/services/llm/perplexity), [Qwen](https://docs.pipecat.ai/server/services/llm/qwen), [SambaNova](https://docs.pipecat.ai/server/services/llm/sambanova) [Together AI](https://docs.pipecat.ai/server/services/llm/together) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Text-to-Speech | [Async](https://docs.pipecat.ai/server/services/tts/asyncai), [AWS](https://docs.pipecat.ai/server/services/tts/aws), [Azure](https://docs.pipecat.ai/server/services/tts/azure), [Cartesia](https://docs.pipecat.ai/server/services/tts/cartesia), [Deepgram](https://docs.pipecat.ai/server/services/tts/deepgram), [ElevenLabs](https://docs.pipecat.ai/server/services/tts/elevenlabs), [Fish](https://docs.pipecat.ai/server/services/tts/fish), [Google](https://docs.pipecat.ai/server/services/tts/google), [Groq](https://docs.pipecat.ai/server/services/tts/groq), [Hume](https://docs.pipecat.ai/server/services/tts/hume), [Inworld](https://docs.pipecat.ai/server/services/tts/inworld), [LMNT](https://docs.pipecat.ai/server/services/tts/lmnt), [MiniMax](https://docs.pipecat.ai/server/services/tts/minimax), [Neuphonic](https://docs.pipecat.ai/server/services/tts/neuphonic), [NVIDIA Riva](https://docs.pipecat.ai/server/services/tts/riva), [OpenAI](https://docs.pipecat.ai/server/services/tts/openai), [Piper](https://docs.pipecat.ai/server/services/tts/piper), [PlayHT](https://docs.pipecat.ai/server/services/tts/playht), [Rime](https://docs.pipecat.ai/server/services/tts/rime), [Sarvam](https://docs.pipecat.ai/server/services/tts/sarvam), [XTTS](https://docs.pipecat.ai/server/services/tts/xtts) |
| Speech-to-Speech | [AWS Nova Sonic](https://docs.pipecat.ai/server/services/s2s/aws), [Gemini Multimodal Live](https://docs.pipecat.ai/server/services/s2s/gemini), [OpenAI Realtime](https://docs.pipecat.ai/server/services/s2s/openai) |
| Transport | [Daily (WebRTC)](https://docs.pipecat.ai/server/services/transport/daily), [FastAPI Websocket](https://docs.pipecat.ai/server/services/transport/fastapi-websocket), [SmallWebRTCTransport](https://docs.pipecat.ai/server/services/transport/small-webrtc), [WebSocket Server](https://docs.pipecat.ai/server/services/transport/websocket-server), Local |
| Serializers | [Plivo](https://docs.pipecat.ai/server/utilities/serializers/plivo), [Twilio](https://docs.pipecat.ai/server/utilities/serializers/twilio), [Telnyx](https://docs.pipecat.ai/server/utilities/serializers/telnyx) |
@@ -184,54 +174,6 @@ Run a specific test suite:
uv run pytest tests/test_name.py
```
### Setting up your editor
This project uses strict [PEP 8](https://peps.python.org/pep-0008/) formatting via [Ruff](https://github.com/astral-sh/ruff).
#### Emacs
You can use [use-package](https://github.com/jwiegley/use-package) to install [emacs-lazy-ruff](https://github.com/christophermadsen/emacs-lazy-ruff) package and configure `ruff` arguments:
```elisp
(use-package lazy-ruff
:ensure t
:hook ((python-mode . lazy-ruff-mode))
:config
(setq lazy-ruff-format-command "ruff format")
(setq lazy-ruff-check-command "ruff check --select I"))
```
`ruff` was installed in the `venv` environment described before, so you should be able to use [pyvenv-auto](https://github.com/ryotaro612/pyvenv-auto) to automatically load that environment inside Emacs.
```elisp
(use-package pyvenv-auto
:ensure t
:defer t
:hook ((python-mode . pyvenv-auto-run)))
```
#### Visual Studio Code
Install the
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, and enable formatting on save:
```json
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true
}
```
#### PyCharm
`ruff` was installed in the `venv` environment described before, now to enable autoformatting on save, go to `File` -> `Settings` -> `Tools` -> `File Watchers` and add a new watcher with the following settings:
1. **Name**: `Ruff formatter`
2. **File type**: `Python`
3. **Working directory**: `$ContentRoot$`
4. **Arguments**: `format $FilePath$`
5. **Program**: `$PyInterpreterDirectory$/ruff`
## 🤝 Contributing
We welcome contributions from the community! Whether you're fixing bugs, improving documentation, or adding new features, here's how you can help:

5
SECURITY.md Normal file
View File

@@ -0,0 +1,5 @@
# Security Policy
## Reporting a Vulnerability
Please email `disclosures@daily.co`.

View File

@@ -50,6 +50,7 @@ autodoc_mock_imports = [
# Krisp - has build issues on some platforms
"pipecat_ai_krisp",
"krisp",
"krisp_audio",
# System-specific GUI libraries
"_tkinter",
"tkinter",

View File

@@ -58,6 +58,9 @@ GOOGLE_CLOUD_PROJECT_ID=...
GOOGLE_TEST_CREDENTIALS=...
GOOGLE_VERTEX_TEST_CREDENTIALS=...
# Hume
HUME_API_KEY=...
# LMNT
LMNT_API_KEY=...
LMNT_VOICE_ID=...
@@ -87,6 +90,9 @@ SIMLI_FACE_ID=...
# Krisp
KRISP_MODEL_PATH=...
# Krisp Viva
KRISP_VIVA_MODEL_PATH=...
# DeepSeek
DEEPSEEK_API_KEY=...
@@ -155,3 +161,9 @@ NVIDIA_API_KEY=...
# Qwen
QWEN_API_KEY=...
# WhatsApp
WHATSAPP_TOKEN=
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN=
WHATSAPP_PHONE_NUMBER_ID=
WHATSAPP_APP_SECRET=

View File

@@ -25,7 +25,7 @@ from pipecat.processors.aggregators.llm_response_universal import LLMContextAggr
from pipecat.runner.daily import configure
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.daily.transport import DailyLogLevel, DailyParams, DailyTransport
from pipecat.transports.daily.transport import DailyParams, DailyTransport
load_dotenv(override=True)
@@ -49,7 +49,6 @@ async def main():
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
)
transport.set_log_level(DailyLogLevel.Info)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),

View File

@@ -5,7 +5,6 @@
#
import os
import wave
from dotenv import load_dotenv
from loguru import logger
@@ -14,14 +13,7 @@ from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMRunFrame,
LLMTextFrame,
OutputAudioRawFrame,
TextFrame,
)
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -111,27 +103,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
audio_file_path = os.path.join(os.path.dirname(__file__), "assets", "pre-recorded.wav")
with wave.open(audio_file_path, "rb") as wav_file:
llm_text_frame = TextFrame(text="This is a pre-recorded message.")
llm_text_frame.skip_tts = True
audio_data = wav_file.readframes(wav_file.getnframes())
output_audio_raw_frame = OutputAudioRawFrame(
audio=audio_data, sample_rate=44100, num_channels=1
)
await task.queue_frames(
[
LLMRunFrame(),
LLMFullResponseStartFrame(),
llm_text_frame,
output_audio_raw_frame,
LLMFullResponseEndFrame(),
]
)
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):

View File

@@ -0,0 +1,138 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.hume.tts import HUME_SAMPLE_RATE, HumeTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = HumeTTSService(
api_key=os.getenv("HUME_API_KEY"),
# Replace with your Hume voice ID
voice_id="f898a92e-685f-43fa-985b-a46920f0650b",
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
pipeline = Pipeline(
[
transport.input(), # Transport user input
rtvi,
stt,
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
audio_out_sample_rate=HUME_SAMPLE_RATE,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
observers=[RTVIObserver(rtvi)],
)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,118 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response_universal import (
LLMContext,
LLMContextAggregatorPair,
)
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.flux.stt import DeepgramFluxSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramFluxSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-2-andromeda-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -23,7 +23,6 @@ from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.elevenlabs.stt import ElevenLabsSTTService
from pipecat.services.elevenlabs.tts import ElevenLabsHttpTTSService
from pipecat.services.openai.llm import OpenAILLMService

View File

@@ -0,0 +1,151 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""
A conversational AI bot using Gemini for both LLM, STT and TTS.
This example demonstrates how to use Gemini's image generation capabilities.
Features showcased:
- Gemini LLM for conversation and image generation
- Google TTS and STT
Run with:
python examples/foundational/07n-interruptible-gemini-image.py
Make sure to set your environment variables:
export GOOGLE_API_KEY=your_api_key_here
"""
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.llm import GoogleLLMService
from pipecat.services.google.stt import GoogleSTTService
from pipecat.services.google.tts import GoogleTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=1024,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = GoogleSTTService(
params=GoogleSTTService.InputParams(languages=Language.EN_US),
credentials=os.getenv("GOOGLE_TEST_CREDENTIALS"),
)
tts = GoogleTTSService(
voice_id="en-US-Chirp3-HD-Charon",
params=GoogleTTSService.InputParams(language=Language.EN_US),
credentials=os.getenv("GOOGLE_TEST_CREDENTIALS"),
)
llm = GoogleLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
model="gemini-2.5-flash-image",
)
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # Gemini TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation with a styled introduction
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,129 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from dotenv import load_dotenv
from loguru import logger
from pipecat.audio.filters.krisp_viva_filter import KrispVivaFilter
from pipecat.audio.turn.smart_turn.base_smart_turn import SmartTurnParams
from pipecat.audio.turn.smart_turn.local_smart_turn_v3 import LocalSmartTurnAnalyzerV3
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_context import LLMContext
from pipecat.processors.aggregators.llm_response_universal import LLMContextAggregatorPair
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=KrispVivaFilter(),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=KrispVivaFilter(),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
turn_analyzer=LocalSmartTurnAnalyzerV3(params=SmartTurnParams()),
audio_in_filter=KrispVivaFilter(),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = LLMContext(messages)
context_aggregator = LLMContextAggregatorPair(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -76,9 +76,8 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = GoogleVertexLLMService(
credentials=os.getenv("GOOGLE_VERTEX_TEST_CREDENTIALS"),
params=GoogleVertexLLMService.InputParams(
project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
),
project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
location=os.getenv("GOOGLE_CLOUD_LOCATION"),
)
# You can aslo register a function_name of None to get all functions
# sent to the same callback with an additional function_name parameter.

View File

@@ -26,7 +26,11 @@ from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.deepgram.tts import DeepgramTTSService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams, DailyTransportMessageFrame
from pipecat.transports.daily.transport import (
DailyOutputTransportMessageFrame,
DailyOutputTransportMessageUrgentFrame,
DailyParams,
)
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
@@ -128,14 +132,14 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.debug(f"Received latency ping app message: {message}")
ts = message["latency-ping"]["ts"]
# Send immediately
transport.output().send_message(
DailyTransportMessageFrame(
await task.queue_frame(
DailyOutputTransportMessageUrgentFrame(
message={"latency-pong-msg-handler": {"ts": ts}}, participant_id=sender
)
)
# And push to the pipeline for the Daily transport.output to send
await task.queue_frame(
DailyTransportMessageFrame(
DailyOutputTransportMessageFrame(
message={"latency-pong-pipeline-delivery": {"ts": ts}},
participant_id=sender,
)

View File

@@ -24,14 +24,15 @@ from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai_realtime import (
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioNoiseReduction,
InputAudioTranscription,
OpenAIRealtimeLLMService,
SemanticTurnDetection,
SessionProperties,
)
from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

View File

@@ -21,13 +21,14 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.azure.realtime.llm import AzureRealtimeLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai_realtime import (
AzureRealtimeLLMService,
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioTranscription,
SessionProperties,
)
from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

View File

@@ -22,16 +22,17 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai_realtime import (
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioNoiseReduction,
InputAudioTranscription,
OpenAIRealtimeLLMService,
SemanticTurnDetection,
SessionProperties,
)
from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

View File

@@ -25,13 +25,14 @@ from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.services.openai_realtime import (
from pipecat.services.openai.realtime.events import (
AudioConfiguration,
AudioInput,
InputAudioTranscription,
OpenAIRealtimeLLMService,
SessionProperties,
TurnDetection,
)
from pipecat.services.openai_realtime.events import AudioConfiguration, AudioInput
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams

View File

@@ -23,7 +23,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

View File

@@ -17,7 +17,7 @@ from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -65,7 +65,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
Respond to what the user said in a creative and helpful way.
"""
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck

View File

@@ -20,7 +20,7 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.transcript_processor import TranscriptProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -65,7 +65,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Aoede", # Puck, Charon, Kore, Fenrir, Aoede
# system_instruction="Talk like a pirate."

View File

@@ -22,7 +22,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -122,12 +122,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
required=["location"],
)
search_tool = {"google_search": {}}
# KNOWN ISSUE: If using GeminiVertexLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,

View File

@@ -24,7 +24,7 @@ from pipecat.runner.utils import (
maybe_capture_participant_camera,
maybe_capture_participant_screen,
)
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
@@ -58,7 +58,7 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Aoede", # Puck, Charon, Kore, Fenrir, Aoede
# system_instruction="Talk like a pirate."

View File

@@ -20,9 +20,9 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.gemini_multimodal_live.gemini import (
GeminiMultimodalLiveLLMService,
GeminiMultimodalModalities,
from pipecat.services.google.gemini_live.llm import (
GeminiLiveLLMService,
GeminiModalities,
InputParams,
)
from pipecat.transports.base_transport import BaseTransport, TransportParams
@@ -80,11 +80,15 @@ transport_params = {
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
llm = GeminiMultimodalLiveLLMService(
# KNOWN ISSUE: If using GeminiLiveVertexLLMService, you cannot specify a
# modality other than AUDIO (at least not if using the service's default
# model, which is a native audio model:
# https://cloud.google.com/vertex-ai/generative-ai/docs/live-api/tools#native-audio).
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=SYSTEM_INSTRUCTION,
tools=[{"google_search": {}}, {"code_execution": {}}],
params=InputParams(modalities=GeminiMultimodalModalities.TEXT),
params=InputParams(modalities=GeminiModalities.TEXT),
)
# Optionally, you can set the response modalities via a function

View File

@@ -19,7 +19,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -83,7 +83,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
# Initialize the Gemini Multimodal Live model
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck
system_instruction=system_instruction,

View File

@@ -19,9 +19,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import (
GeminiMultimodalLiveLLMService,
)
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -110,7 +108,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""
# Initialize Gemini service with File API support
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck

View File

@@ -9,13 +9,13 @@ from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import Frame, LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live.gemini import GeminiMultimodalLiveLLMService
from pipecat.services.google.frames import LLMSearchResponseFrame
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
@@ -105,7 +105,7 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
custom_tools={AdapterType.GEMINI: [{"google_search": {}}, {"code_execution": {}}]},
)
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=SYSTEM_INSTRUCTION,
voice_id="Charon", # Aoede, Charon, Fenrir, Kore, Puck

View File

@@ -0,0 +1,191 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from datetime import datetime
from dotenv import load_dotenv
from google.genai.types import HttpOptions
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.services.google.gemini_live.llm_vertex import GeminiLiveVertexLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have three tools available to you:
1. get_current_weather: Use this tool to get the current weather in a specific location.
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
"""
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events. This doesn't really
# matter because we can only use the Multimodal Live API's phrase
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events. This doesn't really
# matter because we can only use the Multimodal Live API's phrase
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events. This doesn't really
# matter because we can only use the Multimodal Live API's phrase
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
# KNOWN ISSUE: If using GeminiVertexLiveLLMService, it appears
# you cannot use the "google_search" tool alongside other tools.
# See https://github.com/googleapis/python-genai/issues/941.
tools = ToolsSchema(standard_tools=[weather_function, restaurant_function])
llm = GeminiLiveVertexLLMService(
credentials=os.getenv("GOOGLE_VERTEX_TEST_CREDENTIALS"),
project_id=os.getenv("GOOGLE_CLOUD_PROJECT_ID"),
location=os.getenv("GOOGLE_CLOUD_LOCATION"),
system_instruction=system_instruction,
voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck
tools=tools,
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
context = OpenAILLMContext(
[{"role": "user", "content": "Say hello."}],
)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -0,0 +1,204 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import os
from datetime import datetime
from dotenv import load_dotenv
from loguru import logger
from pipecat.adapters.schemas.function_schema import FunctionSchema
from pipecat.adapters.schemas.tools_schema import AdapterType, ToolsSchema
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import EndTaskFrame, LLMRunFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
load_dotenv(override=True)
async def fetch_weather_from_api(params: FunctionCallParams):
temperature = 75 if params.arguments["format"] == "fahrenheit" else 24
await params.result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": params.arguments["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def fetch_restaurant_recommendation(params: FunctionCallParams):
await params.result_callback({"name": "The Golden Dragon"})
async def end_conversation(params: FunctionCallParams):
await params.result_callback({"success": True})
await params.llm.push_frame(EndTaskFrame(), FrameDirection.UPSTREAM)
system_instruction = """
You are a helpful assistant who can answer questions and use tools.
You have three tools available to you:
1. get_current_weather: Use this tool to get the current weather in a specific location.
2. get_restaurant_recommendation: Use this tool to get a restaurant recommendation in a specific location.
3. end_conversation: Use this tool to gracefully end the conversation.
After you've responded to the user three times, do two things, in order:
1. Politely let them know that that's all the time you have today and say goodbye.
2. Call the end_conversation tool to gracefully end the conversation.
"""
# We store functions so objects (e.g. SileroVADAnalyzer) don't get
# instantiated. The function will be called when the desired transport gets
# selected.
transport_params = {
"daily": lambda: DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events. This doesn't really
# matter because we can only use the Multimodal Live API's phrase
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"twilio": lambda: FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events. This doesn't really
# matter because we can only use the Multimodal Live API's phrase
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
"webrtc": lambda: TransportParams(
audio_in_enabled=True,
audio_out_enabled=True,
# set stop_secs to something roughly similar to the internal setting
# of the Multimodal Live api, just to align events. This doesn't really
# matter because we can only use the Multimodal Live API's phrase
# endpointing, for now.
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
}
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
logger.info(f"Starting bot")
weather_function = FunctionSchema(
name="get_current_weather",
description="Get the current weather",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the user's location.",
},
},
required=["location", "format"],
)
restaurant_function = FunctionSchema(
name="get_restaurant_recommendation",
description="Get a restaurant recommendation",
properties={
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
},
required=["location"],
)
end_conversation_function = FunctionSchema(
name="end_conversation",
description="Gracefully end the conversation",
properties={},
required=[],
)
search_tool = {"google_search": {}}
tools = ToolsSchema(
standard_tools=[weather_function, restaurant_function, end_conversation_function],
custom_tools={AdapterType.GEMINI: [search_tool]},
)
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
system_instruction=system_instruction,
tools=tools,
)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("get_restaurant_recommendation", fetch_restaurant_recommendation)
llm.register_function("end_conversation", end_conversation)
context = OpenAILLMContext(
[{"role": "user", "content": "Say hello."}],
)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
enable_metrics=True,
enable_usage_metrics=True,
),
idle_timeout_secs=runner_args.pipeline_idle_timeout_secs,
)
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):
logger.info(f"Client connected")
# Kick off the conversation.
await task.queue_frames([LLMRunFrame()])
@transport.event_handler("on_client_disconnected")
async def on_client_disconnected(transport, client):
logger.info(f"Client disconnected")
await task.cancel()
runner = PipelineRunner(handle_sigint=runner_args.handle_sigint)
await runner.run(task)
async def bot(runner_args: RunnerArguments):
"""Main bot entry point compatible with Pipecat Cloud."""
transport = await create_transport(runner_args, transport_params)
await run_bot(transport, runner_args)
if __name__ == "__main__":
from pipecat.runner.run import main
main()

View File

@@ -21,7 +21,7 @@ from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.aws_nova_sonic import AWSNovaSonicLLMService
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService
from pipecat.services.llm_service import FunctionCallParams
from pipecat.transports.base_transport import BaseTransport, TransportParams
from pipecat.transports.daily.transport import DailyParams

View File

@@ -20,7 +20,7 @@ from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIObserver, RTVIProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.services.gemini_multimodal_live import GeminiMultimodalLiveLLMService
from pipecat.services.google.gemini_live.llm import GeminiLiveLLMService
from pipecat.transports.base_transport import TransportParams
from pipecat.transports.daily.transport import DailyParams, DailyTransport
@@ -94,7 +94,7 @@ Respond to what the user said in a creative and helpful way. Keep your responses
async def run_bot(pipecat_transport):
llm = GeminiMultimodalLiveLLMService(
llm = GeminiLiveLLMService(
api_key=os.getenv("GOOGLE_API_KEY"),
voice_id="Puck", # Aoede, Charon, Fenrir, Kore, Puck
transcribe_user_audio=True,

View File

@@ -105,7 +105,7 @@ uv run 07-interruptible.py -t twilio -x NGROK_HOST_NAME
### Vision & Multimodal
- **[12a-describe-video-gemini-flash.py](./12a-describe-video-gemini-flash.py)**: Bot describes user's video (Video input, Multimodal LLMs)
- **[26c-gemini-multimodal-live-video.py](./26c-gemini-multimodal-live-video.py)**: Gemini with video input (Streaming video, Function calls)
- **[26c-gemini-live-video.py](./26c-gemini-live-video.py)**: Gemini with video input (Streaming video, Function calls)
### Voice & Language

View File

@@ -50,7 +50,7 @@ anthropic = [ "anthropic~=0.49.0" ]
assemblyai = [ "pipecat-ai[websockets-base]" ]
asyncai = [ "pipecat-ai[websockets-base]" ]
aws = [ "aioboto3~=15.0.0", "pipecat-ai[websockets-base]" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.0.2; python_version>='3.12'" ]
aws-nova-sonic = [ "aws_sdk_bedrock_runtime~=0.1.0; python_version>='3.12'" ]
azure = [ "azure-cognitiveservices-speech~=1.42.0"]
cartesia = [ "cartesia~=2.0.3", "pipecat-ai[websockets-base]" ]
cerebras = []
@@ -62,11 +62,12 @@ fal = [ "fal-client~=0.5.9" ]
fireworks = []
fish = [ "ormsgpack~=1.7.0", "pipecat-ai[websockets-base]" ]
gladia = [ "pipecat-ai[websockets-base]" ]
google = [ "google-cloud-speech~=2.32.0", "google-cloud-texttospeech~=2.26.0", "google-genai~=1.24.0", "pipecat-ai[websockets-base]" ]
google = [ "google-cloud-speech>=2.33.0,<3", "google-cloud-texttospeech>=2.31.0,<3", "google-genai>=1.41.0,<2", "pipecat-ai[websockets-base]" ]
grok = []
groq = [ "groq~=0.23.0" ]
gstreamer = [ "pygobject~=3.50.0" ]
heygen = [ "livekit>=1.0.13", "pipecat-ai[websockets-base]" ]
hume = [ "hume>=0.11.2" ]
inworld = []
krisp = [ "pipecat-ai-krisp~=0.4.0" ]
koala = [ "pvkoala~=2.0.3" ]

View File

@@ -67,6 +67,7 @@ TESTS_07 = [
("07ac-interruptible-asyncai-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07b-interruptible-langchain.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07c-interruptible-deepgram.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07c-interruptible-deepgram-flux.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07d-interruptible-elevenlabs.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"07d-interruptible-elevenlabs-http.py",
@@ -74,8 +75,6 @@ TESTS_07 = [
EVAL_SIMPLE_MATH,
BOT_SPEAKS_FIRST,
),
("07e-interruptible-playht.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07e-interruptible-playht-http.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07f-interruptible-azure.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07g-interruptible-openai.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07h-interruptible-openpipe.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
@@ -102,6 +101,7 @@ TESTS_07 = [
("07w-interruptible-fal.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07y-interruptible-minimax.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07z-interruptible-sarvam.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("07ae-interruptible-hume.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# Needs a local XTTS docker instance running.
# ("07i-interruptible-xtts.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# Needs a Krisp license.
@@ -147,7 +147,10 @@ TESTS_15 = [
]
TESTS_19 = [
("19-openai-realtime.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19-openai-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
# OpenAI Realtime not released on Azure yet
# ("19a-azure-realtime.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19a-azure-realtime-beta.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19b-openai-realtime-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
("19b-openai-realtime-beta-text.py", PROMPT_WEATHER, EVAL_WEATHER, BOT_SPEAKS_FIRST),
@@ -160,18 +163,18 @@ TESTS_21 = [
TESTS_26 = [
("26-gemini-multimodal-live.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"26a-gemini-multimodal-live-transcription.py",
"26a-gemini-live-transcription.py",
PROMPT_SIMPLE_MATH,
EVAL_SIMPLE_MATH,
BOT_SPEAKS_FIRST,
),
(
"26b-gemini-multimodal-live-function-calling.py",
"26b-gemini-live-function-calling.py",
PROMPT_WEATHER,
EVAL_WEATHER,
BOT_SPEAKS_FIRST,
),
("26c-gemini-multimodal-live-video.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
("26c-gemini-live-video.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"26e-gemini-multimodal-google-search.py",
PROMPT_ONLINE_SEARCH,
@@ -179,7 +182,13 @@ TESTS_26 = [
BOT_SPEAKS_FIRST,
),
# Currently not working.
# ("26d-gemini-multimodal-live-text.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
# ("26d-gemini-live-text.py", PROMPT_SIMPLE_MATH, EVAL_SIMPLE_MATH, BOT_SPEAKS_FIRST),
(
"26h-gemini-live-vertex-function-calling.py",
PROMPT_WEATHER,
EVAL_WEATHER,
BOT_SPEAKS_FIRST,
),
]
TESTS_27 = [

View File

@@ -87,9 +87,11 @@ class GeminiLLMAdapter(BaseLLMAdapter[GeminiLLMInvocationParams]):
Includes both converted standard tools and any custom Gemini-specific tools.
"""
functions_schema = tools_schema.standard_tools
formatted_standard_tools = [
{"function_declarations": [func.to_default_dict() for func in functions_schema]}
]
formatted_standard_tools = (
[{"function_declarations": [func.to_default_dict() for func in functions_schema]}]
if functions_schema
else []
)
custom_gemini_tools = []
if tools_schema.custom_tools:
custom_gemini_tools = tools_schema.custom_tools.get(AdapterType.GEMINI, [])

View File

@@ -0,0 +1,193 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Krisp noise reduction audio filter for Pipecat.
This module provides an audio filter implementation using Krisp VIVA SDK.
"""
import os
import numpy as np
from loguru import logger
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
try:
import krisp_audio
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the Krisp filter, you need to install krisp_audio.")
raise Exception(f"Missing module: {e}")
def _log_callback(log_message, log_level):
logger.info(f"[{log_level}] {log_message}")
class KrispVivaFilter(BaseAudioFilter):
"""Audio filter using the Krisp VIVA SDK.
Provides real-time noise reduction for audio streams using Krisp's
proprietary noise suppression algorithms. This filter requires a
valid Krisp model file to operate.
Supported sample rates:
- 8000 Hz
- 16000 Hz
- 24000 Hz
- 32000 Hz
- 44100 Hz
- 48000 Hz
"""
# Initialize Krisp Audio SDK globally
krisp_audio.globalInit("", _log_callback, krisp_audio.LogLevel.Off)
SDK_VERSION = krisp_audio.getVersion()
logger.debug(
f"Krisp Audio Python SDK Version: {SDK_VERSION.major}."
f"{SDK_VERSION.minor}.{SDK_VERSION.patch}"
)
SAMPLE_RATES = {
8000: krisp_audio.SamplingRate.Sr8000Hz,
16000: krisp_audio.SamplingRate.Sr16000Hz,
24000: krisp_audio.SamplingRate.Sr24000Hz,
32000: krisp_audio.SamplingRate.Sr32000Hz,
44100: krisp_audio.SamplingRate.Sr44100Hz,
48000: krisp_audio.SamplingRate.Sr48000Hz,
}
FRAME_SIZE_MS = 10 # Krisp requires audio frames of 10ms duration for processing.
def __init__(self, model_path: str = None, noise_suppression_level: int = 100) -> None:
"""Initialize the Krisp noise reduction filter.
Args:
model_path: Path to the Krisp model file (.kef extension).
If None, uses KRISP_VIVA_MODEL_PATH environment variable.
noise_suppression_level: Noise suppression level.
Raises:
ValueError: If model_path is not provided and KRISP_VIVA_MODEL_PATH is not set.
Exception: If model file doesn't have .kef extension.
FileNotFoundError: If model file doesn't exist.
"""
super().__init__()
# Set model path, checking environment if not specified
self._model_path = model_path or os.getenv("KRISP_VIVA_MODEL_PATH")
if not self._model_path:
logger.error("Model path is not provided and KRISP_VIVA_MODEL_PATH is not set.")
raise ValueError("Model path for KrispAudioProcessor must be provided.")
if not self._model_path.endswith(".kef"):
raise Exception("Model is expected with .kef extension")
if not os.path.isfile(self._model_path):
raise FileNotFoundError(f"Model file not found: {self._model_path}")
self._filtering = True
self._session = None
self._samples_per_frame = None
self._noise_suppression_level = noise_suppression_level
# Audio buffer to accumulate samples for complete frames
self._audio_buffer = bytearray()
def _int_to_sample_rate(self, sample_rate):
"""Convert integer sample rate to krisp_audio SamplingRate enum.
Args:
sample_rate: Sample rate as integer
Returns:
krisp_audio.SamplingRate enum value
Raises:
ValueError: If sample rate is not supported
"""
if sample_rate not in self.SAMPLE_RATES:
raise ValueError("Unsupported sample rate")
return self.SAMPLE_RATES[sample_rate]
async def start(self, sample_rate: int):
"""Initialize the Krisp processor with the transport's sample rate.
Args:
sample_rate: The sample rate of the input transport in Hz.
"""
model_info = krisp_audio.ModelInfo()
model_info.path = self._model_path
nc_cfg = krisp_audio.NcSessionConfig()
nc_cfg.inputSampleRate = self._int_to_sample_rate(sample_rate)
nc_cfg.inputFrameDuration = krisp_audio.FrameDuration.Fd10ms
nc_cfg.outputSampleRate = nc_cfg.inputSampleRate
nc_cfg.modelInfo = model_info
self._samples_per_frame = int((sample_rate * self.FRAME_SIZE_MS) / 1000)
self._session = krisp_audio.NcInt16.create(nc_cfg)
async def stop(self):
"""Clean up the Krisp processor when stopping."""
self._session = None
async def process_frame(self, frame: FilterControlFrame):
"""Process control frames to enable/disable filtering.
Args:
frame: The control frame containing filter commands.
"""
if isinstance(frame, FilterEnableFrame):
self._filtering = frame.enable
async def filter(self, audio: bytes) -> bytes:
"""Apply Krisp noise reduction to audio data.
Args:
audio: Raw audio data as bytes to be filtered.
Returns:
Noise-reduced audio data as bytes.
"""
if not self._filtering:
return audio
# Add incoming audio to our buffer
self._audio_buffer.extend(audio)
# Calculate how many complete frames we can process
total_samples = len(self._audio_buffer) // 2 # 2 bytes per int16 sample
num_complete_frames = total_samples // self._samples_per_frame
if num_complete_frames == 0:
# Not enough samples for a complete frame yet, return empty
return b""
# Calculate how many bytes we need for complete frames
complete_samples_count = num_complete_frames * self._samples_per_frame
bytes_to_process = complete_samples_count * 2 # 2 bytes per sample
# Extract the bytes we can process
audio_to_process = bytes(self._audio_buffer[:bytes_to_process])
# Remove processed bytes from buffer, keep the remainder
self._audio_buffer = self._audio_buffer[bytes_to_process:]
# Process the complete frames
samples = np.frombuffer(audio_to_process, dtype=np.int16)
frames = samples.reshape(-1, self._samples_per_frame)
processed_samples = np.empty_like(samples)
for i, frame in enumerate(frames):
cleaned_frame = self._session.process(frame, self._noise_suppression_level)
processed_samples[i * self._samples_per_frame : (i + 1) * self._samples_per_frame] = (
cleaned_frame
)
return processed_samples.tobytes()

View File

@@ -672,7 +672,7 @@ class TTSSpeakFrame(DataFrame):
@dataclass
class TransportMessageFrame(DataFrame):
class OutputTransportMessageFrame(DataFrame):
"""Frame containing transport-specific message data.
Parameters:
@@ -685,6 +685,32 @@ class TransportMessageFrame(DataFrame):
return f"{self.name}(message: {self.message})"
@dataclass
class TransportMessageFrame(OutputTransportMessageFrame):
"""Frame containing transport-specific message data.
.. deprecated:: 0.0.87
This frame is deprecated and will be removed in a future version.
Instead, use `OutputTransportMessageFrame`.
Parameters:
message: The transport message payload.
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"TransportMessageFrame is deprecated and will be removed in a future version. "
"Instead, use OutputTransportMessageFrame.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class DTMFFrame:
"""Base class for DTMF (Dual-Tone Multi-Frequency) keypad frames.
@@ -1092,8 +1118,8 @@ class STTMuteFrame(SystemFrame):
@dataclass
class TransportMessageUrgentFrame(SystemFrame):
"""Frame for urgent transport messages that need immediate processing.
class InputTransportMessageFrame(SystemFrame):
"""Frame for transport messages received from external sources.
Parameters:
message: The urgent transport message payload.
@@ -1106,20 +1132,69 @@ class TransportMessageUrgentFrame(SystemFrame):
@dataclass
class InputTransportMessageUrgentFrame(TransportMessageUrgentFrame):
class InputTransportMessageUrgentFrame(InputTransportMessageFrame):
"""Frame for transport messages received from external sources.
This frame wraps incoming transport messages to distinguish them from outgoing
urgent transport messages (TransportMessageUrgentFrame), preventing infinite
message loops in the transport layer. It inherits the message payload from
TransportMessageFrame while marking the message as having been received
rather than generated locally.
.. deprecated:: 0.0.87
This frame is deprecated and will be removed in a future version.
Instead, use `InputTransportMessageFrame`.
Used by transport implementations to properly handle bidirectional message
flow without creating feedback loops.
Parameters:
message: The urgent transport message payload.
"""
pass
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"InputTransportMessageUrgentFrame is deprecated and will be removed in a future version. "
"Instead, use InputTransportMessageFrame.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class OutputTransportMessageUrgentFrame(SystemFrame):
"""Frame for urgent transport messages that need to be sent immediately.
Parameters:
message: The urgent transport message payload.
"""
message: Any
def __str__(self):
return f"{self.name}(message: {self.message})"
@dataclass
class TransportMessageUrgentFrame(OutputTransportMessageUrgentFrame):
"""Frame for urgent transport messages that need to be sent immediately.
.. deprecated:: 0.0.87
This frame is deprecated and will be removed in a future version.
Instead, use `OutputTransportMessageUrgentFrame`.
Parameters:
message: The urgent transport message payload.
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"TransportMessageUrgentFrame is deprecated and will be removed in a future version. "
"Instead, use OutputTransportMessageFrame.",
DeprecationWarning,
stacklevel=2,
)
@dataclass

View File

@@ -130,12 +130,16 @@ class PipelineTask(BasePipelineTask):
- on_pipeline_finished: Called after the pipeline has reached any terminal state.
This includes:
- StopFrame: pipeline was stopped (processors keep connections open)
- EndFrame: pipeline ended normally
- CancelFrame: pipeline was cancelled
Use this event for cleanup, logging, or post-processing tasks. Users can inspect
the frame if they need to handle specific cases.
- on_pipeline_error: Called when an error occurs with ErrorFrame
Example::
@task.event_handler("on_frame_reached_upstream")
@@ -146,9 +150,17 @@ class PipelineTask(BasePipelineTask):
async def on_pipeline_idle_timeout(task):
...
@task.event_handler("on_pipeline_started")
async def on_pipeline_started(task, frame):
...
@task.event_handler("on_pipeline_finished")
async def on_pipeline_finished(task, frame):
...
@task.event_handler("on_pipeline_error")
async def on_pipeline_error(task, frame):
...
"""
def __init__(
@@ -286,6 +298,7 @@ class PipelineTask(BasePipelineTask):
self._register_event_handler("on_pipeline_ended")
self._register_event_handler("on_pipeline_cancelled")
self._register_event_handler("on_pipeline_finished")
self._register_event_handler("on_pipeline_error")
@property
def params(self) -> PipelineParams:
@@ -692,12 +705,11 @@ class PipelineTask(BasePipelineTask):
logger.debug(f"{self}: received interruption task frame {frame}")
await self._pipeline.queue_frame(InterruptionFrame())
elif isinstance(frame, ErrorFrame):
await self._call_event_handler("on_pipeline_error", frame)
if frame.fatal:
logger.error(f"A fatal error occurred: {frame}")
# Cancel all tasks downstream.
await self.queue_frame(CancelFrame())
# Tell the task we should stop.
await self.queue_frame(StopTaskFrame())
else:
logger.warning(f"{self}: Something went wrong: {frame}")

View File

@@ -877,6 +877,8 @@ class FrameProcessor(BaseObject):
"""
while True:
(frame, direction, callback) = await self.__input_queue.get()
if self.__should_block_system_frames and self.__input_event:
logger.trace(f"{self}: system frame processing paused")
await self.__input_event.wait()
@@ -884,8 +886,6 @@ class FrameProcessor(BaseObject):
self.__should_block_system_frames = False
logger.trace(f"{self}: system frame processing resumed")
(frame, direction, callback) = await self.__input_queue.get()
if isinstance(frame, SystemFrame):
await self.__process_frame(frame, direction, callback)
elif self.__process_queue:
@@ -900,6 +900,8 @@ class FrameProcessor(BaseObject):
async def __process_frame_task_handler(self):
"""Handle non-system frames from the process queue."""
while True:
(frame, direction, callback) = await self.__process_queue.get()
if self.__should_block_frames and self.__process_event:
logger.trace(f"{self}: frame processing paused")
await self.__process_event.wait()
@@ -907,8 +909,6 @@ class FrameProcessor(BaseObject):
self.__should_block_frames = False
logger.trace(f"{self}: frame processing resumed")
(frame, direction, callback) = await self.__process_queue.get()
await self.__process_frame(frame, direction, callback)
self.__process_queue.task_done()

View File

@@ -42,6 +42,7 @@ from pipecat.frames.frames import (
Frame,
FunctionCallResultFrame,
InputAudioRawFrame,
InputTransportMessageFrame,
InterimTranscriptionFrame,
LLMConfigureOutputFrame,
LLMContextFrame,
@@ -50,10 +51,10 @@ from pipecat.frames.frames import (
LLMMessagesAppendFrame,
LLMTextFrame,
MetricsFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
SystemFrame,
TranscriptionFrame,
TransportMessageUrgentFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
@@ -919,7 +920,7 @@ class RTVIObserverParams:
user_audio_level_enabled: bool = False
metrics_enabled: bool = True
system_logs_enabled: bool = False
errors_enabled: bool = True
errors_enabled: Optional[bool] = None
audio_level_period_secs: float = 0.15
@@ -962,7 +963,7 @@ class RTVIObserver(BaseObserver):
if self._params.system_logs_enabled:
self._system_logger_id = logger.add(self._logger_sink)
if self._params.errors_enabled:
if self._params.errors_enabled is not None:
import warnings
with warnings.catch_warnings():
@@ -1209,11 +1210,10 @@ class RTVIObserver(BaseObserver):
async def _send_error_response(self, frame: RTVIServerResponseFrame):
"""Send a response to the client for a specific request."""
if self._params.errors_enabled:
message = RTVIErrorResponse(
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
)
await self.send_rtvi_message(message)
message = RTVIErrorResponse(
id=str(frame.client_msg.msg_id), data=RTVIErrorResponseData(error=frame.error)
)
await self.send_rtvi_message(message)
class RTVIProcessor(FrameProcessor):
@@ -1346,7 +1346,9 @@ class RTVIProcessor(FrameProcessor):
async def push_transport_message(self, model: BaseModel, exclude_none: bool = True):
"""Push a transport message frame."""
frame = TransportMessageUrgentFrame(message=model.model_dump(exclude_none=exclude_none))
frame = OutputTransportMessageUrgentFrame(
message=model.model_dump(exclude_none=exclude_none)
)
await self.push_frame(frame)
async def handle_message(self, message: RTVIMessage):
@@ -1419,7 +1421,7 @@ class RTVIProcessor(FrameProcessor):
elif isinstance(frame, ErrorFrame):
await self._send_error_frame(frame)
await self.push_frame(frame, direction)
elif isinstance(frame, TransportMessageUrgentFrame):
elif isinstance(frame, InputTransportMessageFrame):
await self._handle_transport_message(frame)
# All other system frames
elif isinstance(frame, SystemFrame):
@@ -1482,7 +1484,7 @@ class RTVIProcessor(FrameProcessor):
await self._handle_message(message)
self._message_queue.task_done()
async def _handle_transport_message(self, frame: TransportMessageUrgentFrame):
async def _handle_transport_message(self, frame: InputTransportMessageFrame):
"""Handle an incoming transport message frame."""
try:
transport_message = frame.message

View File

@@ -15,7 +15,7 @@ from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
OutputAudioRawFrame,
TransportMessageFrame,
UserSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -36,9 +36,9 @@ class FrameLogger(FrameProcessor):
color: Optional[str] = None,
ignored_frame_types: Tuple[Type[Frame], ...] = (
BotSpeakingFrame,
UserSpeakingFrame,
InputAudioRawFrame,
OutputAudioRawFrame,
TransportMessageFrame,
),
):
"""Initialize the frame logger.

View File

@@ -67,10 +67,15 @@ To run locally:
import argparse
import asyncio
import mimetypes
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
import aiohttp
from fastapi.responses import FileResponse
from loguru import logger
from pipecat.runner.types import (
@@ -82,7 +87,7 @@ from pipecat.runner.types import (
try:
import uvicorn
from dotenv import load_dotenv
from fastapi import BackgroundTasks, FastAPI, Request, WebSocket
from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Request, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
except ImportError as e:
@@ -96,6 +101,12 @@ except ImportError as e:
load_dotenv(override=True)
os.environ["ENV"] = "local"
TELEPHONY_TRANSPORTS = ["twilio", "telnyx", "plivo", "exotel"]
RUNNER_DOWNLOADS_FOLDER: Optional[str] = None
RUNNER_HOST: str = "localhost"
RUNNER_PORT: int = 7860
def _get_bot_module():
"""Get the bot module from the calling script."""
@@ -150,7 +161,12 @@ async def _run_telephony_bot(websocket: WebSocket):
def _create_server_app(
transport_type: str, host: str = "localhost", proxy: str = None, esp32_mode: bool = False
*,
transport_type: str,
host: str = "localhost",
proxy: str,
esp32_mode: bool = False,
folder: Optional[str] = None,
):
"""Create FastAPI app with transport-specific routes."""
app = FastAPI()
@@ -165,18 +181,21 @@ def _create_server_app(
# Set up transport-specific routes
if transport_type == "webrtc":
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host)
_setup_webrtc_routes(app, esp32_mode=esp32_mode, host=host, folder=folder)
_setup_whatsapp_routes(app)
elif transport_type == "daily":
_setup_daily_routes(app)
elif transport_type in ["twilio", "telnyx", "plivo", "exotel"]:
_setup_telephony_routes(app, transport_type, proxy)
elif transport_type in TELEPHONY_TRANSPORTS:
_setup_telephony_routes(app, transport_type=transport_type, proxy=proxy)
else:
logger.warning(f"Unknown transport type: {transport_type}")
return app
def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "localhost"):
def _setup_webrtc_routes(
app: FastAPI, *, esp32_mode: bool = False, host: str = "localhost", folder: Optional[str] = None
):
"""Set up WebRTC-specific routes."""
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
@@ -198,6 +217,21 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
"""Redirect root requests to client interface."""
return RedirectResponse(url="/client/")
@app.get("/files/{filename}")
async def download_file(filename: str):
"""Handle file downloads."""
if not folder:
logger.warning(f"Attempting to dowload {filename}, but downloads folder not setup.")
return
file_path = Path(folder) / filename
if not os.path.exists(file_path):
raise HTTPException(404)
media_type, _ = mimetypes.guess_type(file_path)
return FileResponse(path=file_path, media_type=media_type, filename=filename)
# Initialize the SmallWebRTC request handler
small_webrtc_handler: SmallWebRTCRequestHandler = SmallWebRTCRequestHandler(
esp32_mode=esp32_mode, host=host
@@ -221,12 +255,184 @@ def _setup_webrtc_routes(app: FastAPI, esp32_mode: bool = False, host: str = "lo
return answer
@asynccontextmanager
async def lifespan(app: FastAPI):
async def smallwebrtc_lifespan(app: FastAPI):
"""Manage FastAPI application lifecycle and cleanup connections."""
yield
await small_webrtc_handler.close()
app.router.lifespan_context = lifespan
# Add the SmallWebRTC lifespan to the app
_add_lifespan_to_app(app, smallwebrtc_lifespan)
def _add_lifespan_to_app(app: FastAPI, new_lifespan):
"""Add a new lifespan context manager to the app, combining with existing if present.
Args:
app: The FastAPI application instance
new_lifespan: The new lifespan context manager to add
"""
if hasattr(app.router, "lifespan_context") and app.router.lifespan_context is not None:
# If there's already a lifespan context, combine them
existing_lifespan = app.router.lifespan_context
@asynccontextmanager
async def combined_lifespan(app: FastAPI):
async with existing_lifespan(app):
async with new_lifespan(app):
yield
app.router.lifespan_context = combined_lifespan
else:
# No existing lifespan, use the new one
app.router.lifespan_context = new_lifespan
def _setup_whatsapp_routes(app: FastAPI):
"""Set up WebRTC-specific routes."""
try:
from pipecat_ai_small_webrtc_prebuilt.frontend import SmallWebRTCPrebuiltUI
from pipecat.transports.smallwebrtc.connection import SmallWebRTCConnection
from pipecat.transports.smallwebrtc.request_handler import (
SmallWebRTCRequest,
SmallWebRTCRequestHandler,
)
from pipecat.transports.whatsapp.api import WhatsAppWebhookRequest
from pipecat.transports.whatsapp.client import WhatsAppClient
except ImportError as e:
logger.error(f"WebRTC transport dependencies not installed: {e}")
return
WHATSAPP_TOKEN = os.getenv("WHATSAPP_TOKEN")
WHATSAPP_PHONE_NUMBER_ID = os.getenv("WHATSAPP_PHONE_NUMBER_ID")
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN = os.getenv("WHATSAPP_WEBHOOK_VERIFICATION_TOKEN")
WHATSAPP_APP_SECRET = os.getenv("WHATSAPP_APP_SECRET")
if not all(
[
WHATSAPP_TOKEN,
WHATSAPP_PHONE_NUMBER_ID,
WHATSAPP_WEBHOOK_VERIFICATION_TOKEN,
]
):
logger.debug(
"Missing required environment variables for WhatsApp transport. Keeping it disabled."
)
return
# Global WhatsApp client instance
whatsapp_client: Optional[WhatsAppClient] = None
@app.get(
"/whatsapp",
summary="Verify WhatsApp webhook",
description="Handles WhatsApp webhook verification requests from Meta",
)
async def verify_webhook(request: Request):
"""Verify WhatsApp webhook endpoint.
This endpoint is called by Meta's WhatsApp Business API to verify
the webhook URL during setup. It validates the verification token
and returns the challenge parameter if successful.
"""
if whatsapp_client is None:
logger.error("WhatsApp client is not initialized")
raise HTTPException(status_code=503, detail="Service unavailable")
params = dict(request.query_params)
logger.debug(f"Webhook verification request received with params: {list(params.keys())}")
try:
result = await whatsapp_client.handle_verify_webhook_request(
params=params, expected_verification_token=WHATSAPP_WEBHOOK_VERIFICATION_TOKEN
)
logger.info("Webhook verification successful")
return result
except ValueError as e:
logger.warning(f"Webhook verification failed: {e}")
raise HTTPException(status_code=403, detail="Verification failed")
@app.post(
"/whatsapp",
summary="Handle WhatsApp webhook events",
description="Processes incoming WhatsApp messages and call events",
)
async def whatsapp_webhook(
body: WhatsAppWebhookRequest,
background_tasks: BackgroundTasks,
request: Request,
x_hub_signature_256: str = Header(None),
):
"""Handle incoming WhatsApp webhook events.
For call events, establishes WebRTC connections and spawns bot instances
in the background to handle real-time communication.
"""
if whatsapp_client is None:
logger.error("WhatsApp client is not initialized")
raise HTTPException(status_code=503, detail="Service unavailable")
# Validate webhook object type
if body.object != "whatsapp_business_account":
logger.warning(f"Invalid webhook object type: {body.object}")
raise HTTPException(status_code=400, detail="Invalid object type")
logger.debug(f"Processing WhatsApp webhook: {body.model_dump()}")
async def connection_callback(connection: SmallWebRTCConnection):
"""Handle new WebRTC connections from WhatsApp calls.
Called when a WebRTC connection is established for a WhatsApp call.
Spawns a bot instance to handle the conversation.
Args:
connection: The established WebRTC connection
"""
bot_module = _get_bot_module()
runner_args = SmallWebRTCRunnerArguments(webrtc_connection=connection)
background_tasks.add_task(bot_module.bot, runner_args)
try:
# Process the webhook request
raw_body = await request.body()
result = await whatsapp_client.handle_webhook_request(
body, connection_callback, sha256_signature=x_hub_signature_256, raw_body=raw_body
)
logger.debug(f"Webhook processed successfully: {result}")
return {"status": "success", "message": "Webhook processed successfully"}
except ValueError as ve:
logger.warning(f"Invalid webhook request format: {ve}")
raise HTTPException(status_code=400, detail=f"Invalid request: {str(ve)}")
except Exception as e:
logger.error(f"Internal error processing webhook: {e}")
raise HTTPException(status_code=500, detail="Internal server error processing webhook")
@asynccontextmanager
async def whatsapp_lifespan(app: FastAPI):
"""Manage WhatsApp client lifecycle and cleanup connections."""
nonlocal whatsapp_client
# Initialize WhatsApp client with persistent HTTP session
async with aiohttp.ClientSession() as session:
whatsapp_client = WhatsAppClient(
whatsapp_token=WHATSAPP_TOKEN,
whatsapp_secret=WHATSAPP_APP_SECRET,
phone_number_id=WHATSAPP_PHONE_NUMBER_ID,
session=session,
)
logger.info("WhatsApp client initialized successfully")
try:
yield # Run the application
finally:
# Cleanup all active calls on shutdown
logger.info("Cleaning up WhatsApp client resources...")
if whatsapp_client:
await whatsapp_client.terminate_all_calls()
logger.info("WhatsApp cleanup completed")
# Add the WhatsApp lifespan to the app
_add_lifespan_to_app(app, whatsapp_lifespan)
def _setup_daily_routes(app: FastAPI):
@@ -314,7 +520,7 @@ def _setup_daily_routes(app: FastAPI):
return await _handle_rtvi_request(request)
def _setup_telephony_routes(app: FastAPI, transport_type: str, proxy: str):
def _setup_telephony_routes(app: FastAPI, *, transport_type: str, proxy: str):
"""Set up telephony-specific routes."""
# XML response templates (Exotel doesn't use XML webhooks)
XML_TEMPLATES = {
@@ -417,6 +623,21 @@ def _validate_and_clean_proxy(proxy: str) -> str:
return proxy
def runner_downloads_folder() -> Optional[str]:
"""Returns the folder where files are stored for later download."""
return RUNNER_DOWNLOADS_FOLDER
def runner_host() -> str:
"""Returns the host name of this runner."""
return RUNNER_HOST
def runner_port() -> int:
"""Returns the port of this runner."""
return RUNNER_PORT
def main():
"""Start the Pipecat development runner.
@@ -437,14 +658,16 @@ def main():
The bot file must contain a `bot(runner_args)` function as the entry point.
"""
global RUNNER_DOWNLOADS_FOLDER, RUNNER_HOST, RUNNER_PORT
parser = argparse.ArgumentParser(description="Pipecat Development Runner")
parser.add_argument("--host", type=str, default="localhost", help="Host address")
parser.add_argument("--port", type=int, default=7860, help="Port number")
parser.add_argument("--host", type=str, default=RUNNER_HOST, help="Host address")
parser.add_argument("--port", type=int, default=RUNNER_PORT, help="Port number")
parser.add_argument(
"-t",
"--transport",
type=str,
choices=["daily", "webrtc", "twilio", "telnyx", "plivo", "exotel"],
choices=["daily", "webrtc", *TELEPHONY_TRANSPORTS],
default="webrtc",
help="Transport type",
)
@@ -462,6 +685,7 @@ def main():
default=False,
help="Connect directly to Daily room (automatically sets transport to daily)",
)
parser.add_argument("-f", "--folder", type=str, help="Path to downloads folder")
parser.add_argument(
"--verbose", "-v", action="count", default=0, help="Increase logging verbosity"
)
@@ -484,6 +708,10 @@ def main():
logger.error("For ESP32, you need to specify `--host IP` so we can do SDP munging.")
return
if args.transport in TELEPHONY_TRANSPORTS and not args.proxy:
logger.error(f"For telephony transports, you need to specify `--proxy PROXY`.")
return
# Log level
logger.remove()
logger.add(sys.stderr, level="TRACE" if args.verbose else "DEBUG")
@@ -514,8 +742,18 @@ def main():
print(f" → Open http://{args.host}:{args.port} in your browser to start a session")
print()
RUNNER_DOWNLOADS_FOLDER = args.folder
RUNNER_HOST = args.host
RUNNER_PORT = args.port
# Create the app with transport-specific setup
app = _create_server_app(args.transport, args.host, args.proxy, args.esp32)
app = _create_server_app(
transport_type=args.transport,
host=args.host,
proxy=args.proxy,
esp32_mode=args.esp32,
folder=args.folder,
)
# Run the server
uvicorn.run(app, host=args.host, port=args.port)

View File

@@ -99,29 +99,41 @@ async def parse_telephony_websocket(websocket: WebSocket):
tuple: (transport_type: str, call_data: dict)
call_data contains provider-specific fields:
- Twilio: {
"stream_id": str,
"call_id": str,
"body": dict
}
- Telnyx: {
"stream_id": str,
"call_control_id": str,
"outbound_encoding": str,
"from": str,
"to": str,
}
- Plivo: {
"stream_id": str,
"call_id": str,
}
- Exotel: {
"stream_id": str,
"call_id": str,
"account_sid": str,
"from": str,
"to": str,
}
- Twilio::
{
"stream_id": str,
"call_id": str,
"body": dict
}
- Telnyx::
{
"stream_id": str,
"call_control_id": str,
"outbound_encoding": str,
"from": str,
"to": str,
}
- Plivo::
{
"stream_id": str,
"call_id": str,
}
- Exotel::
{
"stream_id": str,
"call_id": str,
"account_sid": str,
"from": str,
"to": str,
}
Example usage::
@@ -301,6 +313,7 @@ def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
Returns:
Cleaned SDP text with filtered ICE candidates.
"""
logger.debug("Removing unsupported ICE candidates from SDP")
result = []
lines = text.splitlines()
for line in lines:
@@ -309,7 +322,7 @@ def _smallwebrtc_sdp_cleanup_ice_candidates(text: str, pattern: str) -> str:
result.append(line)
else:
result.append(line)
return "\r\n".join(result)
return "\r\n".join(result) + "\r\n"
def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str:
@@ -321,15 +334,16 @@ def _smallwebrtc_sdp_cleanup_fingerprints(text: str) -> str:
Returns:
SDP text with sha-384 and sha-512 fingerprints removed.
"""
logger.debug("Removing unsupported fingerprints from SDP")
result = []
lines = text.splitlines()
for line in lines:
if not re.search("sha-384", line) and not re.search("sha-512", line):
result.append(line)
return "\r\n".join(result)
return "\r\n".join(result) + "\r\n"
def smallwebrtc_sdp_munging(sdp: str, host: str) -> str:
def smallwebrtc_sdp_munging(sdp: str, host: Optional[str]) -> str:
"""Apply SDP modifications for SmallWebRTC compatibility.
Args:
@@ -340,7 +354,8 @@ def smallwebrtc_sdp_munging(sdp: str, host: str) -> str:
Modified SDP string with fingerprint and ICE candidate cleanup.
"""
sdp = _smallwebrtc_sdp_cleanup_fingerprints(sdp)
sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, host)
if host:
sdp = _smallwebrtc_sdp_cleanup_ice_candidates(sdp, host)
return sdp

View File

@@ -21,9 +21,9 @@ from pipecat.frames.frames import (
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
@@ -121,7 +121,7 @@ class ExotelFrameSerializer(FrameSerializer):
}
return json.dumps(answer)
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
return json.dumps(frame.message)
return None

View File

@@ -25,11 +25,31 @@ except ModuleNotFoundError as e:
class LivekitFrameSerializer(FrameSerializer):
"""Serializer for converting between Pipecat frames and LiveKit audio frames.
.. deprecated:: 0.0.90
This class is deprecated and will be removed in a future version.
Please use LiveKitTransport instead, which handles audio streaming
and frame conversion natively.
This serializer handles the conversion of Pipecat's OutputAudioRawFrame objects
to LiveKit AudioFrame objects for transmission, and the reverse conversion
for received audio data.
"""
def __init__(self):
"""Initialize the LiveKit frame serializer."""
super().__init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"LivekitFrameSerializer is deprecated and will be removed in a future version. "
"Please use LiveKitTransport instead, which handles audio streaming natively.",
DeprecationWarning,
stacklevel=2,
)
@property
def type(self) -> FrameSerializerType:
"""Get the serializer type.

View File

@@ -23,9 +23,9 @@ from pipecat.frames.frames import (
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
@@ -148,7 +148,7 @@ class PlivoFrameSerializer(FrameSerializer):
}
return json.dumps(answer)
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
return json.dumps(frame.message)
# Return None for unhandled frames

View File

@@ -15,11 +15,12 @@ import pipecat.frames.protobufs.frames_pb2 as frame_protos
from pipecat.frames.frames import (
Frame,
InputAudioRawFrame,
InputTransportMessageFrame,
OutputAudioRawFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
TextFrame,
TranscriptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
@@ -82,7 +83,7 @@ class ProtobufFrameSerializer(FrameSerializer):
Serialized frame as bytes, or None if frame type is not serializable.
"""
# Wrapping this messages as a JSONFrame to send
if isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
if isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
frame = MessageFrame(
data=json.dumps(frame.message),
)
@@ -134,11 +135,11 @@ class ProtobufFrameSerializer(FrameSerializer):
if "pts" in args_dict:
del args_dict["pts"]
# Special handling for MessageFrame -> TransportMessageUrgentFrame
# Special handling for MessageFrame -> OutputTransportMessageUrgentFrame
if class_name == MessageFrame:
try:
msg = json.loads(args_dict["data"])
instance = TransportMessageUrgentFrame(message=msg)
instance = InputTransportMessageFrame(message=msg)
logger.debug(f"ProtobufFrameSerializer: Transport message {instance}")
except Exception as e:
logger.error(f"Error parsing MessageFrame data: {e}")

View File

@@ -23,9 +23,9 @@ from pipecat.frames.frames import (
InputAudioRawFrame,
InputDTMFFrame,
InterruptionFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
StartFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
)
from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
@@ -175,7 +175,7 @@ class TwilioFrameSerializer(FrameSerializer):
}
return json.dumps(answer)
elif isinstance(frame, (TransportMessageFrame, TransportMessageUrgentFrame)):
elif isinstance(frame, (OutputTransportMessageFrame, OutputTransportMessageUrgentFrame)):
return json.dumps(frame.message)
# Return None for unhandled frames

View File

@@ -97,9 +97,7 @@ class AIService(FrameProcessor):
pass
async def _update_settings(self, settings: Mapping[str, Any]):
from pipecat.services.openai_realtime_beta.events import (
SessionProperties,
)
from pipecat.services.openai.realtime.events import SessionProperties
for key, value in settings.items():
logger.debug("Update request for:", key, value)
@@ -111,9 +109,7 @@ class AIService(FrameProcessor):
logger.debug("Attempting to update", key, value)
try:
from pipecat.services.openai_realtime_beta.events import (
TurnDetection,
)
from pipecat.services.openai.realtime.events import TurnDetection
if isinstance(self._session_properties, SessionProperties):
current_properties = self._session_properties

View File

@@ -9,6 +9,7 @@ import sys
from pipecat.services import DeprecatedModuleProxy
from .llm import *
from .nova_sonic import *
from .stt import *
from .tts import *

View File

@@ -61,7 +61,6 @@ from pipecat.utils.tracing.service_decorators import traced_llm
try:
import aioboto3
import httpx
from botocore.config import Config
from botocore.exceptions import ReadTimeoutError
except ModuleNotFoundError as e:
@@ -1117,7 +1116,7 @@ class AWSBedrockLLMService(LLMService):
# also get cancelled.
use_completion_tokens_estimate = True
raise
except httpx.TimeoutException:
except (ReadTimeoutError, asyncio.TimeoutError):
await self._call_event_handler("on_completion_timeout")
except Exception as e:
logger.exception(f"{self} exception: {e}")

View File

@@ -0,0 +1,367 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Context management for AWS Nova Sonic LLM service.
This module provides specialized context aggregators and message handling for AWS Nova Sonic,
including conversation history management and role-specific message processing.
"""
import copy
from dataclasses import dataclass, field
from enum import Enum
from loguru import logger
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
DataFrame,
Frame,
FunctionCallResultFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolChoiceFrame,
LLMSetToolsFrame,
TextFrame,
UserImageRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws.nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
class Role(Enum):
"""Roles supported in AWS Nova Sonic conversations.
Parameters:
SYSTEM: System-level messages (not used in conversation history).
USER: Messages sent by the user.
ASSISTANT: Messages sent by the assistant.
TOOL: Messages sent by tools (not used in conversation history).
"""
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
@dataclass
class AWSNovaSonicConversationHistoryMessage:
"""A single message in AWS Nova Sonic conversation history.
Parameters:
role: The role of the message sender (USER or ASSISTANT only).
text: The text content of the message.
"""
role: Role # only USER and ASSISTANT
text: str
@dataclass
class AWSNovaSonicConversationHistory:
"""Complete conversation history for AWS Nova Sonic initialization.
Parameters:
system_instruction: System-level instruction for the conversation.
messages: List of conversation messages between user and assistant.
"""
system_instruction: str = None
messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list)
class AWSNovaSonicLLMContext(OpenAILLMContext):
"""Specialized LLM context for AWS Nova Sonic service.
Extends OpenAI context with Nova Sonic-specific message handling,
conversation history management, and text buffering capabilities.
"""
def __init__(self, messages=None, tools=None, **kwargs):
"""Initialize AWS Nova Sonic LLM context.
Args:
messages: Initial messages for the context.
tools: Available tools for the context.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self, system_instruction: str = ""):
self._assistant_text = ""
self._user_text = ""
self._system_instruction = system_instruction
@staticmethod
def upgrade_to_nova_sonic(
obj: OpenAILLMContext, system_instruction: str
) -> "AWSNovaSonicLLMContext":
"""Upgrade an OpenAI context to AWS Nova Sonic context.
Args:
obj: The OpenAI context to upgrade.
system_instruction: System instruction for the context.
Returns:
The upgraded AWS Nova Sonic context.
"""
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext):
obj.__class__ = AWSNovaSonicLLMContext
obj.__setup_local(system_instruction)
return obj
# NOTE: this method has the side-effect of updating _system_instruction from messages
def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory:
"""Get conversation history for initializing AWS Nova Sonic session.
Processes stored messages and extracts system instruction and conversation
history in the format expected by AWS Nova Sonic.
Returns:
Formatted conversation history with system instruction and messages.
"""
history = AWSNovaSonicConversationHistory(system_instruction=self._system_instruction)
# Bail if there are no messages
if not self.messages:
return history
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into "instruction"
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
history.system_instruction = content
elif isinstance(content, list):
history.system_instruction = content[0].get("text")
if history.system_instruction:
self._system_instruction = history.system_instruction
# Process remaining messages to fill out conversation history.
# Nova Sonic supports "user" and "assistant" messages in history.
for message in messages:
history_message = self.from_standard_message(message)
if history_message:
history.messages.append(history_message)
return history
def get_messages_for_persistent_storage(self):
"""Get messages formatted for persistent storage.
Returns:
List of messages including system instruction if present.
"""
messages = super().get_messages_for_persistent_storage()
# If we have a system instruction and messages doesn't already contain it, add it
if self._system_instruction and not (messages and messages[0].get("role") == "system"):
messages.insert(0, {"role": "system", "content": self._system_instruction})
return messages
def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
message: Standard message dictionary to convert.
Returns:
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history
if content:
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
# Sonic conversation history
def buffer_user_text(self, text):
"""Buffer user text for later flushing to context.
Args:
text: User text to buffer.
"""
self._user_text += f" {text}" if self._user_text else text
# logger.debug(f"User text buffered: {self._user_text}")
def flush_aggregated_user_text(self) -> str:
"""Flush buffered user text to context as a complete message.
Returns:
The flushed user text, or empty string if no text was buffered.
"""
if not self._user_text:
return ""
user_text = self._user_text
message = {
"role": "user",
"content": [{"type": "text", "text": user_text}],
}
self._user_text = ""
self.add_message(message)
# logger.debug(f"Context updated (user): {self.get_messages_for_logging()}")
return user_text
def buffer_assistant_text(self, text):
"""Buffer assistant text for later flushing to context.
Args:
text: Assistant text to buffer.
"""
self._assistant_text += text
# logger.debug(f"Assistant text buffered: {self._assistant_text}")
def flush_aggregated_assistant_text(self):
"""Flush buffered assistant text to context as a complete message."""
if not self._assistant_text:
return
message = {
"role": "assistant",
"content": [{"type": "text", "text": self._assistant_text}],
}
self._assistant_text = ""
self.add_message(message)
# logger.debug(f"Context updated (assistant): {self.get_messages_for_logging()}")
@dataclass
class AWSNovaSonicMessagesUpdateFrame(DataFrame):
"""Frame containing updated AWS Nova Sonic context.
Parameters:
context: The updated AWS Nova Sonic LLM context.
"""
context: AWSNovaSonicLLMContext
class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
"""Context aggregator for user messages in AWS Nova Sonic conversations.
Extends the OpenAI user context aggregator to emit Nova Sonic-specific
context update frames.
"""
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
"""Process frames and emit Nova Sonic-specific context updates.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame
if isinstance(frame, LLMMessagesUpdateFrame):
await self.push_frame(AWSNovaSonicMessagesUpdateFrame(context=self._context))
class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
"""Context aggregator for assistant messages in AWS Nova Sonic conversations.
Provides specialized handling for assistant responses and function calls
in AWS Nova Sonic context, with custom frame processing logic.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Nova Sonic-specific logic.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
# HACK: For now, disable the context aggregator by making it just pass through all frames
# that the parent handles (except the function call stuff, which we still need).
# For an explanation of this hack, see
# AWSNovaSonicLLMService._report_assistant_response_text_added.
if isinstance(
frame,
(
InterruptionFrame,
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
TextFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMSetToolChoiceFrame,
UserImageRawFrame,
BotStoppedSpeakingFrame,
),
):
await self.push_frame(frame, direction)
else:
await super().process_frame(frame, direction)
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
"""Handle function call results for AWS Nova Sonic.
Args:
frame: The function call result frame to handle.
"""
await super().handle_function_call_result(frame)
# The standard function callback code path pushes the FunctionCallResultFrame from the LLM
# itself, so we didn't have a chance to add the result to the AWS Nova Sonic server-side
# context. Let's push a special frame to do that.
await self.push_frame(
AWSNovaSonicFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
)
@dataclass
class AWSNovaSonicContextAggregatorPair:
"""Pair of user and assistant context aggregators for AWS Nova Sonic.
Parameters:
_user: The user context aggregator.
_assistant: The assistant context aggregator.
"""
_user: AWSNovaSonicUserContextAggregator
_assistant: AWSNovaSonicAssistantContextAggregator
def user(self) -> AWSNovaSonicUserContextAggregator:
"""Get the user context aggregator.
Returns:
The user context aggregator instance.
"""
return self._user
def assistant(self) -> AWSNovaSonicAssistantContextAggregator:
"""Get the assistant context aggregator.
Returns:
The assistant context aggregator instance.
"""
return self._assistant

View File

@@ -0,0 +1,25 @@
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Custom frames for AWS Nova Sonic LLM service."""
from dataclasses import dataclass
from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
@dataclass
class AWSNovaSonicFunctionCallResultFrame(DataFrame):
"""Frame containing function call result for AWS Nova Sonic processing.
This frame wraps a standard function call result frame to enable
AWS Nova Sonic-specific handling and context updates.
Parameters:
result_frame: The underlying function call result frame.
"""
result_frame: FunctionCallResultFrame

File diff suppressed because it is too large Load Diff

View File

@@ -1 +1,19 @@
from .aws import AWSNovaSonicLLMService, Params
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import warnings
from pipecat.services.aws.nova_sonic.llm import AWSNovaSonicLLMService, Params
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.aws_nova_sonic are deprecated. "
"Please use the equivalent types from "
"pipecat.services.aws.nova_sonic.llm instead.",
DeprecationWarning,
stacklevel=2,
)

File diff suppressed because it is too large Load Diff

View File

@@ -10,358 +10,16 @@ This module provides specialized context aggregators and message handling for AW
including conversation history management and role-specific message processing.
"""
import copy
from dataclasses import dataclass, field
from enum import Enum
from loguru import logger
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
DataFrame,
Frame,
FunctionCallResultFrame,
InterruptionFrame,
LLMFullResponseEndFrame,
LLMFullResponseStartFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolChoiceFrame,
LLMSetToolsFrame,
TextFrame,
UserImageRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.aws_nova_sonic.frames import AWSNovaSonicFunctionCallResultFrame
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
class Role(Enum):
"""Roles supported in AWS Nova Sonic conversations.
Parameters:
SYSTEM: System-level messages (not used in conversation history).
USER: Messages sent by the user.
ASSISTANT: Messages sent by the assistant.
TOOL: Messages sent by tools (not used in conversation history).
"""
SYSTEM = "SYSTEM"
USER = "USER"
ASSISTANT = "ASSISTANT"
TOOL = "TOOL"
@dataclass
class AWSNovaSonicConversationHistoryMessage:
"""A single message in AWS Nova Sonic conversation history.
Parameters:
role: The role of the message sender (USER or ASSISTANT only).
text: The text content of the message.
"""
role: Role # only USER and ASSISTANT
text: str
@dataclass
class AWSNovaSonicConversationHistory:
"""Complete conversation history for AWS Nova Sonic initialization.
Parameters:
system_instruction: System-level instruction for the conversation.
messages: List of conversation messages between user and assistant.
"""
system_instruction: str = None
messages: list[AWSNovaSonicConversationHistoryMessage] = field(default_factory=list)
class AWSNovaSonicLLMContext(OpenAILLMContext):
"""Specialized LLM context for AWS Nova Sonic service.
Extends OpenAI context with Nova Sonic-specific message handling,
conversation history management, and text buffering capabilities.
"""
def __init__(self, messages=None, tools=None, **kwargs):
"""Initialize AWS Nova Sonic LLM context.
Args:
messages: Initial messages for the context.
tools: Available tools for the context.
**kwargs: Additional arguments passed to parent class.
"""
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self, system_instruction: str = ""):
self._assistant_text = ""
self._user_text = ""
self._system_instruction = system_instruction
@staticmethod
def upgrade_to_nova_sonic(
obj: OpenAILLMContext, system_instruction: str
) -> "AWSNovaSonicLLMContext":
"""Upgrade an OpenAI context to AWS Nova Sonic context.
Args:
obj: The OpenAI context to upgrade.
system_instruction: System instruction for the context.
Returns:
The upgraded AWS Nova Sonic context.
"""
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, AWSNovaSonicLLMContext):
obj.__class__ = AWSNovaSonicLLMContext
obj.__setup_local(system_instruction)
return obj
# NOTE: this method has the side-effect of updating _system_instruction from messages
def get_messages_for_initializing_history(self) -> AWSNovaSonicConversationHistory:
"""Get conversation history for initializing AWS Nova Sonic session.
Processes stored messages and extracts system instruction and conversation
history in the format expected by AWS Nova Sonic.
Returns:
Formatted conversation history with system instruction and messages.
"""
history = AWSNovaSonicConversationHistory(system_instruction=self._system_instruction)
# Bail if there are no messages
if not self.messages:
return history
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into "instruction"
if messages[0].get("role") == "system":
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
history.system_instruction = content
elif isinstance(content, list):
history.system_instruction = content[0].get("text")
if history.system_instruction:
self._system_instruction = history.system_instruction
# Process remaining messages to fill out conversation history.
# Nova Sonic supports "user" and "assistant" messages in history.
for message in messages:
history_message = self.from_standard_message(message)
if history_message:
history.messages.append(history_message)
return history
def get_messages_for_persistent_storage(self):
"""Get messages formatted for persistent storage.
Returns:
List of messages including system instruction if present.
"""
messages = super().get_messages_for_persistent_storage()
# If we have a system instruction and messages doesn't already contain it, add it
if self._system_instruction and not (messages and messages[0].get("role") == "system"):
messages.insert(0, {"role": "system", "content": self._system_instruction})
return messages
def from_standard_message(self, message) -> AWSNovaSonicConversationHistoryMessage:
"""Convert standard message format to Nova Sonic format.
Args:
message: Standard message dictionary to convert.
Returns:
Nova Sonic conversation history message, or None if not convertible.
"""
role = message.get("role")
if message.get("role") == "user" or message.get("role") == "assistant":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
# There won't be content if this is an assistant tool call entry.
# We're ignoring those since they can't be loaded into AWS Nova Sonic conversation
# history
if content:
return AWSNovaSonicConversationHistoryMessage(role=Role[role.upper()], text=content)
# NOTE: we're ignoring messages with role "tool" since they can't be loaded into AWS Nova
# Sonic conversation history
def buffer_user_text(self, text):
"""Buffer user text for later flushing to context.
Args:
text: User text to buffer.
"""
self._user_text += f" {text}" if self._user_text else text
# logger.debug(f"User text buffered: {self._user_text}")
def flush_aggregated_user_text(self) -> str:
"""Flush buffered user text to context as a complete message.
Returns:
The flushed user text, or empty string if no text was buffered.
"""
if not self._user_text:
return ""
user_text = self._user_text
message = {
"role": "user",
"content": [{"type": "text", "text": user_text}],
}
self._user_text = ""
self.add_message(message)
# logger.debug(f"Context updated (user): {self.get_messages_for_logging()}")
return user_text
def buffer_assistant_text(self, text):
"""Buffer assistant text for later flushing to context.
Args:
text: Assistant text to buffer.
"""
self._assistant_text += text
# logger.debug(f"Assistant text buffered: {self._assistant_text}")
def flush_aggregated_assistant_text(self):
"""Flush buffered assistant text to context as a complete message."""
if not self._assistant_text:
return
message = {
"role": "assistant",
"content": [{"type": "text", "text": self._assistant_text}],
}
self._assistant_text = ""
self.add_message(message)
# logger.debug(f"Context updated (assistant): {self.get_messages_for_logging()}")
@dataclass
class AWSNovaSonicMessagesUpdateFrame(DataFrame):
"""Frame containing updated AWS Nova Sonic context.
Parameters:
context: The updated AWS Nova Sonic LLM context.
"""
context: AWSNovaSonicLLMContext
class AWSNovaSonicUserContextAggregator(OpenAIUserContextAggregator):
"""Context aggregator for user messages in AWS Nova Sonic conversations.
Extends the OpenAI user context aggregator to emit Nova Sonic-specific
context update frames.
"""
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
"""Process frames and emit Nova Sonic-specific context updates.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame
if isinstance(frame, LLMMessagesUpdateFrame):
await self.push_frame(AWSNovaSonicMessagesUpdateFrame(context=self._context))
class AWSNovaSonicAssistantContextAggregator(OpenAIAssistantContextAggregator):
"""Context aggregator for assistant messages in AWS Nova Sonic conversations.
Provides specialized handling for assistant responses and function calls
in AWS Nova Sonic context, with custom frame processing logic.
"""
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process frames with Nova Sonic-specific logic.
Args:
frame: The frame to process.
direction: The direction the frame is traveling.
"""
# HACK: For now, disable the context aggregator by making it just pass through all frames
# that the parent handles (except the function call stuff, which we still need).
# For an explanation of this hack, see
# AWSNovaSonicLLMService._report_assistant_response_text_added.
if isinstance(
frame,
(
InterruptionFrame,
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
TextFrame,
LLMMessagesAppendFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMSetToolChoiceFrame,
UserImageRawFrame,
BotStoppedSpeakingFrame,
),
):
await self.push_frame(frame, direction)
else:
await super().process_frame(frame, direction)
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
"""Handle function call results for AWS Nova Sonic.
Args:
frame: The function call result frame to handle.
"""
await super().handle_function_call_result(frame)
# The standard function callback code path pushes the FunctionCallResultFrame from the LLM
# itself, so we didn't have a chance to add the result to the AWS Nova Sonic server-side
# context. Let's push a special frame to do that.
await self.push_frame(
AWSNovaSonicFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
)
@dataclass
class AWSNovaSonicContextAggregatorPair:
"""Pair of user and assistant context aggregators for AWS Nova Sonic.
Parameters:
_user: The user context aggregator.
_assistant: The assistant context aggregator.
"""
_user: AWSNovaSonicUserContextAggregator
_assistant: AWSNovaSonicAssistantContextAggregator
def user(self) -> AWSNovaSonicUserContextAggregator:
"""Get the user context aggregator.
Returns:
The user context aggregator instance.
"""
return self._user
def assistant(self) -> AWSNovaSonicAssistantContextAggregator:
"""Get the assistant context aggregator.
Returns:
The assistant context aggregator instance.
"""
return self._assistant
import warnings
from pipecat.services.aws.nova_sonic.context import *
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.aws_nova_sonic.context are deprecated. "
"Please use the equivalent types from "
"pipecat.services.aws.nova_sonic.context instead.",
DeprecationWarning,
stacklevel=2,
)

View File

@@ -6,20 +6,16 @@
"""Custom frames for AWS Nova Sonic LLM service."""
from dataclasses import dataclass
import warnings
from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
from pipecat.services.aws.nova_sonic.frames import *
@dataclass
class AWSNovaSonicFunctionCallResultFrame(DataFrame):
"""Frame containing function call result for AWS Nova Sonic processing.
This frame wraps a standard function call result frame to enable
AWS Nova Sonic-specific handling and context updates.
Parameters:
result_frame: The underlying function call result frame.
"""
result_frame: FunctionCallResultFrame
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.aws_nova_sonic.frames are deprecated. "
"Please use the equivalent types from "
"pipecat.services.aws.nova_sonic.frames instead.",
DeprecationWarning,
stacklevel=2,
)

View File

@@ -0,0 +1,65 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Azure OpenAI Realtime LLM service implementation."""
from loguru import logger
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
try:
from websockets.asyncio.client import connect as websocket_connect
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Azure Realtime, you need to `pip install pipecat-ai[openai]`.")
raise Exception(f"Missing module: {e}")
class AzureRealtimeLLMService(OpenAIRealtimeLLMService):
"""Azure OpenAI Realtime LLM service with Azure-specific authentication.
Extends the OpenAI Realtime service to work with Azure OpenAI endpoints,
using Azure's authentication headers and endpoint format. Provides the same
real-time audio and text communication capabilities as the base OpenAI service.
"""
def __init__(
self,
*,
api_key: str,
base_url: str,
**kwargs,
):
"""Initialize Azure Realtime LLM service.
Args:
api_key: The API key for the Azure OpenAI service.
base_url: The full Azure WebSocket endpoint URL including api-version and deployment.
Example: "wss://my-project.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=my-realtime-deployment"
**kwargs: Additional arguments passed to parent OpenAIRealtimeLLMService.
"""
super().__init__(base_url=base_url, api_key=api_key, **kwargs)
self.api_key = api_key
self.base_url = base_url
async def _connect(self):
try:
if self._websocket:
# Here we assume that if we have a websocket, we are connected. We
# handle disconnections in the send/recv code paths.
return
logger.info(f"Connecting to {self.base_url}, api key: {self.api_key}")
self._websocket = await websocket_connect(
uri=self.base_url,
additional_headers={
"api-key": self.api_key,
},
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None

View File

@@ -8,6 +8,7 @@ import sys
from pipecat.services import DeprecatedModuleProxy
from .flux import *
from .stt import *
from .tts import *

View File

@@ -0,0 +1,636 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Deepgram Flux speech-to-text service implementation."""
import json
from enum import Enum
from typing import Any, AsyncGenerator, Dict, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
InterimTranscriptionFrame,
StartFrame,
TranscriptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.stt_service import WebsocketSTTService
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
from pipecat.utils.tracing.service_decorators import traced_stt
try:
from websockets.asyncio.client import connect as websocket_connect
from websockets.protocol import State
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Deepgram Flux, you need to `pip install pipecat-ai[deepgram]`.")
raise Exception(f"Missing module: {e}")
class FluxMessageType(str, Enum):
"""Deepgram Flux WebSocket message types.
These are the top-level message types that can be received from the
Deepgram Flux WebSocket connection.
"""
RECEIVE_CONNECTED = "Connected"
RECEIVE_FATAL_ERROR = "Error"
TURN_INFO = "TurnInfo"
class FluxEventType(str, Enum):
"""Deepgram Flux TurnInfo event types.
These events are contained within TurnInfo messages and indicate
different stages of speech processing and turn detection.
"""
START_OF_TURN = "StartOfTurn"
TURN_RESUMED = "TurnResumed"
END_OF_TURN = "EndOfTurn"
EAGER_END_OF_TURN = "EagerEndOfTurn"
UPDATE = "Update"
class DeepgramFluxSTTService(WebsocketSTTService):
"""Deepgram Flux speech-to-text service.
Provides real-time speech recognition using Deepgram's WebSocket API with Flux capabilities.
Supports configurable models, VAD events, and various audio processing options
including advanced turn detection and EagerEndOfTurn events for improved conversational AI performance.
"""
class InputParams(BaseModel):
"""Configuration parameters for Deepgram Flux API.
This class defines all available connection parameters for the Deepgram Flux API
based on the official documentation.
Parameters:
eager_eot_threshold: Optional. EagerEndOfTurn/TurnResumed are off by default.
You can turn them on by setting eager_eot_threshold to a valid value.
Lower values = more aggressive EagerEndOfTurning (faster response, more LLM calls).
Higher values = more conservative EagerEndOfTurning (slower response, fewer LLM calls).
eot_threshold: Optional. End-of-turn confidence required to finish a turn (default 0.7).
Lower values = turns end sooner (more interruptions, faster responses).
Higher values = turns end later (fewer interruptions, more complete utterances).
eot_timeout_ms: Optional. Time in milliseconds after speech to finish a turn
regardless of EOT confidence (default 5000).
keyterm: List of keyterms to boost recognition accuracy for specialized terminology.
mip_opt_out: Optional. Opts out requests from the Deepgram Model Improvement Program
(default False).
tag: List of tags to label requests for identification during usage reporting.
"""
eager_eot_threshold: Optional[float] = None
eot_threshold: Optional[float] = None
eot_timeout_ms: Optional[int] = None
keyterm: list = []
mip_opt_out: Optional[bool] = None
tag: list = []
def __init__(
self,
*,
api_key: str,
url: str = "wss://api.deepgram.com/v2/listen",
sample_rate: Optional[int] = None,
model: str = "flux-general-en",
flux_encoding: str = "linear16",
params: Optional[InputParams] = None,
**kwargs,
):
"""Initialize the Deepgram Flux STT service.
Args:
api_key: Deepgram API key for authentication. Required for API access.
url: WebSocket URL for the Deepgram Flux API. Defaults to the preview endpoint.
sample_rate: Audio sample rate in Hz. If None, uses the rate from params or 16000.
model: Deepgram Flux model to use for transcription. Currently only supports "flux-general-en".
flux_encoding: Audio encoding format required by Flux API. Must be "linear16".
Raw signed little-endian 16-bit PCM encoding.
params: InputParams instance containing detailed API configuration options.
If None, default parameters will be used.
**kwargs: Additional arguments passed to the parent WebsocketSTTService class.
Examples:
Basic usage with default parameters::
stt = DeepgramFluxSTTService(api_key="your-api-key")
Advanced usage with custom parameters::
params = DeepgramFluxSTTService.InputParams(
eager_eot_threshold=0.5,
eot_threshold=0.8,
keyterm=["AI", "machine learning", "neural network"],
tag=["production", "voice-agent"]
)
stt = DeepgramFluxSTTService(
api_key="your-api-key",
model="flux-general-en",
params=params
)
"""
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._url = url
self._model = model
self._params = params or DeepgramFluxSTTService.InputParams()
self._flux_encoding = flux_encoding
# This is the currently only supported language
self._language = Language.EN
self._websocket_url = None
self._receive_task = None
async def _connect(self):
"""Connect to WebSocket and start background tasks.
Establishes the WebSocket connection to the Deepgram Flux API and starts
the background task for receiving transcription results.
"""
await self._connect_websocket()
if self._websocket and not self._receive_task:
self._receive_task = self.create_task(self._receive_task_handler(self._report_error))
async def _disconnect(self):
"""Disconnect from WebSocket and clean up tasks.
Gracefully disconnects from the Deepgram Flux API, cancels background tasks,
and cleans up resources to prevent memory leaks.
"""
try:
# Cancel background tasks BEFORE closing websocket
if self._receive_task:
await self.cancel_task(self._receive_task, timeout=2.0)
self._receive_task = None
# Now close the websocket
await self._disconnect_websocket()
except Exception as e:
logger.error(f"Error during disconnect: {e}")
finally:
# Reset state only after everything is cleaned up
self._websocket = None
async def _connect_websocket(self):
"""Establish WebSocket connection to API.
Creates a WebSocket connection to the Deepgram Flux API using the configured
URL and authentication headers. Handles connection errors and reports them
through the event handler system.
"""
try:
if self._websocket and self._websocket.state is State.OPEN:
return
self._websocket = await websocket_connect(
self._websocket_url,
additional_headers={"Authorization": f"Token {self._api_key}"},
)
logger.debug("Connected to Deepgram Flux Websocket")
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None
await self._call_event_handler("on_connection_error", f"{e}")
async def _disconnect_websocket(self):
"""Close WebSocket connection and clean up state.
Closes the WebSocket connection to the Deepgram Flux API and stops all
metrics collection. Handles disconnection errors gracefully.
"""
try:
await self.stop_all_metrics()
if self._websocket:
await self._send_close_stream()
logger.debug("Disconnecting from Deepgram Flux Websocket")
await self._websocket.close()
except Exception as e:
logger.error(f"{self} error closing websocket: {e}")
async def _send_close_stream(self) -> None:
"""Sends a CloseStream control message to the Deepgram Flux WebSocket API.
This signals to the server that no more audio data will be sent.
"""
if self._websocket:
logger.debug("Sending CloseStream message to Deepgram Flux")
message = {"type": "CloseStream"}
await self._websocket.send(json.dumps(message))
def can_generate_metrics(self) -> bool:
"""Check if this service can generate processing metrics.
Returns:
True, as Deepgram service supports metrics generation.
"""
return True
async def start(self, frame: StartFrame):
"""Start the Deepgram Flux STT service.
Initializes the service by constructing the WebSocket URL with all configured
parameters and establishing the connection to begin transcription processing.
Args:
frame: The start frame containing initialization parameters and metadata.
"""
await super().start(frame)
url_params = [
f"model={self._model}",
f"sample_rate={self.sample_rate}",
f"encoding={self._flux_encoding}",
]
if self._params.eager_eot_threshold is not None:
url_params.append(f"eager_eot_threshold={self._params.eager_eot_threshold}")
if self._params.eot_threshold is not None:
url_params.append(f"eot_threshold={self._params.eot_threshold}")
if self._params.eot_timeout_ms is not None:
url_params.append(f"eot_timeout_ms={self._params.eot_timeout_ms}")
if self._params.mip_opt_out is not None:
url_params.append(f"mip_opt_out={str(self._params.mip_opt_out).lower()}")
# Add keyterm parameters (can have multiple)
for keyterm in self._params.keyterm:
url_params.append(f"keyterm={keyterm}")
# Add tag parameters (can have multiple)
for tag_value in self._params.tag:
url_params.append(f"tag={tag_value}")
self._websocket_url = f"{self._url}?{'&'.join(url_params)}"
await self._connect()
async def stop(self, frame: EndFrame):
"""Stop the Deepgram Flux STT service.
Args:
frame: The end frame.
"""
await super().stop(frame)
await self._disconnect()
async def cancel(self, frame: CancelFrame):
"""Cancel the Deepgram Flux STT service.
Args:
frame: The cancel frame.
"""
await super().cancel(frame)
await self._disconnect()
async def run_stt(self, audio: bytes) -> AsyncGenerator[Frame, None]:
"""Send audio data to Deepgram Flux for transcription.
Transmits raw audio bytes to the Deepgram Flux API for real-time speech
recognition. Transcription results are received asynchronously through
WebSocket callbacks and processed in the background.
Args:
audio: Raw audio bytes in linear16 format (signed little-endian 16-bit PCM).
Yields:
Frame: None (transcription results are delivered via WebSocket callbacks
rather than as return values from this method).
Raises:
Exception: If the WebSocket connection is not established or if there
are issues sending the audio data.
"""
if not self._websocket:
logger.error("Not connected to Deepgram Flux.")
yield ErrorFrame("Not connected to Deepgram Flux.", fatal=True)
return
try:
await self._websocket.send(audio)
except Exception as e:
logger.error(f"Failed to send audio to Flux: {e}")
yield ErrorFrame(f"Failed to send audio to Flux: {e}")
return
yield None
async def start_metrics(self):
"""Start TTFB and processing metrics collection."""
# TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
# Ideally, TTFB should measure the time from when a user starts speaking
# until we receive the first transcript. However, Deepgram Flux delivers
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.start_ttfb_metrics()
await self.start_processing_metrics()
@traced_stt
async def _handle_transcription(
self, transcript: str, is_final: bool, language: Optional[Language] = None
):
"""Handle a transcription result with tracing."""
pass
def _get_websocket(self):
"""Get the current WebSocket connection.
Returns the active WebSocket connection instance, raising an exception
if no connection is currently established.
Returns:
The active WebSocket connection instance.
Raises:
Exception: If no WebSocket connection is currently active.
"""
if self._websocket:
return self._websocket
raise Exception("Websocket not connected")
def _validate_message(self, data: Dict[str, Any]) -> bool:
"""Validate basic message structure from Deepgram Flux.
Ensures the received message has the expected structure before processing.
Args:
data: The parsed JSON message data to validate.
Returns:
True if the message structure is valid, False otherwise.
"""
if not isinstance(data, dict):
logger.warning("Message is not a dictionary")
return False
if "type" not in data:
logger.warning("Message missing 'type' field")
return False
return True
async def _receive_messages(self):
"""Receive and process messages from WebSocket.
Continuously receives messages from the Deepgram Flux WebSocket connection
and processes various message types including connection status, transcription
results, turn information, and error conditions. Handles different event types
such as StartOfTurn, EndOfTurn, EagerEndOfTurn, and Update events.
"""
async for message in self._get_websocket():
if isinstance(message, str):
try:
data = json.loads(message)
await self._handle_message(data)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode JSON message: {e}")
# Skip malformed messages
continue
except Exception as e:
logger.error(f"Error processing message: {e}")
# Error will be handled inside WebsocketService->_receive_task_handler
raise
else:
logger.warning(f"Received non-string message: {type(message)}")
async def _handle_message(self, data: Dict[str, Any]):
"""Handle a parsed WebSocket message from Deepgram Flux.
Routes messages to appropriate handlers based on their type. Validates
message structure before processing.
Args:
data: The parsed JSON message data from the WebSocket.
"""
if not self._validate_message(data):
return
message_type = data.get("type")
try:
flux_message_type = FluxMessageType(message_type)
except ValueError:
logger.debug(f"Unhandled message type: {message_type or 'unknown'}")
return
match flux_message_type:
case FluxMessageType.RECEIVE_CONNECTED:
await self._handle_connection_established()
case FluxMessageType.RECEIVE_FATAL_ERROR:
await self._handle_fatal_error(data)
case FluxMessageType.TURN_INFO:
await self._handle_turn_info(data)
async def _handle_connection_established(self):
"""Handle successful connection establishment to Deepgram Flux.
This event is fired when the WebSocket connection to Deepgram Flux
is successfully established and ready to receive audio data for
transcription processing.
"""
logger.info("Connected to Flux - ready to stream audio")
async def _handle_fatal_error(self, data: Dict[str, Any]):
"""Handle fatal error messages from Deepgram Flux.
Fatal errors indicate unrecoverable issues with the connection or
configuration that require intervention. These errors will cause
the connection to be terminated.
Args:
data: The error message data containing error details.
Raises:
Exception: Always raises to trigger error handling in the parent service.
"""
error_msg = data.get("error", "Unknown error")
deepgram_error = f"Fatal error: {error_msg}"
logger.error(deepgram_error)
# Error will be handled inside WebsocketService->_receive_task_handler
raise Exception(deepgram_error)
async def _handle_turn_info(self, data: Dict[str, Any]):
"""Handle TurnInfo events from Deepgram Flux.
TurnInfo messages contain various turn-based events that indicate
the state of speech processing, including turn boundaries, interim
results, and turn finalization events.
Args:
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
event = data.get("event")
transcript = data.get("transcript", "")
try:
flux_event_type = FluxEventType(event)
except ValueError:
logger.debug(f"Unhandled TurnInfo event: {event}")
return
match flux_event_type:
case FluxEventType.START_OF_TURN:
await self._handle_start_of_turn(transcript)
case FluxEventType.TURN_RESUMED:
await self._handle_turn_resumed(event)
case FluxEventType.END_OF_TURN:
await self._handle_end_of_turn(transcript, data)
case FluxEventType.EAGER_END_OF_TURN:
await self._handle_eager_end_of_turn(transcript, data)
case FluxEventType.UPDATE:
await self._handle_update(transcript)
async def _handle_start_of_turn(self, transcript: str):
"""Handle StartOfTurn events from Deepgram Flux.
StartOfTurn events are fired when Deepgram Flux detects the beginning
of a new speaking turn. This triggers bot interruption to stop any
ongoing speech synthesis and signals the start of user speech detection.
The service will:
- Send a BotInterruptionFrame upstream to stop bot speech
- Send a UserStartedSpeakingFrame downstream to notify other components
- Start metrics collection for measuring response times
Args:
transcript: maybe the first few words of the turn.
"""
logger.debug("User started speaking")
await self.push_interruption_task_frame_and_wait()
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(UserStartedSpeakingFrame(), FrameDirection.UPSTREAM)
await self.start_metrics()
if transcript:
logger.trace(f"Start of turn transcript: {transcript}")
async def _handle_turn_resumed(self, event: str):
"""Handle TurnResumed events from Deepgram Flux.
TurnResumed events indicate that speech has resumed after a brief pause
within the same turn. This is primarily used for logging and debugging
purposes and doesn't trigger any significant processing changes.
Args:
event: The event type string for logging purposes.
"""
logger.trace(f"Received event TurnResumed: {event}")
async def _handle_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EndOfTurn events from Deepgram Flux.
EndOfTurn events are fired when Deepgram Flux determines that a speaking
turn has concluded, either due to sufficient silence or end-of-turn
confidence thresholds being met. This provides the final transcript
for the completed turn.
The service will:
- Create and send a final TranscriptionFrame with the complete transcript
- Trigger transcription handling with tracing for metrics
- Stop processing metrics collection
- Send a UserStoppedSpeakingFrame to signal turn completion
Args:
transcript: The final transcript text for the completed turn.
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
logger.debug("User stopped speaking")
await self.push_frame(
TranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._language,
result=data,
)
)
await self._handle_transcription(transcript, True, self._language)
await self.stop_processing_metrics()
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.DOWNSTREAM)
await self.push_frame(UserStoppedSpeakingFrame(), FrameDirection.UPSTREAM)
async def _handle_eager_end_of_turn(self, transcript: str, data: Dict[str, Any]):
"""Handle EagerEndOfTurn events from Deepgram Flux.
EagerEndOfTurn events are fired when the end-of-turn confidence reaches the
EagerEndOfTurn threshold but hasn't yet reached the full end-of-turn threshold.
These provide interim transcripts that can be used for faster response
generation while still allowing the user to continue speaking.
EagerEndOfTurn events enable more responsive conversational AI by allowing
the LLM to start processing likely final transcripts before the turn
is definitively ended.
Args:
transcript: The interim transcript text that triggered the EagerEndOfTurn event.
data: The TurnInfo message data containing event type, transcript and some extra metadata.
"""
logger.trace(f"EagerEndOfTurn - {transcript}")
# Deepgram's EagerEndOfTurn feature enables lower-latency voice agents by sending
# medium-confidence transcripts before EndOfTurn certainty, allowing LLM processing to
# begin early.
#
# However, if speech resumes or the transcripts differ from the final EndOfTurn, the
# EagerEndOfTurn response should be cancelled to avoid incorrect or partial responses.
#
# Pipecat doesn't yet provide built-in Gate/control mechanisms to:
# 1. Start LLM/TTS processing early on EagerEndOfTurn events
# 2. Cancel in-flight processing when TurnResumed occurs
#
# By pushing EagerEndOfTurn transcripts as InterimTranscriptionFrame, we enable
# developers to implement custom EagerEndOfTurn handling in their applications while
# maintaining compatibility with existing interim transcription workflows.
#
# TODO: Implement proper EagerEndOfTurn support with cancellable processing pipeline
# that can start response generation on EagerEndOfTurn and cancel or confirm it.
await self.push_frame(
InterimTranscriptionFrame(
transcript,
self._user_id,
time_now_iso8601(),
self._language,
result=data,
)
)
async def _handle_update(self, transcript: str):
"""Handle Update events from Deepgram Flux.
Update events provide incremental transcript updates during an ongoing
turn. These events allow for real-time display of transcription progress
and can be used to provide visual feedback to users about what's being
recognized.
The service stops TTFB (Time To First Byte) metrics when the first
substantial update is received, indicating successful processing start.
Args:
transcript: The current partial transcript text for the ongoing turn.
"""
if transcript:
logger.trace(f"Update event: {transcript}")
# TTFB (Time To First Byte) metrics are currently disabled for Deepgram Flux.
# Ideally, TTFB should measure the time from when a user starts speaking
# until we receive the first transcript. However, Deepgram Flux delivers
# both the "user started speaking" event and the first transcript simultaneously,
# making this timing measurement meaningless in this context.
# await self.stop_ttfb_metrics()

View File

@@ -8,6 +8,7 @@ import sys
from pipecat.services import DeprecatedModuleProxy
from .stt import *
from .tts import *
sys.modules[__name__] = DeprecatedModuleProxy(globals(), "elevenlabs", "elevenlabs.tts")
sys.modules[__name__] = DeprecatedModuleProxy(globals(), "elevenlabs", "elevenlabs.[stt,tts]")

View File

@@ -4,527 +4,41 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Event models and utilities for Google Gemini Multimodal Live API."""
import base64
import io
import json
from enum import Enum
from typing import List, Literal, Optional
from PIL import Image
from pydantic import BaseModel, Field
from pipecat.frames.frames import ImageRawFrame
#
# Client events
#
class MediaChunk(BaseModel):
"""Represents a chunk of media data for transmission.
Parameters:
mimeType: MIME type of the media content.
data: Base64-encoded media data.
"""
mimeType: str
data: str
class ContentPart(BaseModel):
"""Represents a part of content that can contain text or media.
Parameters:
text: Text content. Defaults to None.
inlineData: Inline media data. Defaults to None.
"""
text: Optional[str] = Field(default=None, validate_default=False)
inlineData: Optional[MediaChunk] = Field(default=None, validate_default=False)
fileData: Optional["FileData"] = Field(default=None, validate_default=False)
class FileData(BaseModel):
"""Represents a file reference in the Gemini File API."""
mimeType: str
fileUri: str
ContentPart.model_rebuild() # Rebuild model to resolve forward reference
class Turn(BaseModel):
"""Represents a conversational turn in the dialogue.
Parameters:
role: The role of the speaker, either "user" or "model". Defaults to "user".
parts: List of content parts that make up the turn.
"""
role: Literal["user", "model"] = "user"
parts: List[ContentPart]
class StartSensitivity(str, Enum):
"""Determines how start of speech is detected."""
UNSPECIFIED = "START_SENSITIVITY_UNSPECIFIED" # Default is HIGH
HIGH = "START_SENSITIVITY_HIGH" # Detect start of speech more often
LOW = "START_SENSITIVITY_LOW" # Detect start of speech less often
class EndSensitivity(str, Enum):
"""Determines how end of speech is detected."""
UNSPECIFIED = "END_SENSITIVITY_UNSPECIFIED" # Default is HIGH
HIGH = "END_SENSITIVITY_HIGH" # End speech more often
LOW = "END_SENSITIVITY_LOW" # End speech less often
class AutomaticActivityDetection(BaseModel):
"""Configures automatic detection of voice activity.
Parameters:
disabled: Whether automatic activity detection is disabled. Defaults to None.
start_of_speech_sensitivity: Sensitivity for detecting speech start. Defaults to None.
prefix_padding_ms: Padding before speech start in milliseconds. Defaults to None.
end_of_speech_sensitivity: Sensitivity for detecting speech end. Defaults to None.
silence_duration_ms: Duration of silence to detect speech end. Defaults to None.
"""
disabled: Optional[bool] = None
start_of_speech_sensitivity: Optional[StartSensitivity] = None
prefix_padding_ms: Optional[int] = None
end_of_speech_sensitivity: Optional[EndSensitivity] = None
silence_duration_ms: Optional[int] = None
class RealtimeInputConfig(BaseModel):
"""Configures the realtime input behavior.
Parameters:
automatic_activity_detection: Voice activity detection configuration. Defaults to None.
"""
automatic_activity_detection: Optional[AutomaticActivityDetection] = None
class RealtimeInput(BaseModel):
"""Contains realtime input media chunks and text.
Parameters:
mediaChunks: List of media chunks for realtime processing.
text: Text for realtime processing.
"""
mediaChunks: Optional[List[MediaChunk]] = None
text: Optional[str] = None
class ClientContent(BaseModel):
"""Content sent from client to the Gemini Live API.
Parameters:
turns: List of conversation turns. Defaults to None.
turnComplete: Whether the client's turn is complete. Defaults to False.
"""
turns: Optional[List[Turn]] = None
turnComplete: bool = False
class AudioInputMessage(BaseModel):
"""Message containing audio input data.
Parameters:
realtimeInput: Realtime input containing audio chunks.
"""
realtimeInput: RealtimeInput
@classmethod
def from_raw_audio(cls, raw_audio: bytes, sample_rate: int) -> "AudioInputMessage":
"""Create an audio input message from raw audio data.
Args:
raw_audio: Raw audio bytes.
sample_rate: Audio sample rate in Hz.
Returns:
AudioInputMessage instance with encoded audio data.
"""
data = base64.b64encode(raw_audio).decode("utf-8")
return cls(
realtimeInput=RealtimeInput(
mediaChunks=[MediaChunk(mimeType=f"audio/pcm;rate={sample_rate}", data=data)]
)
)
class VideoInputMessage(BaseModel):
"""Message containing video/image input data.
Parameters:
realtimeInput: Realtime input containing video/image chunks.
"""
realtimeInput: RealtimeInput
@classmethod
def from_image_frame(cls, frame: ImageRawFrame) -> "VideoInputMessage":
"""Create a video input message from an image frame.
Args:
frame: Image frame to encode.
Returns:
VideoInputMessage instance with encoded image data.
"""
buffer = io.BytesIO()
Image.frombytes(frame.format, frame.size, frame.image).save(buffer, format="JPEG")
data = base64.b64encode(buffer.getvalue()).decode("utf-8")
return cls(
realtimeInput=RealtimeInput(mediaChunks=[MediaChunk(mimeType=f"image/jpeg", data=data)])
)
class TextInputMessage(BaseModel):
"""Message containing text input data."""
realtimeInput: RealtimeInput
@classmethod
def from_text(cls, text: str) -> "TextInputMessage":
"""Create a text input message from a string.
Args:
text: The text to send.
Returns:
A TextInputMessage instance.
"""
return cls(realtimeInput=RealtimeInput(text=text))
class ClientContentMessage(BaseModel):
"""Message containing client content for the API.
Parameters:
clientContent: The client content to send.
"""
clientContent: ClientContent
class SystemInstruction(BaseModel):
"""System instruction for the model.
Parameters:
parts: List of content parts that make up the system instruction.
"""
parts: List[ContentPart]
class AudioTranscriptionConfig(BaseModel):
"""Configuration for audio transcription."""
pass
class Setup(BaseModel):
"""Setup configuration for the Gemini Live session.
Parameters:
model: Model identifier to use.
system_instruction: System instruction for the model. Defaults to None.
tools: List of available tools/functions. Defaults to None.
generation_config: Generation configuration parameters. Defaults to None.
input_audio_transcription: Input audio transcription config. Defaults to None.
output_audio_transcription: Output audio transcription config. Defaults to None.
realtime_input_config: Realtime input configuration. Defaults to None.
"""
model: str
system_instruction: Optional[SystemInstruction] = None
tools: Optional[List[dict]] = None
generation_config: Optional[dict] = None
input_audio_transcription: Optional[AudioTranscriptionConfig] = None
output_audio_transcription: Optional[AudioTranscriptionConfig] = None
realtime_input_config: Optional[RealtimeInputConfig] = None
class Config(BaseModel):
"""Configuration message for session setup.
Parameters:
setup: Setup configuration for the session.
"""
setup: Setup
#
# Grounding metadata models
#
class SearchEntryPoint(BaseModel):
"""Represents the search entry point with rendered content for search suggestions."""
renderedContent: Optional[str] = None
class WebSource(BaseModel):
"""Represents a web source from grounding chunks."""
uri: Optional[str] = None
title: Optional[str] = None
class GroundingChunk(BaseModel):
"""Represents a grounding chunk containing web source information."""
web: Optional[WebSource] = None
class GroundingSegment(BaseModel):
"""Represents a segment of text that is grounded."""
startIndex: Optional[int] = None
endIndex: Optional[int] = None
text: Optional[str] = None
class GroundingSupport(BaseModel):
"""Represents support information for grounded text segments."""
segment: Optional[GroundingSegment] = None
groundingChunkIndices: Optional[List[int]] = None
confidenceScores: Optional[List[float]] = None
class GroundingMetadata(BaseModel):
"""Represents grounding metadata from Google Search."""
searchEntryPoint: Optional[SearchEntryPoint] = None
groundingChunks: Optional[List[GroundingChunk]] = None
groundingSupports: Optional[List[GroundingSupport]] = None
webSearchQueries: Optional[List[str]] = None
#
# Server events
#
class SetupComplete(BaseModel):
"""Indicates that session setup is complete."""
pass
class InlineData(BaseModel):
"""Inline data embedded in server responses.
Parameters:
mimeType: MIME type of the data.
data: Base64-encoded data content.
"""
mimeType: str
data: str
class Part(BaseModel):
"""Part of a server response containing data or text.
Parameters:
inlineData: Inline binary data. Defaults to None.
text: Text content. Defaults to None.
"""
inlineData: Optional[InlineData] = None
text: Optional[str] = None
class ModelTurn(BaseModel):
"""Represents a turn from the model in the conversation.
Parameters:
parts: List of content parts in the model's response.
"""
parts: List[Part]
class ServerContentInterrupted(BaseModel):
"""Indicates server content was interrupted.
Parameters:
interrupted: Whether the content was interrupted.
"""
interrupted: bool
class ServerContentTurnComplete(BaseModel):
"""Indicates the server's turn is complete.
Parameters:
turnComplete: Whether the turn is complete.
"""
turnComplete: bool
class BidiGenerateContentTranscription(BaseModel):
"""Transcription data from bidirectional content generation.
Parameters:
text: The transcribed text content.
"""
text: str
class ServerContent(BaseModel):
"""Content sent from server to client.
Parameters:
modelTurn: Model's conversational turn. Defaults to None.
interrupted: Whether content was interrupted. Defaults to None.
turnComplete: Whether the turn is complete. Defaults to None.
inputTranscription: Transcription of input audio. Defaults to None.
outputTranscription: Transcription of output audio. Defaults to None.
"""
modelTurn: Optional[ModelTurn] = None
interrupted: Optional[bool] = None
turnComplete: Optional[bool] = None
inputTranscription: Optional[BidiGenerateContentTranscription] = None
outputTranscription: Optional[BidiGenerateContentTranscription] = None
groundingMetadata: Optional[GroundingMetadata] = None
class FunctionCall(BaseModel):
"""Represents a function call from the model.
Parameters:
id: Unique identifier for the function call.
name: Name of the function to call.
args: Arguments to pass to the function.
"""
id: str
name: str
args: dict
class ToolCall(BaseModel):
"""Contains one or more function calls.
Parameters:
functionCalls: List of function calls to execute.
"""
functionCalls: List[FunctionCall]
class Modality(str, Enum):
"""Modality types in token counts."""
UNSPECIFIED = "MODALITY_UNSPECIFIED"
TEXT = "TEXT"
IMAGE = "IMAGE"
AUDIO = "AUDIO"
VIDEO = "VIDEO"
class ModalityTokenCount(BaseModel):
"""Token count for a specific modality.
Parameters:
modality: The modality type.
tokenCount: Number of tokens for this modality.
"""
modality: Modality
tokenCount: int
class UsageMetadata(BaseModel):
"""Usage metadata about the API response.
Parameters:
promptTokenCount: Number of tokens in the prompt. Defaults to None.
cachedContentTokenCount: Number of cached content tokens. Defaults to None.
responseTokenCount: Number of tokens in the response. Defaults to None.
toolUsePromptTokenCount: Number of tokens for tool use prompts. Defaults to None.
thoughtsTokenCount: Number of tokens for model thoughts. Defaults to None.
totalTokenCount: Total number of tokens used. Defaults to None.
promptTokensDetails: Detailed breakdown of prompt tokens by modality. Defaults to None.
cacheTokensDetails: Detailed breakdown of cache tokens by modality. Defaults to None.
responseTokensDetails: Detailed breakdown of response tokens by modality. Defaults to None.
toolUsePromptTokensDetails: Detailed breakdown of tool use tokens by modality. Defaults to None.
"""
promptTokenCount: Optional[int] = None
cachedContentTokenCount: Optional[int] = None
responseTokenCount: Optional[int] = None
toolUsePromptTokenCount: Optional[int] = None
thoughtsTokenCount: Optional[int] = None
totalTokenCount: Optional[int] = None
promptTokensDetails: Optional[List[ModalityTokenCount]] = None
cacheTokensDetails: Optional[List[ModalityTokenCount]] = None
responseTokensDetails: Optional[List[ModalityTokenCount]] = None
toolUsePromptTokensDetails: Optional[List[ModalityTokenCount]] = None
class ServerEvent(BaseModel):
"""Server event received from the Gemini Live API.
Parameters:
setupComplete: Setup completion notification. Defaults to None.
serverContent: Content from the server. Defaults to None.
toolCall: Tool/function call request. Defaults to None.
usageMetadata: Token usage metadata. Defaults to None.
"""
setupComplete: Optional[SetupComplete] = None
serverContent: Optional[ServerContent] = None
toolCall: Optional[ToolCall] = None
usageMetadata: Optional[UsageMetadata] = None
def parse_server_event(str):
"""Parse a server event from JSON string.
Args:
str: JSON string containing the server event.
Returns:
ServerEvent instance if parsing succeeds, None otherwise.
"""
try:
evt = json.loads(str)
return ServerEvent.model_validate(evt)
except Exception as e:
print(f"Error parsing server event: {e}")
return None
class ContextWindowCompressionConfig(BaseModel):
"""Configuration for context window compression.
Parameters:
sliding_window: Whether to use sliding window compression. Defaults to True.
trigger_tokens: Token count threshold to trigger compression. Defaults to None.
"""
sliding_window: Optional[bool] = Field(default=True)
trigger_tokens: Optional[int] = Field(default=None)
"""Event models and utilities for Google Gemini Multimodal Live API.
.. deprecated:: 0.0.90
Importing StartSensitivity and EndSensitivity from this module is deprecated.
Import them directly from google.genai.types instead.
"""
import warnings
from loguru import logger
try:
from google.genai.types import (
EndSensitivity as _EndSensitivity,
)
from google.genai.types import (
StartSensitivity as _StartSensitivity,
)
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
# These aliases are just here for backward compatibility, since we used to
# define public-facing StartSensitivity and EndSensitivity enums in this
# module.
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Importing StartSensitivity and EndSensitivity from "
"pipecat.services.gemini_multimodal_live.events is deprecated. "
"Please import them directly from google.genai.types instead.",
DeprecationWarning,
stacklevel=2,
)
StartSensitivity = _StartSensitivity
EndSensitivity = _EndSensitivity

View File

@@ -9,181 +9,31 @@
This module provides a client for Google's Gemini File API, enabling file
uploads, metadata retrieval, listing, and deletion. Files uploaded through
this API can be referenced in Gemini generative model calls.
.. deprecated:: 0.0.90
Importing GeminiFileAPI from this module is deprecated.
Import it from pipecat.services.google.gemini_live.file_api instead.
"""
import mimetypes
from typing import Any, Dict, Optional
import warnings
import aiohttp
from loguru import logger
try:
from pipecat.services.google.gemini_live.file_api import GeminiFileAPI as _GeminiFileAPI
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
class GeminiFileAPI:
"""Client for the Gemini File API.
This class provides methods for uploading, fetching, listing, and deleting files
through Google's Gemini File API.
Files uploaded through this API remain available for 48 hours and can be referenced
in calls to the Gemini generative models. Maximum file size is 2GB, with total
project storage limited to 20GB.
"""
def __init__(
self, api_key: str, base_url: str = "https://generativelanguage.googleapis.com/v1beta/files"
):
"""Initialize the Gemini File API client.
Args:
api_key: Google AI API key
base_url: Base URL for the Gemini File API (default is the v1beta endpoint)
"""
self._api_key = api_key
self._base_url = base_url
# Upload URL uses the /upload/ path
self.upload_base_url = "https://generativelanguage.googleapis.com/upload/v1beta/files"
async def upload_file(
self, file_path: str, display_name: Optional[str] = None
) -> Dict[str, Any]:
"""Upload a file to the Gemini File API using the correct resumable upload protocol.
Args:
file_path: Path to the file to upload
display_name: Optional display name for the file
Returns:
File metadata including uri, name, and display_name
"""
logger.info(f"Uploading file: {file_path}")
async with aiohttp.ClientSession() as session:
# Determine the file's MIME type
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
mime_type = "application/octet-stream"
# Read the file
with open(file_path, "rb") as f:
file_data = f.read()
# Create the metadata payload
metadata = {}
if display_name:
metadata = {"file": {"display_name": display_name}}
# Step 1: Initial resumable request to get upload URL
headers = {
"X-Goog-Upload-Protocol": "resumable",
"X-Goog-Upload-Command": "start",
"X-Goog-Upload-Header-Content-Length": str(len(file_data)),
"X-Goog-Upload-Header-Content-Type": mime_type,
"Content-Type": "application/json",
}
logger.debug(f"Step 1: Getting upload URL from {self.upload_base_url}")
async with session.post(
f"{self.upload_base_url}?key={self._api_key}", headers=headers, json=metadata
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error initiating file upload: {error_text}")
raise Exception(f"Failed to initiate upload: {response.status} - {error_text}")
# Get the upload URL from the response header
upload_url = response.headers.get("X-Goog-Upload-URL")
if not upload_url:
logger.error(f"Response headers: {dict(response.headers)}")
raise Exception("No upload URL in response headers")
logger.debug(f"Got upload URL: {upload_url}")
# Step 2: Upload the actual file data
upload_headers = {
"Content-Length": str(len(file_data)),
"X-Goog-Upload-Offset": "0",
"X-Goog-Upload-Command": "upload, finalize",
}
logger.debug(f"Step 2: Uploading file data to {upload_url}")
async with session.post(upload_url, headers=upload_headers, data=file_data) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error uploading file data: {error_text}")
raise Exception(f"Failed to upload file: {response.status} - {error_text}")
file_info = await response.json()
logger.info(f"File uploaded successfully: {file_info.get('file', {}).get('name')}")
return file_info
async def get_file(self, name: str) -> Dict[str, Any]:
"""Get metadata for a file.
Args:
name: File name (or full path)
Returns:
File metadata
"""
# Extract just the name part if a full path is provided
if "/" in name:
name = name.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.get(f"{self._base_url}/{name}?key={self._api_key}") as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error getting file metadata: {error_text}")
raise Exception(f"Failed to get file metadata: {response.status}")
file_info = await response.json()
return file_info
async def list_files(
self, page_size: int = 10, page_token: Optional[str] = None
) -> Dict[str, Any]:
"""List uploaded files.
Args:
page_size: Number of files to return per page
page_token: Token for pagination
Returns:
List of files and next page token if available
"""
params = {"key": self._api_key, "pageSize": page_size}
if page_token:
params["pageToken"] = page_token
async with aiohttp.ClientSession() as session:
async with session.get(self._base_url, params=params) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error listing files: {error_text}")
raise Exception(f"Failed to list files: {response.status}")
result = await response.json()
return result
async def delete_file(self, name: str) -> bool:
"""Delete a file.
Args:
name: File name (or full path)
Returns:
True if deleted successfully
"""
# Extract just the name part if a full path is provided
if "/" in name:
name = name.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.delete(f"{self._base_url}/{name}?key={self._api_key}") as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error deleting file: {error_text}")
raise Exception(f"Failed to delete file: {response.status}")
return True
# These aliases are just here for backward compatibility, since we used to
# define public-facing StartSensitivity and EndSensitivity enums in this
# module.
warnings.warn(
"Importing GeminiFileAPI from "
"pipecat.services.gemini_multimodal_live.file_api is deprecated. "
"Please import it from pipecat.services.google.gemini_live.file_api instead.",
DeprecationWarning,
stacklevel=2,
)
GeminiFileAPI = _GeminiFileAPI

File diff suppressed because it is too large Load Diff

View File

@@ -9,6 +9,7 @@ import sys
from pipecat.services import DeprecatedModuleProxy
from .frames import *
from .gemini_live import *
from .image import *
from .llm import *
from .llm_openai import *

View File

@@ -0,0 +1,3 @@
from .file_api import GeminiFileAPI
from .llm import GeminiLiveLLMService
from .llm_vertex import GeminiLiveVertexLLMService

View File

@@ -0,0 +1,189 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Gemini File API client for uploading and managing files.
This module provides a client for Google's Gemini File API, enabling file
uploads, metadata retrieval, listing, and deletion. Files uploaded through
this API can be referenced in Gemini generative model calls.
"""
import mimetypes
from typing import Any, Dict, Optional
import aiohttp
from loguru import logger
class GeminiFileAPI:
"""Client for the Gemini File API.
This class provides methods for uploading, fetching, listing, and deleting files
through Google's Gemini File API.
Files uploaded through this API remain available for 48 hours and can be referenced
in calls to the Gemini generative models. Maximum file size is 2GB, with total
project storage limited to 20GB.
"""
def __init__(
self, api_key: str, base_url: str = "https://generativelanguage.googleapis.com/v1beta/files"
):
"""Initialize the Gemini File API client.
Args:
api_key: Google AI API key
base_url: Base URL for the Gemini File API (default is the v1beta endpoint)
"""
self._api_key = api_key
self._base_url = base_url
# Upload URL uses the /upload/ path
self.upload_base_url = "https://generativelanguage.googleapis.com/upload/v1beta/files"
async def upload_file(
self, file_path: str, display_name: Optional[str] = None
) -> Dict[str, Any]:
"""Upload a file to the Gemini File API using the correct resumable upload protocol.
Args:
file_path: Path to the file to upload
display_name: Optional display name for the file
Returns:
File metadata including uri, name, and display_name
"""
logger.info(f"Uploading file: {file_path}")
async with aiohttp.ClientSession() as session:
# Determine the file's MIME type
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
mime_type = "application/octet-stream"
# Read the file
with open(file_path, "rb") as f:
file_data = f.read()
# Create the metadata payload
metadata = {}
if display_name:
metadata = {"file": {"display_name": display_name}}
# Step 1: Initial resumable request to get upload URL
headers = {
"X-Goog-Upload-Protocol": "resumable",
"X-Goog-Upload-Command": "start",
"X-Goog-Upload-Header-Content-Length": str(len(file_data)),
"X-Goog-Upload-Header-Content-Type": mime_type,
"Content-Type": "application/json",
}
logger.debug(f"Step 1: Getting upload URL from {self.upload_base_url}")
async with session.post(
f"{self.upload_base_url}?key={self._api_key}", headers=headers, json=metadata
) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error initiating file upload: {error_text}")
raise Exception(f"Failed to initiate upload: {response.status} - {error_text}")
# Get the upload URL from the response header
upload_url = response.headers.get("X-Goog-Upload-URL")
if not upload_url:
logger.error(f"Response headers: {dict(response.headers)}")
raise Exception("No upload URL in response headers")
logger.debug(f"Got upload URL: {upload_url}")
# Step 2: Upload the actual file data
upload_headers = {
"Content-Length": str(len(file_data)),
"X-Goog-Upload-Offset": "0",
"X-Goog-Upload-Command": "upload, finalize",
}
logger.debug(f"Step 2: Uploading file data to {upload_url}")
async with session.post(upload_url, headers=upload_headers, data=file_data) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error uploading file data: {error_text}")
raise Exception(f"Failed to upload file: {response.status} - {error_text}")
file_info = await response.json()
logger.info(f"File uploaded successfully: {file_info.get('file', {}).get('name')}")
return file_info
async def get_file(self, name: str) -> Dict[str, Any]:
"""Get metadata for a file.
Args:
name: File name (or full path)
Returns:
File metadata
"""
# Extract just the name part if a full path is provided
if "/" in name:
name = name.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.get(f"{self._base_url}/{name}?key={self._api_key}") as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error getting file metadata: {error_text}")
raise Exception(f"Failed to get file metadata: {response.status}")
file_info = await response.json()
return file_info
async def list_files(
self, page_size: int = 10, page_token: Optional[str] = None
) -> Dict[str, Any]:
"""List uploaded files.
Args:
page_size: Number of files to return per page
page_token: Token for pagination
Returns:
List of files and next page token if available
"""
params = {"key": self._api_key, "pageSize": page_size}
if page_token:
params["pageToken"] = page_token
async with aiohttp.ClientSession() as session:
async with session.get(self._base_url, params=params) as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error listing files: {error_text}")
raise Exception(f"Failed to list files: {response.status}")
result = await response.json()
return result
async def delete_file(self, name: str) -> bool:
"""Delete a file.
Args:
name: File name (or full path)
Returns:
True if deleted successfully
"""
# Extract just the name part if a full path is provided
if "/" in name:
name = name.split("/")[-1]
async with aiohttp.ClientSession() as session:
async with session.delete(f"{self._base_url}/{name}?key={self._api_key}") as response:
if response.status != 200:
error_text = await response.text()
logger.error(f"Error deleting file: {error_text}")
raise Exception(f"Failed to delete file: {response.status}")
return True

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,184 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Service for accessing Gemini Live via Google Vertex AI.
This module provides integration with Google's Gemini Live model via
Vertex AI, supporting both text and audio modalities with voice transcription,
streaming responses, and tool usage.
"""
import json
from typing import List, Optional, Union
from loguru import logger
from pipecat.adapters.schemas.tools_schema import ToolsSchema
from pipecat.services.google.gemini_live.llm import (
GeminiLiveLLMService,
HttpOptions,
InputParams,
)
try:
from google.auth import default
from google.auth.exceptions import GoogleAuthError
from google.auth.transport.requests import Request
from google.genai import Client
from google.oauth2 import service_account
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google Vertex AI, you need to `pip install pipecat-ai[google]`.")
raise Exception(f"Missing module: {e}")
class GeminiLiveVertexLLMService(GeminiLiveLLMService):
"""Provides access to Google's Gemini Live model via Vertex AI.
This service enables real-time conversations with Gemini, supporting both
text and audio modalities. It handles voice transcription, streaming audio
responses, and tool usage.
"""
def __init__(
self,
*,
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
location: str,
project_id: str,
model="google/gemini-2.0-flash-live-preview-04-09",
voice_id: str = "Charon",
start_audio_paused: bool = False,
start_video_paused: bool = False,
system_instruction: Optional[str] = None,
tools: Optional[Union[List[dict], ToolsSchema]] = None,
params: Optional[InputParams] = None,
inference_on_context_initialization: bool = True,
file_api_base_url: str = "https://generativelanguage.googleapis.com/v1beta/files",
http_options: Optional[HttpOptions] = None,
**kwargs,
):
"""Initialize the service for accessing Gemini Live via Google Vertex AI.
Args:
credentials: JSON string of service account credentials.
credentials_path: Path to the service account JSON file.
location: GCP region for Vertex AI endpoint (e.g., "us-east4").
project_id: Google Cloud project ID.
model: Model identifier to use. Defaults to "models/gemini-2.0-flash-live-preview-04-09".
voice_id: TTS voice identifier. Defaults to "Charon".
start_audio_paused: Whether to start with audio input paused. Defaults to False.
start_video_paused: Whether to start with video input paused. Defaults to False.
system_instruction: System prompt for the model. Defaults to None.
tools: Tools/functions available to the model. Defaults to None.
params: Configuration parameters for the model along with Vertex AI
location and project ID.
inference_on_context_initialization: Whether to generate a response when context
is first set. Defaults to True.
file_api_base_url: Base URL for the Gemini File API. Defaults to the official endpoint.
http_options: HTTP options for the client.
**kwargs: Additional arguments passed to parent GeminiLiveLLMService.
"""
# Check if user incorrectly passed api_key, which is used by parent
# class but not here.
if "api_key" in kwargs:
logger.error(
"GeminiLiveVertexLLMService does not accept 'api_key' parameter. "
"Use 'credentials' or 'credentials_path' instead for Vertex AI authentication."
)
raise ValueError(
"Invalid parameter 'api_key'. Use 'credentials' or 'credentials_path' for Vertex AI authentication."
)
# These need to be set before calling super().__init__() because
# super().__init__() invokes create_client(), which needs these.
self._credentials = self._get_credentials(credentials, credentials_path)
self._project_id = project_id
self._location = location
# Call parent constructor with the obtained API key
super().__init__(
# api_key is required by parent class, but actually not used with
# Vertex
api_key="dummy",
model=model,
voice_id=voice_id,
start_audio_paused=start_audio_paused,
start_video_paused=start_video_paused,
system_instruction=system_instruction,
tools=tools,
params=params,
inference_on_context_initialization=inference_on_context_initialization,
file_api_base_url=file_api_base_url,
http_options=http_options,
**kwargs,
)
def create_client(self):
"""Create the Gemini client instance."""
self._client = Client(
vertexai=True,
credentials=self._credentials,
project=self._project_id,
location=self._location,
)
@property
def file_api(self):
"""Gemini File API is not supported with Vertex AI."""
raise NotImplementedError(
"When using Vertex AI, the recommended approach is to use Google Cloud Storage for file handling. The Gemini File API is not directly supported in this context."
)
@staticmethod
def _get_credentials(credentials: Optional[str], credentials_path: Optional[str]) -> str:
"""Retrieve Credentials using Google service account credentials JSON.
Supports multiple authentication methods:
1. Direct JSON credentials string
2. Path to service account JSON file
3. Default application credentials (ADC)
Args:
credentials: JSON string of service account credentials.
credentials_path: Path to the service account JSON file.
Returns:
OAuth token for API authentication.
Raises:
ValueError: If no valid credentials are provided or found.
"""
creds: Optional[service_account.Credentials] = None
if credentials:
# Parse and load credentials from JSON string
creds = service_account.Credentials.from_service_account_info(
json.loads(credentials),
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
elif credentials_path:
# Load credentials from JSON file
creds = service_account.Credentials.from_service_account_file(
credentials_path,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
else:
try:
creds, project_id = default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
except GoogleAuthError:
pass
if not creds:
raise ValueError("No valid credentials provided.")
creds.refresh(Request()) # Ensure token is up-to-date, lifetime is 1 hour.
return creds

View File

@@ -35,6 +35,7 @@ from pipecat.frames.frames import (
LLMMessagesFrame,
LLMTextFrame,
LLMUpdateSettingsFrame,
OutputImageRawFrame,
UserImageRawFrame,
)
from pipecat.metrics.metrics import LLMTokenUsage
@@ -72,6 +73,9 @@ try:
HttpOptions,
Part,
)
# Temporary hack to be able to process Nano Banana returned images.
genai._api_client.READ_BUFFER_SIZE = 5 * 1024 * 1024
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use Google AI, you need to `pip install pipecat-ai[google]`.")
@@ -682,7 +686,7 @@ class GoogleLLMService(LLMService):
self,
*,
api_key: str,
model: str = "gemini-2.0-flash",
model: str = "gemini-2.5-flash",
params: Optional[InputParams] = None,
system_instruction: Optional[str] = None,
tools: Optional[List[Dict[str, Any]]] = None,
@@ -710,6 +714,7 @@ class GoogleLLMService(LLMService):
self._api_key = api_key
self._system_instruction = system_instruction
self._http_options = http_options
self._create_client(api_key, http_options)
self._settings = {
"max_tokens": params.max_tokens,
@@ -788,6 +793,9 @@ class GoogleLLMService(LLMService):
# and can be configured to turn it off.
if not self._model_name.startswith("gemini-2.5-flash"):
return
# If we have an image model, we don't use a budget either.
if "image" in self._model_name:
return
# If thinking_config is already set, don't override it.
if "thinking_config" in generation_params:
return
@@ -927,6 +935,12 @@ class GoogleLLMService(LLMService):
arguments=function_call.args or {},
)
)
elif part.inline_data and part.inline_data.data:
image = Image.open(io.BytesIO(part.inline_data.data))
frame = OutputImageRawFrame(
image=image.tobytes(), size=image.size, format="RGB"
)
await self.push_frame(frame)
if (
candidate.grounding_metadata

View File

@@ -94,9 +94,9 @@ class GoogleLLMOpenAIBetaService(OpenAILLMService):
async for chunk in chunk_stream:
if chunk.usage:
tokens = LLMTokenUsage(
prompt_tokens=chunk.usage.prompt_tokens,
completion_tokens=chunk.usage.completion_tokens,
total_tokens=chunk.usage.total_tokens,
prompt_tokens=chunk.usage.prompt_tokens or 0,
completion_tokens=chunk.usage.completion_tokens or 0,
total_tokens=chunk.usage.total_tokens or 0,
)
await self.start_llm_usage_metrics(tokens)

View File

@@ -53,12 +53,44 @@ class GoogleVertexLLMService(OpenAILLMService):
Parameters:
location: GCP region for Vertex AI endpoint (e.g., "us-east4").
.. deprecated:: 0.0.90
Use `location` as a direct argument to
`GoogleVertexLLMService.__init__()` instead.
project_id: Google Cloud project ID.
.. deprecated:: 0.0.90
Use `project_id` as a direct argument to
`GoogleVertexLLMService.__init__()` instead.
"""
# https://cloud.google.com/vertex-ai/generative-ai/docs/learn/locations
location: str = "us-east4"
project_id: str
location: Optional[str] = None
project_id: Optional[str] = None
def __init__(self, **kwargs):
"""Initializes the InputParams."""
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
if "location" in kwargs and kwargs["location"] is not None:
warnings.warn(
"GoogleVertexLLMService.InputParams.location is deprecated. "
"Please provide 'location' as a direct argument to GoogleVertexLLMService.__init__() instead.",
DeprecationWarning,
stacklevel=2,
)
if "project_id" in kwargs and kwargs["project_id"] is not None:
warnings.warn(
"GoogleVertexLLMService.InputParams.project_id is deprecated. "
"Please provide 'project_id' as a direct argument to GoogleVertexLLMService.__init__() instead.",
DeprecationWarning,
stacklevel=2,
)
super().__init__(**kwargs)
def __init__(
self,
@@ -66,7 +98,8 @@ class GoogleVertexLLMService(OpenAILLMService):
credentials: Optional[str] = None,
credentials_path: Optional[str] = None,
model: str = "google/gemini-2.0-flash-001",
params: Optional[InputParams] = None,
location: Optional[str] = None,
project_id: Optional[str] = None,
**kwargs,
):
"""Initializes the VertexLLMService.
@@ -75,33 +108,60 @@ class GoogleVertexLLMService(OpenAILLMService):
credentials: JSON string of service account credentials.
credentials_path: Path to the service account JSON file.
model: Model identifier (e.g., "google/gemini-2.0-flash-001").
params: Vertex AI input parameters including location and project.
location: GCP region for Vertex AI endpoint (e.g., "us-east4").
project_id: Google Cloud project ID.
**kwargs: Additional arguments passed to OpenAILLMService.
"""
params = params or OpenAILLMService.InputParams()
base_url = self._get_base_url(params)
# Handle deprecated InputParams fields
if "params" in kwargs and isinstance(kwargs["params"], GoogleVertexLLMService.InputParams):
params = kwargs["params"]
# Extract location and project_id from params if not provided
# directly, for backward compatibility
if project_id is None:
project_id = params.project_id
if location is None:
location = params.location
# Convert to base InputParams
params = OpenAILLMService.InputParams(
**params.model_dump(exclude={"location", "project_id"}, exclude_unset=True)
)
kwargs["params"] = params
# Validate project_id and location parameters
# NOTE: once we remove Vertex-spcific InputParams class, we can update
# __init__() signature as follows:
# - location: str = "us-east4",
# - project_id: str,
# But for now, we need them as-is to maintain proper backward
# compatibility.
if project_id is None:
raise ValueError("project_id is required")
if location is None:
# If location is not provided, default to "us-east4".
# Note: this is legacy behavior; ideally location would be
# required.
logger.warning("location is not provided. Defaulting to 'us-east4'.")
location = "us-east4" # Default location if not provided
base_url = self._get_base_url(location, project_id)
self._api_key = self._get_api_token(credentials, credentials_path)
super().__init__(
api_key=self._api_key,
base_url=base_url,
model=model,
params=params,
**kwargs,
)
@staticmethod
def _get_base_url(params: InputParams) -> str:
def _get_base_url(location: str, project_id: str) -> str:
"""Construct the base URL for Vertex AI API."""
# Determine the correct API host based on location
if params.location == "global":
if location == "global":
api_host = "aiplatform.googleapis.com"
else:
api_host = f"{params.location}-aiplatform.googleapis.com"
return (
f"https://{api_host}/v1/"
f"projects/{params.project_id}/locations/{params.location}/endpoints/openapi"
)
api_host = f"{location}-aiplatform.googleapis.com"
return f"https://{api_host}/v1/projects/{project_id}/locations/{location}/endpoints/openapi"
@staticmethod
def _get_api_token(credentials: Optional[str], credentials_path: Optional[str]) -> str:

View File

@@ -0,0 +1,5 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#

View File

@@ -0,0 +1,220 @@
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
"""Hume Text-to-Speech service implementation."""
import base64
import os
from typing import Any, AsyncGenerator, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.frames.frames import (
ErrorFrame,
Frame,
StartFrame,
TTSAudioRawFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
from pipecat.services.tts_service import TTSService
from pipecat.utils.tracing.service_decorators import traced_tts
try:
from hume import AsyncHumeClient
from hume.tts import (
FormatPcm,
PostedUtterance,
PostedUtteranceVoiceWithId,
)
except ModuleNotFoundError as e: # pragma: no cover - import-time guidance
logger.error(f"Exception: {e}")
logger.error("In order to use Hume, you need to `pip install pipecat-ai[hume]`.")
raise Exception(f"Missing module: {e}")
HUME_SAMPLE_RATE = 48_000 # Hume TTS streams at 48 kHz
class HumeTTSService(TTSService):
"""Hume Octave Text-to-Speech service.
Streams PCM audio via Hume's HTTP output streaming (JSON chunks) endpoint
using the Python SDK and emits ``TTSAudioRawFrame`` frames suitable for Pipecat transports.
Supported features:
- Generates speech from text using Hume TTS.
- Streams PCM audio.
- Supports dynamic updates of voice and synthesis parameters at runtime.
- Provides metrics for Time To First Byte (TTFB) and TTS usage.
"""
class InputParams(BaseModel):
"""Optional synthesis parameters for Hume TTS.
Parameters:
description: Natural-language acting directions (up to 100 characters).
speed: Speaking-rate multiplier (0.5-2.0).
trailing_silence: Seconds of silence to append at the end (0-5).
"""
description: Optional[str] = None
speed: Optional[float] = None
trailing_silence: Optional[float] = None
def __init__(
self,
*,
api_key: Optional[str] = None,
voice_id: str,
params: Optional[InputParams] = None,
sample_rate: Optional[int] = HUME_SAMPLE_RATE,
**kwargs,
) -> None:
"""Initialize the HumeTTSService.
Args:
api_key: Hume API key. If omitted, reads the ``HUME_API_KEY`` environment variable.
voice_id: ID of the voice to use. Only voice IDs are supported; voice names are not.
params: Optional synthesis controls (acting instructions, speed, trailing silence).
sample_rate: Output sample rate for emitted PCM frames. Defaults to 48_000 (Hume).
**kwargs: Additional arguments passed to the parent class.
"""
api_key = api_key or os.getenv("HUME_API_KEY")
if not api_key:
raise ValueError("HumeTTSService requires an API key (env HUME_API_KEY or api_key=)")
if sample_rate != HUME_SAMPLE_RATE:
logger.warning(
f"Hume TTS streams at {HUME_SAMPLE_RATE} Hz; configured sample_rate={sample_rate}"
)
super().__init__(sample_rate=sample_rate, **kwargs)
self._client = AsyncHumeClient(api_key=api_key)
self._params = params or HumeTTSService.InputParams()
# Store voice in the base class (mirrors other services)
self.set_voice(voice_id)
self._audio_bytes = b""
def can_generate_metrics(self) -> bool:
"""Can generate metrics.
Returns:
True if metrics can be generated, False otherwise.
"""
return True
async def start(self, frame: StartFrame) -> None:
"""Start the service.
Args:
frame: The start frame.
"""
await super().start(frame)
async def update_setting(self, key: str, value: Any) -> None:
"""Runtime updates via `TTSUpdateSettingsFrame`.
Args:
key: The name of the setting to update. Recognized keys are:
- "voice_id"
- "description"
- "speed"
- "trailing_silence"
value: The new value for the setting.
"""
key_l = (key or "").lower()
if key_l == "voice_id":
self.set_voice(str(value))
logger.info(f"HumeTTSService voice_id set to: {self.voice}")
elif key_l == "description":
self._params.description = None if value is None else str(value)
elif key_l == "speed":
self._params.speed = None if value is None else float(value)
elif key_l == "trailing_silence":
self._params.trailing_silence = None if value is None else float(value)
else:
# Defer unknown keys to the base class
await super().update_setting(key, value)
@traced_tts
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
"""Generate speech from text using Hume TTS.
Args:
text: The text to be synthesized.
Returns:
An async generator that yields `Frame` objects, including
`TTSStartedFrame`, `TTSAudioRawFrame`, `ErrorFrame`, and
`TTSStoppedFrame`.
"""
logger.debug(f"{self}: Generating Hume TTS: [{text}]")
# Build the request payload
utterance_kwargs: dict[str, Any] = {
"text": text,
"voice": PostedUtteranceVoiceWithId(id=self._voice_id),
}
if self._params.description is not None:
utterance_kwargs["description"] = self._params.description
if self._params.speed is not None:
utterance_kwargs["speed"] = self._params.speed
if self._params.trailing_silence is not None:
utterance_kwargs["trailing_silence"] = self._params.trailing_silence
utterance = PostedUtterance(**utterance_kwargs)
# Request raw PCM chunks in the streaming JSON
pcm_fmt = FormatPcm(type="pcm")
await self.start_ttfb_metrics()
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
try:
# Instant mode is always enabled here (not user-configurable)
# Hume emits mono PCM at 48 kHz; downstream can resample if needed.
# We buffer audio bytes before sending to prevent glitches.
self._audio_bytes = b""
async for chunk in self._client.tts.synthesize_json_streaming(
utterances=[utterance],
format=pcm_fmt,
instant_mode=True,
version="2",
):
audio_b64 = getattr(chunk, "audio", None)
if not audio_b64:
continue
pcm_bytes = base64.b64decode(audio_b64)
self._audio_bytes += pcm_bytes
# Buffer audio until we have enough to avoid glitches
if len(self._audio_bytes) < self.chunk_size:
continue
frame = TTSAudioRawFrame(
audio=self._audio_bytes,
sample_rate=self.sample_rate,
num_channels=1,
)
yield frame
self._audio_bytes = b""
except Exception as e:
logger.exception(f"{self} error generating TTS: {e}")
await self.push_error(ErrorFrame(f"Error generating TTS: {e}"))
finally:
# Ensure TTFB timer is stopped even on early failures
await self.stop_ttfb_metrics()
yield TTSStoppedFrame()

View File

@@ -10,6 +10,7 @@ from pipecat.services import DeprecatedModuleProxy
from .image import *
from .llm import *
from .realtime import *
from .stt import *
from .tts import *

View File

@@ -66,6 +66,7 @@ class BaseOpenAILLMService(LLMService):
top_p: Top-p (nucleus) sampling parameter (0.0 to 1.0).
max_tokens: Maximum tokens in response (deprecated, use max_completion_tokens).
max_completion_tokens: Maximum completion tokens to generate.
service_tier: Service tier to use (e.g., "auto", "flex", "priority").
extra: Additional model-specific parameters.
"""
@@ -83,6 +84,7 @@ class BaseOpenAILLMService(LLMService):
top_p: Optional[float] = Field(default_factory=lambda: NOT_GIVEN, ge=0.0, le=1.0)
max_tokens: Optional[int] = Field(default_factory=lambda: NOT_GIVEN, ge=1)
max_completion_tokens: Optional[int] = Field(default_factory=lambda: NOT_GIVEN, ge=1)
service_tier: Optional[str] = Field(default_factory=lambda: NOT_GIVEN)
extra: Optional[Dict[str, Any]] = Field(default_factory=dict)
def __init__(
@@ -125,6 +127,7 @@ class BaseOpenAILLMService(LLMService):
"top_p": params.top_p,
"max_tokens": params.max_tokens,
"max_completion_tokens": params.max_completion_tokens,
"service_tier": params.service_tier,
"extra": params.extra if isinstance(params.extra, dict) else {},
}
self._retry_timeout_secs = retry_timeout_secs
@@ -236,6 +239,7 @@ class BaseOpenAILLMService(LLMService):
"top_p": self._settings["top_p"],
"max_tokens": self._settings["max_tokens"],
"max_completion_tokens": self._settings["max_completion_tokens"],
"service_tier": self._settings["service_tier"],
}
# Messages, tools, tool_choice

View File

@@ -0,0 +1,272 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Realtime LLM context and aggregator implementations."""
import copy
import json
from loguru import logger
from pipecat.frames.frames import (
Frame,
FunctionCallResultFrame,
InterimTranscriptionFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMTextFrame,
TranscriptionFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
from . import events
from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame
class OpenAIRealtimeLLMContext(OpenAILLMContext):
"""OpenAI Realtime LLM context with session management and message conversion.
Extends the standard OpenAI LLM context to support real-time session properties,
instruction management, and conversion between standard message formats and
realtime conversation items.
"""
def __init__(self, messages=None, tools=None, **kwargs):
"""Initialize the OpenAIRealtimeLLMContext.
Args:
messages: Initial conversation messages. Defaults to None.
tools: Available function tools. Defaults to None.
**kwargs: Additional arguments passed to parent OpenAILLMContext.
"""
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self):
self.llm_needs_settings_update = True
self.llm_needs_initial_messages = True
self._session_instructions = ""
return
@staticmethod
def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext":
"""Upgrade a standard OpenAI LLM context to a realtime context.
Args:
obj: The OpenAILLMContext instance to upgrade.
Returns:
The upgraded OpenAIRealtimeLLMContext instance.
"""
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext):
obj.__class__ = OpenAIRealtimeLLMContext
obj.__setup_local()
return obj
# todo
# - finish implementing all frames
def from_standard_message(self, message):
"""Convert a standard message format to a realtime conversation item.
Args:
message: The standard message dictionary to convert.
Returns:
A ConversationItem instance for the realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in from_standard_message: {message}")
def get_messages_for_initializing_history(self):
"""Get conversation items for initializing the realtime session history.
Converts the context's messages to a format suitable for the realtime API,
handling system instructions and conversation history packaging.
Returns:
List of conversation items for session initialization.
"""
# We can't load a long conversation history into the openai realtime api yet. (The API/model
# forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So
# our general strategy until this is fixed is just to put everything into a first "user"
# message as a single input.
if not self.messages:
return []
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into session
# "instructions"
if messages[0].get("role") == "system":
self.llm_needs_settings_update = True
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
self._session_instructions = content
elif isinstance(content, list):
self._session_instructions = content[0].get("text")
if not messages:
return []
# If we have just a single "user" item, we can just send it normally
if len(messages) == 1 and messages[0].get("role") == "user":
return [self.from_standard_message(messages[0])]
# Otherwise, let's pack everything into a single "user" message with a bit of
# explanation for the LLM
intro_text = """
This is a previously saved conversation. Please treat this conversation history as a
starting point for the current conversation."""
trailing_text = """
This is the end of the previously saved conversation. Please continue the conversation
from here. If the last message is a user instruction or question, act on that instruction
or answer the question. If the last message is an assistant response, simple say that you
are ready to continue the conversation."""
return [
{
"role": "user",
"type": "message",
"content": [
{
"type": "input_text",
"text": "\n\n".join(
[intro_text, json.dumps(messages, indent=2), trailing_text]
),
}
],
}
]
def add_user_content_item_as_message(self, item):
"""Add a user content item as a standard message to the context.
Args:
item: The conversation item to add as a user message.
"""
message = {
"role": "user",
"content": [{"type": "text", "text": item.content[0].transcript}],
}
self.add_message(message)
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
"""User context aggregator for OpenAI Realtime API.
Handles user input frames and generates appropriate context updates
for the realtime conversation, including message updates and tool settings.
Args:
context: The OpenAI realtime LLM context.
**kwargs: Additional arguments passed to parent aggregator.
"""
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
"""Process incoming frames and handle realtime-specific frame types.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame. This ensures that in a typical pipeline,
# messages are only processed by the user context aggregator, which is generally what we want. But
# we also need to send new messages over the websocket, so the openai realtime API has them
# in its context.
if isinstance(frame, LLMMessagesUpdateFrame):
await self.push_frame(RealtimeMessagesUpdateFrame(context=self._context))
# Parent also doesn't push the LLMSetToolsFrame.
if isinstance(frame, LLMSetToolsFrame):
await self.push_frame(frame, direction)
async def push_aggregation(self):
"""Push user input aggregation.
Currently ignores all user input coming into the pipeline as realtime
audio input is handled directly by the service.
"""
# for the moment, ignore all user input coming into the pipeline.
# todo: think about whether/how to fix this to allow for text input from
# upstream (transport/transcription, or other sources)
pass
class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator):
"""Assistant context aggregator for OpenAI Realtime API.
Handles assistant output frames from the realtime service, filtering
out duplicate text frames and managing function call results.
Args:
context: The OpenAI realtime LLM context.
**kwargs: Additional arguments passed to parent aggregator.
"""
# The LLMAssistantContextAggregator uses TextFrames to aggregate the LLM output,
# but the OpenAIRealtimeLLMService pushes LLMTextFrames and TTSTextFrames. We
# need to override this proces_frame for LLMTextFrame, so that only the TTSTextFrames
# are process. This ensures that the context gets only one set of messages.
# OpenAIRealtimeLLMService also pushes TranscriptionFrames and InterimTranscriptionFrames,
# so we need to ignore pushing those as well, as they're also TextFrames.
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process assistant frames, filtering out duplicate text content.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame, InterimTranscriptionFrame)):
await super().process_frame(frame, direction)
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
"""Handle function call result and notify the realtime service.
Args:
frame: The function call result frame to handle.
"""
await super().handle_function_call_result(frame)
# The standard function callback code path pushes the FunctionCallResultFrame from the llm itself,
# so we didn't have a chance to add the result to the openai realtime api context. Let's push a
# special frame to do that.
await self.push_frame(
RealtimeFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,37 @@
#
# Copyright (c) 20242025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Custom frame types for OpenAI Realtime API integration."""
from dataclasses import dataclass
from typing import TYPE_CHECKING
from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
if TYPE_CHECKING:
from pipecat.services.openai.realtime.context import OpenAIRealtimeLLMContext
@dataclass
class RealtimeMessagesUpdateFrame(DataFrame):
"""Frame indicating that the realtime context messages have been updated.
Parameters:
context: The updated OpenAI realtime LLM context.
"""
context: "OpenAIRealtimeLLMContext"
@dataclass
class RealtimeFunctionCallResultFrame(DataFrame):
"""Frame containing function call results for the realtime service.
Parameters:
result_frame: The function call result frame to send to the realtime API.
"""
result_frame: FunctionCallResultFrame

View File

@@ -1,9 +1,27 @@
from .azure import AzureRealtimeLLMService
from .events import (
#
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import warnings
from pipecat.services.azure.realtime.llm import AzureRealtimeLLMService
from pipecat.services.openai.realtime.events import (
InputAudioNoiseReduction,
InputAudioTranscription,
SemanticTurnDetection,
SessionProperties,
TurnDetection,
)
from .openai import OpenAIRealtimeLLMService
from pipecat.services.openai.realtime.llm import OpenAIRealtimeLLMService
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.openai_realtime are deprecated. "
"Please use the equivalent types from "
"pipecat.services.openai.realtime instead.",
DeprecationWarning,
stacklevel=2,
)

View File

@@ -1,67 +1,21 @@
#
# Copyright (c) 20242025, Daily
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Azure OpenAI Realtime LLM service implementation."""
from loguru import logger
import warnings
from .openai import OpenAIRealtimeLLMService
from pipecat.services.azure.realtime.llm import *
try:
from websockets.asyncio.client import connect as websocket_connect
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use OpenAI, you need to `pip install pipecat-ai[openai]`. Also, set `OPENAI_API_KEY` environment variable."
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.openai_realtime.azure are deprecated. "
"Please use the equivalent types from "
"pipecat.services.azure.realtime.llm instead.",
DeprecationWarning,
stacklevel=2,
)
raise Exception(f"Missing module: {e}")
class AzureRealtimeLLMService(OpenAIRealtimeLLMService):
"""Azure OpenAI Realtime LLM service with Azure-specific authentication.
Extends the OpenAI Realtime service to work with Azure OpenAI endpoints,
using Azure's authentication headers and endpoint format. Provides the same
real-time audio and text communication capabilities as the base OpenAI service.
"""
def __init__(
self,
*,
api_key: str,
base_url: str,
**kwargs,
):
"""Initialize Azure Realtime LLM service.
Args:
api_key: The API key for the Azure OpenAI service.
base_url: The full Azure WebSocket endpoint URL including api-version and deployment.
Example: "wss://my-project.openai.azure.com/openai/realtime?api-version=2024-10-01-preview&deployment=my-realtime-deployment"
**kwargs: Additional arguments passed to parent OpenAIRealtimeLLMService.
"""
super().__init__(base_url=base_url, api_key=api_key, **kwargs)
self.api_key = api_key
self.base_url = base_url
async def _connect(self):
try:
if self._websocket:
# Here we assume that if we have a websocket, we are connected. We
# handle disconnections in the send/recv code paths.
return
logger.info(f"Connecting to {self.base_url}, api key: {self.api_key}")
self._websocket = await websocket_connect(
uri=self.base_url,
additional_headers={
"api-key": self.api_key,
},
)
self._receive_task = self.create_task(self._receive_task_handler())
except Exception as e:
logger.error(f"{self} initialization error: {e}")
self._websocket = None

View File

@@ -1,272 +1,21 @@
#
# Copyright (c) 20242025, Daily
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""OpenAI Realtime LLM context and aggregator implementations."""
import copy
import json
import warnings
from loguru import logger
from pipecat.services.openai.realtime.context import *
from pipecat.frames.frames import (
Frame,
FunctionCallResultFrame,
InterimTranscriptionFrame,
LLMMessagesUpdateFrame,
LLMSetToolsFrame,
LLMTextFrame,
TranscriptionFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.openai.llm import (
OpenAIAssistantContextAggregator,
OpenAIUserContextAggregator,
)
from . import events
from .frames import RealtimeFunctionCallResultFrame, RealtimeMessagesUpdateFrame
class OpenAIRealtimeLLMContext(OpenAILLMContext):
"""OpenAI Realtime LLM context with session management and message conversion.
Extends the standard OpenAI LLM context to support real-time session properties,
instruction management, and conversion between standard message formats and
realtime conversation items.
"""
def __init__(self, messages=None, tools=None, **kwargs):
"""Initialize the OpenAIRealtimeLLMContext.
Args:
messages: Initial conversation messages. Defaults to None.
tools: Available function tools. Defaults to None.
**kwargs: Additional arguments passed to parent OpenAILLMContext.
"""
super().__init__(messages=messages, tools=tools, **kwargs)
self.__setup_local()
def __setup_local(self):
self.llm_needs_settings_update = True
self.llm_needs_initial_messages = True
self._session_instructions = ""
return
@staticmethod
def upgrade_to_realtime(obj: OpenAILLMContext) -> "OpenAIRealtimeLLMContext":
"""Upgrade a standard OpenAI LLM context to a realtime context.
Args:
obj: The OpenAILLMContext instance to upgrade.
Returns:
The upgraded OpenAIRealtimeLLMContext instance.
"""
if isinstance(obj, OpenAILLMContext) and not isinstance(obj, OpenAIRealtimeLLMContext):
obj.__class__ = OpenAIRealtimeLLMContext
obj.__setup_local()
return obj
# todo
# - finish implementing all frames
def from_standard_message(self, message):
"""Convert a standard message format to a realtime conversation item.
Args:
message: The standard message dictionary to convert.
Returns:
A ConversationItem instance for the realtime API.
"""
if message.get("role") == "user":
content = message.get("content")
if isinstance(message.get("content"), list):
content = ""
for c in message.get("content"):
if c.get("type") == "text":
content += " " + c.get("text")
else:
logger.error(
f"Unhandled content type in context message: {c.get('type')} - {message}"
)
return events.ConversationItem(
role="user",
type="message",
content=[events.ItemContent(type="input_text", text=content)],
)
if message.get("role") == "assistant" and message.get("tool_calls"):
tc = message.get("tool_calls")[0]
return events.ConversationItem(
type="function_call",
call_id=tc["id"],
name=tc["function"]["name"],
arguments=tc["function"]["arguments"],
)
logger.error(f"Unhandled message type in from_standard_message: {message}")
def get_messages_for_initializing_history(self):
"""Get conversation items for initializing the realtime session history.
Converts the context's messages to a format suitable for the realtime API,
handling system instructions and conversation history packaging.
Returns:
List of conversation items for session initialization.
"""
# We can't load a long conversation history into the openai realtime api yet. (The API/model
# forgets that it can do audio, if you do a series of `conversation.item.create` calls.) So
# our general strategy until this is fixed is just to put everything into a first "user"
# message as a single input.
if not self.messages:
return []
messages = copy.deepcopy(self.messages)
# If we have a "system" message as our first message, let's pull that out into session
# "instructions"
if messages[0].get("role") == "system":
self.llm_needs_settings_update = True
system = messages.pop(0)
content = system.get("content")
if isinstance(content, str):
self._session_instructions = content
elif isinstance(content, list):
self._session_instructions = content[0].get("text")
if not messages:
return []
# If we have just a single "user" item, we can just send it normally
if len(messages) == 1 and messages[0].get("role") == "user":
return [self.from_standard_message(messages[0])]
# Otherwise, let's pack everything into a single "user" message with a bit of
# explanation for the LLM
intro_text = """
This is a previously saved conversation. Please treat this conversation history as a
starting point for the current conversation."""
trailing_text = """
This is the end of the previously saved conversation. Please continue the conversation
from here. If the last message is a user instruction or question, act on that instruction
or answer the question. If the last message is an assistant response, simple say that you
are ready to continue the conversation."""
return [
{
"role": "user",
"type": "message",
"content": [
{
"type": "input_text",
"text": "\n\n".join(
[intro_text, json.dumps(messages, indent=2), trailing_text]
),
}
],
}
]
def add_user_content_item_as_message(self, item):
"""Add a user content item as a standard message to the context.
Args:
item: The conversation item to add as a user message.
"""
message = {
"role": "user",
"content": [{"type": "text", "text": item.content[0].transcript}],
}
self.add_message(message)
class OpenAIRealtimeUserContextAggregator(OpenAIUserContextAggregator):
"""User context aggregator for OpenAI Realtime API.
Handles user input frames and generates appropriate context updates
for the realtime conversation, including message updates and tool settings.
Args:
context: The OpenAI realtime LLM context.
**kwargs: Additional arguments passed to parent aggregator.
"""
async def process_frame(
self, frame: Frame, direction: FrameDirection = FrameDirection.DOWNSTREAM
):
"""Process incoming frames and handle realtime-specific frame types.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
await super().process_frame(frame, direction)
# Parent does not push LLMMessagesUpdateFrame. This ensures that in a typical pipeline,
# messages are only processed by the user context aggregator, which is generally what we want. But
# we also need to send new messages over the websocket, so the openai realtime API has them
# in its context.
if isinstance(frame, LLMMessagesUpdateFrame):
await self.push_frame(RealtimeMessagesUpdateFrame(context=self._context))
# Parent also doesn't push the LLMSetToolsFrame.
if isinstance(frame, LLMSetToolsFrame):
await self.push_frame(frame, direction)
async def push_aggregation(self):
"""Push user input aggregation.
Currently ignores all user input coming into the pipeline as realtime
audio input is handled directly by the service.
"""
# for the moment, ignore all user input coming into the pipeline.
# todo: think about whether/how to fix this to allow for text input from
# upstream (transport/transcription, or other sources)
pass
class OpenAIRealtimeAssistantContextAggregator(OpenAIAssistantContextAggregator):
"""Assistant context aggregator for OpenAI Realtime API.
Handles assistant output frames from the realtime service, filtering
out duplicate text frames and managing function call results.
Args:
context: The OpenAI realtime LLM context.
**kwargs: Additional arguments passed to parent aggregator.
"""
# The LLMAssistantContextAggregator uses TextFrames to aggregate the LLM output,
# but the OpenAIRealtimeLLMService pushes LLMTextFrames and TTSTextFrames. We
# need to override this proces_frame for LLMTextFrame, so that only the TTSTextFrames
# are process. This ensures that the context gets only one set of messages.
# OpenAIRealtimeLLMService also pushes TranscriptionFrames and InterimTranscriptionFrames,
# so we need to ignore pushing those as well, as they're also TextFrames.
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process assistant frames, filtering out duplicate text content.
Args:
frame: The frame to process.
direction: The direction of frame flow in the pipeline.
"""
if not isinstance(frame, (LLMTextFrame, TranscriptionFrame, InterimTranscriptionFrame)):
await super().process_frame(frame, direction)
async def handle_function_call_result(self, frame: FunctionCallResultFrame):
"""Handle function call result and notify the realtime service.
Args:
frame: The function call result frame to handle.
"""
await super().handle_function_call_result(frame)
# The standard function callback code path pushes the FunctionCallResultFrame from the llm itself,
# so we didn't have a chance to add the result to the openai realtime api context. Let's push a
# special frame to do that.
await self.push_frame(
RealtimeFunctionCallResultFrame(result_frame=frame), FrameDirection.UPSTREAM
)
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.openai_realtime.context are deprecated. "
"Please use the equivalent types from "
"pipecat.services.openai.realtime.context instead.",
DeprecationWarning,
stacklevel=2,
)

File diff suppressed because it is too large Load Diff

View File

@@ -1,37 +1,21 @@
#
# Copyright (c) 20242025, Daily
# Copyright (c) 2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Custom frame types for OpenAI Realtime API integration."""
from dataclasses import dataclass
from typing import TYPE_CHECKING
import warnings
from pipecat.frames.frames import DataFrame, FunctionCallResultFrame
from pipecat.services.openai.realtime.frames import *
if TYPE_CHECKING:
from pipecat.services.openai_realtime_beta.context import OpenAIRealtimeLLMContext
@dataclass
class RealtimeMessagesUpdateFrame(DataFrame):
"""Frame indicating that the realtime context messages have been updated.
Parameters:
context: The updated OpenAI realtime LLM context.
"""
context: "OpenAIRealtimeLLMContext"
@dataclass
class RealtimeFunctionCallResultFrame(DataFrame):
"""Frame containing function call results for the realtime service.
Parameters:
result_frame: The function call result frame to send to the realtime API.
"""
result_frame: FunctionCallResultFrame
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"Types in pipecat.services.openai_realtime.frames are deprecated. "
"Please use the equivalent types from "
"pipecat.services.openai.realtime.frames instead.",
DeprecationWarning,
stacklevel=2,
)

View File

@@ -14,6 +14,7 @@ import io
import json
import struct
import uuid
import warnings
from typing import AsyncGenerator, Optional
import aiohttp
@@ -110,6 +111,11 @@ def language_to_playht_language(language: Language) -> Optional[str]:
class PlayHTTTSService(InterruptibleTTSService):
"""PlayHT WebSocket-based text-to-speech service.
.. deprecated:: 0.0.88
This class is deprecated and will be removed in a future version.
PlayHT is shutting down their API on December 31st, 2025.
Provides real-time text-to-speech synthesis using PlayHT's WebSocket API.
Supports streaming audio generation with configurable voice engines and
language settings.
@@ -158,6 +164,15 @@ class PlayHTTTSService(InterruptibleTTSService):
**kwargs,
)
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"PlayHT is shutting down their API on December 31st, 2025. "
"'PlayHTTTSService' is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
params = params or PlayHTTTSService.InputParams()
self._api_key = api_key
@@ -401,6 +416,11 @@ class PlayHTTTSService(InterruptibleTTSService):
class PlayHTHttpTTSService(TTSService):
"""PlayHT HTTP-based text-to-speech service.
.. deprecated:: 0.0.88
This class is deprecated and will be removed in a future version.
PlayHT is shutting down their API on December 31st, 2025.
Provides text-to-speech synthesis using PlayHT's HTTP API for simpler,
non-streaming synthesis. Suitable for use cases where streaming is not
required and simpler integration is preferred.
@@ -454,8 +474,6 @@ class PlayHTHttpTTSService(TTSService):
# Warn about deprecated protocol parameter if explicitly provided
if protocol:
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
@@ -464,6 +482,15 @@ class PlayHTHttpTTSService(TTSService):
stacklevel=2,
)
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"PlayHT is shutting down their API on December 31st, 2025. "
"'PlayHTHttpTTSService' is deprecated and will be removed in a future version.",
DeprecationWarning,
stacklevel=2,
)
params = params or PlayHTHttpTTSService.InputParams()
self._user_id = user_id

View File

@@ -15,6 +15,7 @@ from loguru import logger
from pipecat.frames.frames import (
AudioRawFrame,
ErrorFrame,
Frame,
StartFrame,
STTMuteFrame,
@@ -24,6 +25,7 @@ from pipecat.frames.frames import (
)
from pipecat.processors.frame_processor import FrameDirection
from pipecat.services.ai_service import AIService
from pipecat.services.websocket_service import WebsocketService
from pipecat.transcriptions.language import Language
@@ -283,3 +285,35 @@ class SegmentedSTTService(STTService):
if not self._user_speaking and len(self._audio_buffer) > self._audio_buffer_size_1s:
discarded = len(self._audio_buffer) - self._audio_buffer_size_1s
self._audio_buffer = self._audio_buffer[discarded:]
class WebsocketSTTService(STTService, WebsocketService):
"""Base class for websocket-based STT services.
Combines STT functionality with websocket connectivity, providing automatic
error handling and reconnection capabilities.
Event handlers:
on_connection_error: Called when a websocket connection error occurs.
Example::
@stt.event_handler("on_connection_error")
async def on_connection_error(stt: STTService, error: str):
logger.error(f"STT connection error: {error}")
"""
def __init__(self, *, reconnect_on_error: bool = True, **kwargs):
"""Initialize the Websocket STT service.
Args:
reconnect_on_error: Whether to automatically reconnect on websocket errors.
**kwargs: Additional arguments passed to parent classes.
"""
STTService.__init__(self, **kwargs)
WebsocketService.__init__(self, reconnect_on_error=reconnect_on_error, **kwargs)
self._register_event_handler("on_connection_error")
async def _report_error(self, error: ErrorFrame):
await self._call_event_handler("on_connection_error", error.error)
await self.push_error(error)

View File

@@ -29,20 +29,19 @@ from pipecat.frames.frames import (
CancelFrame,
EndFrame,
Frame,
InputTransportMessageUrgentFrame,
InterruptionFrame,
MixerControlFrame,
OutputAudioRawFrame,
OutputDTMFFrame,
OutputDTMFUrgentFrame,
OutputImageRawFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
OutputTransportReadyFrame,
SpeechOutputAudioRawFrame,
SpriteFrame,
StartFrame,
SystemFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
TTSAudioRawFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
@@ -178,7 +177,9 @@ class BaseOutputTransport(FrameProcessor):
# Sending a frame indicating that the output transport is ready and able to receive frames.
await self.push_frame(OutputTransportReadyFrame(), FrameDirection.UPSTREAM)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
async def send_message(
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
):
"""Send a transport message.
Args:
@@ -307,9 +308,7 @@ class BaseOutputTransport(FrameProcessor):
elif isinstance(frame, InterruptionFrame):
await self.push_frame(frame, direction)
await self._handle_frame(frame)
elif isinstance(frame, TransportMessageUrgentFrame) and not isinstance(
frame, InputTransportMessageUrgentFrame
):
elif isinstance(frame, OutputTransportMessageUrgentFrame):
await self.send_message(frame)
elif isinstance(frame, OutputDTMFUrgentFrame):
await self.write_dtmf(frame)
@@ -646,7 +645,7 @@ class BaseOutputTransport(FrameProcessor):
await self._set_video_image(frame)
elif isinstance(frame, SpriteFrame):
await self._set_video_images(frame.images)
elif isinstance(frame, TransportMessageFrame):
elif isinstance(frame, OutputTransportMessageFrame):
await self._transport.send_message(frame)
elif isinstance(frame, OutputDTMFFrame):
await self._transport.write_dtmf(frame)

View File

@@ -30,15 +30,15 @@ from pipecat.frames.frames import (
ErrorFrame,
Frame,
InputAudioRawFrame,
InputTransportMessageUrgentFrame,
InputTransportMessageFrame,
InterimTranscriptionFrame,
OutputAudioRawFrame,
OutputImageRawFrame,
OutputTransportMessageFrame,
OutputTransportMessageUrgentFrame,
SpriteFrame,
StartFrame,
TranscriptionFrame,
TransportMessageFrame,
TransportMessageUrgentFrame,
UserAudioRawFrame,
UserImageRawFrame,
UserImageRequestFrame,
@@ -74,7 +74,7 @@ VAD_RESET_PERIOD_MS = 2000
@dataclass
class DailyTransportMessageFrame(TransportMessageFrame):
class DailyOutputTransportMessageFrame(OutputTransportMessageFrame):
"""Frame for transport messages in Daily calls.
Parameters:
@@ -85,7 +85,7 @@ class DailyTransportMessageFrame(TransportMessageFrame):
@dataclass
class DailyTransportMessageUrgentFrame(TransportMessageUrgentFrame):
class DailyOutputTransportMessageUrgentFrame(OutputTransportMessageUrgentFrame):
"""Frame for urgent transport messages in Daily calls.
Parameters:
@@ -96,7 +96,59 @@ class DailyTransportMessageUrgentFrame(TransportMessageUrgentFrame):
@dataclass
class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
class DailyTransportMessageFrame(DailyOutputTransportMessageFrame):
"""Frame for transport messages in Daily calls.
.. deprecated:: 0.0.87
This frame is deprecated and will be removed in a future version.
Instead, use `DailyOutputTransportMessageFrame`.
Parameters:
participant_id: Optional ID of the participant this message is for/from.
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"DailyTransportMessageFrame is deprecated and will be removed in a future version. "
"Instead, use DailyOutputTransportMessageFrame.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class DailyTransportMessageUrgentFrame(DailyOutputTransportMessageUrgentFrame):
"""Frame for urgent transport messages in Daily calls.
.. deprecated:: 0.0.87
This frame is deprecated and will be removed in a future version.
Instead, use `DailyOutputTransportMessageUrgentFrame`.
Parameters:
participant_id: Optional ID of the participant this message is for/from.
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"DailyTransportMessageUrgentFrame is deprecated and will be removed in a future version. "
"Instead, use DailyOutputTransportMessageUrgentFrame.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class DailyInputTransportMessageFrame(InputTransportMessageFrame):
"""Frame for input urgent transport messages in Daily calls.
Parameters:
@@ -106,6 +158,31 @@ class DailyInputTransportMessageUrgentFrame(InputTransportMessageUrgentFrame):
participant_id: Optional[str] = None
class DailyInputTransportMessageUrgentFrame(DailyInputTransportMessageFrame):
"""Frame for input urgent transport messages in Daily calls.
.. deprecated:: 0.0.87
This frame is deprecated and will be removed in a future version.
Instead, use `DailyInputTransportMessageFrame`.
Parameters:
participant_id: Optional ID of the participant this message is for/from.
"""
def __post_init__(self):
super().__post_init__()
import warnings
with warnings.catch_warnings():
warnings.simplefilter("always")
warnings.warn(
"DailyInputTransportMessageUrgentFrame is deprecated and will be removed in a future version. "
"Instead, use DailyInputTransportMessageFrame.",
DeprecationWarning,
stacklevel=2,
)
@dataclass
class DailyUpdateRemoteParticipantsFrame(ControlFrame):
"""Frame to update remote participants in Daily calls.
@@ -474,7 +551,9 @@ class DailyTransportClient(EventHandler):
"""
return self._out_sample_rate
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
async def send_message(
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
):
"""Send an application message to participants.
Args:
@@ -484,7 +563,9 @@ class DailyTransportClient(EventHandler):
return
participant_id = None
if isinstance(frame, (DailyTransportMessageFrame, DailyTransportMessageUrgentFrame)):
if isinstance(
frame, (DailyOutputTransportMessageFrame, DailyOutputTransportMessageUrgentFrame)
):
participant_id = frame.participant_id
future = self._get_event_loop().create_future()
@@ -1621,7 +1702,7 @@ class DailyInputTransport(BaseInputTransport):
message: The message data to send.
sender: ID of the message sender.
"""
frame = DailyInputTransportMessageUrgentFrame(message=message, participant_id=sender)
frame = DailyInputTransportMessageFrame(message=message, participant_id=sender)
await self.push_frame(frame)
#
@@ -1843,7 +1924,9 @@ class DailyOutputTransport(BaseOutputTransport):
if isinstance(frame, DailyUpdateRemoteParticipantsFrame):
await self._client.update_remote_participants(frame.remote_participants)
async def send_message(self, frame: TransportMessageFrame | TransportMessageUrgentFrame):
async def send_message(
self, frame: OutputTransportMessageFrame | OutputTransportMessageUrgentFrame
):
"""Send a transport message to participants.
Args:

Some files were not shown because too many files have changed in this diff Show More