Compare commits

...

142 Commits

Author SHA1 Message Date
Aleix Conchillo Flaqué
1d4be0139a Merge pull request #705 from pipecat-ai/aleix/prepare-0.0.48
update CHANGELOG for 0.0.48
2024-11-10 14:08:33 -08:00
Aleix Conchillo Flaqué
f58c3ee322 update CHANGELOG for 0.0.48 2024-11-10 23:01:03 +01:00
Aleix Conchillo Flaqué
379750df91 Merge pull request #704 from pipecat-ai/aleix/cartesia-tts-stopped-frame
services(cartesia): generated TTSStoppedFrame after no more audio
2024-11-10 05:17:36 -08:00
Aleix Conchillo Flaqué
d125a38737 services(cartesia): generated TTSStoppedFrame after no more audio
The TTSStoppedFrame should be generated when the TTS services stoped generating
audio not when the bot stops speaking.
2024-11-10 09:55:45 +01:00
Mark Backman
446bb0aeaf Merge pull request #702 from pipecat-ai/mb/azure-websocket
Add an Azure TTS websocket service
2024-11-09 17:41:53 -05:00
Aleix Conchillo Flaqué
d839080834 Merge pull request #642 from pipecat-ai/aleix/input-queues-block-frames
introduce frame processor input queues block frames
2024-11-09 14:30:17 -08:00
Mark Backman
9b85d0642b Add a changelog entry 2024-11-09 12:37:29 -05:00
Mark Backman
230b51a117 Add an Azure TTS websocket service 2024-11-09 12:37:29 -05:00
Mark Backman
3a965ca396 Merge pull request #701 from pipecat-ai/khk/anthropic-function-calling-fix
fixes for anthropic function calling
2024-11-09 06:39:34 -05:00
Kwindla Hultman Kramer
33fc5bf990 improved 20c-persistent-context-anthropic.py 2024-11-08 16:42:30 -08:00
Kwindla Hultman Kramer
a54ca08405 fixes for anthropic function calling 2024-11-08 16:33:02 -08:00
Filipi da Silva Fuchter
4379db43ed Merge pull request #689 from pipecat-ai/filipi/krisp
Making pipecat work with Krisp
2024-11-08 16:22:52 -03:00
Filipi Fuchter
e915c676aa Added support for Krisp audio filter 2024-11-08 16:18:10 -03:00
Mark Backman
e0a003afa1 Merge pull request #695 from pipecat-ai/mb/initialize-azure-lang
Initialize the speech_recognition_language for Azure TTS
2024-11-08 06:40:40 -05:00
James Hush
d5666727ce feat: toggle looping with soundfile mixer (#693)
* feat: toggle looping with soundfile mixer

* Implement PR changes
2024-11-07 21:08:37 -08:00
Mark Backman
f6d7402530 Update changelog 2024-11-07 15:16:03 -05:00
Mark Backman
aefe190c9f Initialize the speech_recognition_language for Azure TTS 2024-11-07 15:14:05 -05:00
Vanessa Pyne
29925a8f21 Merge pull request #551 from Allenmylath/patch-3
Frame types and short descriptionCreate Frames.md
2024-11-07 10:05:32 -06:00
Aleix Conchillo Flaqué
beb3271168 services(tts): make sure word timestamp is reset properly 2024-11-06 18:54:12 -08:00
Aleix Conchillo Flaqué
b959ac6e1e Merge pull request #694 from pipecat-ai/aleix/daily-add-on-transcription-message
transports(daily): call on_transcription_message event handler
2024-11-06 15:21:17 -08:00
Aleix Conchillo Flaqué
17f4286942 transports(daily): call on_transcription_message event handler 2024-11-06 15:10:58 -08:00
Aleix Conchillo Flaqué
ce89bbb16e tts(elevenlabs): support pausing and resuming frames while speaking 2024-11-06 14:38:33 -08:00
Aleix Conchillo Flaqué
865768039b processors: remove block_on_frames and add pause_processing_frames() instead 2024-11-06 14:20:25 -08:00
Aleix Conchillo Flaqué
7071482583 try to use queue_frame() instead of process_frame() 2024-11-06 14:18:21 -08:00
Aleix Conchillo Flaqué
5353d13151 update CHANGELOG 2024-11-06 13:16:58 -08:00
Aleix Conchillo Flaqué
a9e565f355 processors: fix input queue interruptions 2024-11-06 13:12:24 -08:00
Aleix Conchillo Flaqué
b6f0c16591 examples: restore EndFrame() on 01 and 02 foundational 2024-11-06 13:05:03 -08:00
Aleix Conchillo Flaqué
49005d02f5 services(tts): use TTSSpeakFrame in say() method 2024-11-06 13:05:03 -08:00
Aleix Conchillo Flaqué
6d8b885071 transports(base_output): push bot started/stopped frames downstream 2024-11-06 13:04:37 -08:00
Aleix Conchillo Flaqué
2eccb33e73 processors: allow passing a callback when queued frame is processed 2024-11-06 13:04:37 -08:00
Aleix Conchillo Flaqué
22ca4c5a02 processors: cancel input task and empty queue with interruptions 2024-11-06 13:04:37 -08:00
Aleix Conchillo Flaqué
84f26ac1ca processors: introduce input queues
Frame processors can now decide if they should continue processing frames or
not, and if so also decide when to continue processing frames. For example,
asynchronous TTS services will stop processing frames until they have generated
all the audio for an LLM response.
2024-11-06 12:13:49 -08:00
Aleix Conchillo Flaqué
74937411e6 Merge pull request #691 from pipecat-ai/aleix/rtvi-manual-bot-ready
rtvi: bot-ready message needs to be sent manual
2024-11-06 10:53:25 -08:00
Aleix Conchillo Flaqué
8aab068ffd rtvi: bot-ready message needs to be sent manual 2024-11-05 10:52:54 -08:00
Aleix Conchillo Flaqué
bd50201ce4 transports(daily): just make it clear we subscribe to camera 2024-11-04 17:32:46 -08:00
Aleix Conchillo Flaqué
6082da284e Merge pull request #611 from pipecat-ai/aleix/audio-filters
introduce audio filters
2024-11-04 16:34:47 -08:00
Aleix Conchillo Flaqué
358c458265 transports(base_input): handle filter contorl frames 2024-11-04 16:19:52 -08:00
Aleix Conchillo Flaqué
807dbbe326 audio(noisereduce): allow enabling/disabling filter 2024-11-04 16:13:29 -08:00
Aleix Conchillo Flaqué
3c116b291d audio(mixers): some cosmetics 2024-11-04 15:37:08 -08:00
Aleix Conchillo Flaqué
0dd413ee90 audio(filters): add noisereduce filter 2024-11-04 15:37:08 -08:00
Aleix Conchillo Flaqué
abc8ede3d7 introduce audio filters 2024-11-04 15:37:08 -08:00
Aleix Conchillo Flaqué
126324ca1b Merge pull request #687 from pipecat-ai/aleix/transport-audio-mixers
introduce transport audio mixers
2024-11-04 13:14:36 -08:00
Aleix Conchillo Flaqué
602915ae18 examples(websocket-server): allow interruptions 2024-11-04 13:05:02 -08:00
Aleix Conchillo Flaqué
0ac9e2dd3f transports(network): synchronize with time before sending data 2024-11-04 13:04:18 -08:00
Aleix Conchillo Flaqué
a9ef5ca95d examples: add bot background sound example 2024-11-03 11:13:02 -08:00
Aleix Conchillo Flaqué
81c476dd4c introduce output transport audio mixers 2024-11-03 11:13:02 -08:00
Kwindla Hultman Kramer
151242d3a0 Merge pull request #666 from pipecat-ai/khk/realtime-pipecat-vad
Support using Pipecat turn detection instead of OpenAI Realtime API turn detection
2024-11-02 08:36:31 -07:00
Kwindla Hultman Kramer
93c6e5098c added comment explaining config of TurnDetection 2024-11-02 08:24:54 -07:00
Aleix Conchillo Flaqué
4455b2a428 rtvi: create queues before tasks 2024-11-01 23:06:50 -07:00
Aleix Conchillo Flaqué
94062592ef base_output: generate smaller audio frames of the same incoming type 2024-11-01 23:06:50 -07:00
Aleix Conchillo Flaqué
d2401a76c8 base_output: only generate bot speaking with TTS audio frames 2024-11-01 23:06:50 -07:00
Aleix Conchillo Flaqué
e2b1b56e86 examples: don't require room token if using an STT 2024-11-01 23:06:50 -07:00
Mark Backman
84bd767312 Merge pull request #685 from pipecat-ai/mb/add-recording-events
Add recording events and callbacks
2024-11-01 12:02:46 -04:00
Mark Backman
802c29e9e1 Add recording events and callbacks 2024-11-01 10:20:00 -04:00
Aleix Conchillo Flaqué
f83381860c Merge pull request #677 from pipecat-ai/aleix/add-notifier-and-notifier-filters
add notifiers and more frame filters
2024-10-31 15:55:07 -07:00
Aleix Conchillo Flaqué
4dad1bfe49 examples: add foundational/22-natural-conversation.py 2024-10-31 12:10:33 -07:00
marcus-daily
9ee8896b64 Removing unnecessary ruff arguments from README 2024-10-31 18:02:29 +00:00
marcus-daily
5f7a2f66d4 Add .idea to .gitignore 2024-10-31 18:02:29 +00:00
marcus-daily
76e5f1e847 Remove unnecessary ruff params in CI 2024-10-31 15:07:28 +00:00
marcus-daily
6975340d6c Set Ruff config for the project 2024-10-31 15:07:28 +00:00
marcus-daily
0f4cf56418 Load dotenv in simple chatbot server (fixes #415) 2024-10-31 12:08:30 +00:00
Aleix Conchillo Flaqué
018e51e8a3 add notifiers and more frame filters 2024-10-30 16:36:17 -07:00
Vanessa Pyne
b050143952 Merge pull request #676 from RonakAgarwalVani/fix/chunk-choices-delta-none
Fix uncaught exception when accessing 'tool_calls' in NoneType delta in response handling
2024-10-30 14:44:32 -05:00
Mark Backman
98ea1f0791 Merge pull request #675 from pipecat-ai/mb/playht-add-request-id
Add a request_id to each TTS sequence
2024-10-30 13:56:15 -04:00
Mark Backman
8272c35527 Use a request_id in TTS commands for the PlayHT websocket service 2024-10-30 13:54:18 -04:00
Mark Backman
e973e82e05 Merge pull request #672 from pipecat-ai/mb/fix-playht
Fix PlayHT TTFB metrics
2024-10-30 13:53:02 -04:00
RonakAgarwalVani
d1396bf618 Update openai.py 2024-10-30 14:26:49 +05:30
Vanessa Pyne
8186e423de Merge pull request #637 from pipecat-ai/vp-issue-template
docs: add ISSUE_TEMPLATE.md
2024-10-29 15:08:42 -05:00
vipyne
3010addb8b docs: add CONTRIBUTING.md 2024-10-29 15:03:07 -05:00
vipyne
029e0d391e docs: add ISSUE_TEMPLATE.md 2024-10-29 15:03:07 -05:00
Vanessa Pyne
bf31223577 Merge pull request #671 from pipecat-ai/vp-issue-635
docs: small fix
2024-10-29 14:34:13 -05:00
vipyne
42cc79154f docs: small fix 2024-10-29 14:33:57 -05:00
Mark Backman
05b857006a Update changelog 2024-10-28 20:56:29 -04:00
Mark Backman
2e57d21b89 Fix ttfb metrics 2024-10-28 20:27:24 -04:00
Aleix Conchillo Flaqué
fa05ec46be Merge pull request #667 from pipecat-ai/aleix/base-output-bot-speaking-detection
transports(base_output): use audio frames for bot speaking detection
2024-10-28 10:54:54 -07:00
Aleix Conchillo Flaqué
e3ce619284 transports(base_output): use audio frames for bot speaking detection 2024-10-28 10:07:37 -07:00
Vanessa Pyne
fb512dcd74 Merge pull request #630 from MoofSoup/update-readme
docs: simplify readme
2024-10-28 10:26:30 -05:00
Aleix Conchillo Flaqué
ca15d97383 Merge pull request #662 from pipecat-ai/aleix/daily-transport-async-functions
transports(daily): make functions async
2024-10-25 16:14:06 -07:00
Aleix Conchillo Flaqué
b32448e967 transports(daily): make functions async 2024-10-25 15:01:52 -07:00
Aleix Conchillo Flaqué
7e30da6183 Merge pull request #661 from pipecat-ai/aleix/allow-updating-subscritption-before
transports(daily): allow updating subscriptions before join
2024-10-25 15:00:34 -07:00
Aleix Conchillo Flaqué
a6dd2600d2 examples(tavus): await update_subscriptions 2024-10-25 14:56:56 -07:00
Aleix Conchillo Flaqué
b905b57dfc transports(daily): allow updating subscriptions before join 2024-10-25 14:46:17 -07:00
Kwindla Hultman Kramer
e1a7edfb58 make it possible to use Pipecat turn detection instead of OpenAI turn detection 2024-10-25 15:59:48 -05:00
Aleix Conchillo Flaqué
1b30b1fc23 Merge pull request #665 from pipecat-ai/aleix/fix-bot-started-stopped-speaking
transports(base_output): fix constant bot started/stopped speaking fr…
2024-10-25 13:00:38 -07:00
Aleix Conchillo Flaqué
55026898f6 transports(base_output): use vad stop secs for bot stopped speaking 2024-10-25 12:59:15 -07:00
Aleix Conchillo Flaqué
4283557894 audio(vad): expose params property 2024-10-25 12:59:15 -07:00
Aleix Conchillo Flaqué
5ab00e01aa transports(base_output): fix constant bot started/stopped speaking frames 2024-10-25 12:10:24 -07:00
Aleix Conchillo Flaqué
fcfc729e83 Merge pull request #664 from pipecat-ai/aleix/fix-aws-stuttering
services(aws): read stream and resample in a thread
2024-10-25 11:49:28 -07:00
Aleix Conchillo Flaqué
4eacb34fd8 services(aws): read stream and resample in a thread 2024-10-25 11:22:28 -07:00
Aleix Conchillo Flaqué
3a8aacccf7 Merge pull request #663 from pipecat-ai/aleix/audio-resampling-with-resampy
audio: use resamply for audio resampling
2024-10-25 10:16:20 -07:00
roey
54c0bf0c70 Adding TavusVideoService layer (#617)
Co-authored-by: roey <159067767+roey-tavus@users.noreply.github.com>
Co-authored-by: Mert Gerdan <mert@tavus.io>
Co-authored-by: Aleix Conchillo Flaqué <aleix@daily.co>
2024-10-25 09:46:25 -07:00
Aleix Conchillo Flaqué
778b05a252 audio: use resamply for audio resampling 2024-10-25 09:22:22 -07:00
Mark Backman
f16a416c2b Merge pull request #660 from pipecat-ai/mb/add-gemini-inputs
Add input params to Google Gemini
2024-10-24 20:58:19 -04:00
Aleix Conchillo Flaqué
1be63bccb8 Merge pull request #647 from pipecat-ai/aleix/daily-transport-only-transcribe-users
transport(daily): only transcribe users
2024-10-24 17:40:34 -07:00
Mark Backman
37820ac0df Add input params to Google Gemini 2024-10-24 20:12:41 -04:00
Aleix Conchillo Flaqué
8ea80d43f4 transports(daily): only transcribe user audio 2024-10-24 17:06:43 -07:00
Aleix Conchillo Flaqué
e117d70a00 update to daily-python 0.12.0 2024-10-24 16:49:19 -07:00
Aleix Conchillo Flaqué
2ba753272a Merge pull request #658 from pipecat-ai/aleix/default-to-24000-sample-rate
update TTS and transport output sample rate to 24000
2024-10-24 16:48:41 -07:00
Aleix Conchillo Flaqué
60c8c2f6e9 examples(15a): use daily transcription instead of local whisper 2024-10-24 16:47:41 -07:00
Aleix Conchillo Flaqué
cfb48200c2 services(azure): support sample rates 2024-10-24 16:47:35 -07:00
Aleix Conchillo Flaqué
6d317c6e8e audio: don't resample if same sample rate 2024-10-24 16:47:35 -07:00
Aleix Conchillo Flaqué
158d52856f transports(livekit): fix VADAnalyzer import 2024-10-24 16:47:35 -07:00
Aleix Conchillo Flaqué
92a69e404f update TTS and transport output sample rate to 24000 2024-10-24 16:47:35 -07:00
Aleix Conchillo Flaqué
d24c6185d8 Merge pull request #654 from pipecat-ai/aleix/daily-allow-completion-futures
transport(daily): allow completion futures
2024-10-24 14:28:53 -07:00
Mark Backman
1fd21578a6 Merge pull request #657 from pipecat-ai/mb/add-elevenlabs-output-format-type
Add ElevenLabs output format type
2024-10-24 17:07:04 -04:00
Mark Backman
700db87127 Merge pull request #656 from pipecat-ai/mb/add-gemini-metrics
Add Gemini token usage metrics
2024-10-24 17:04:56 -04:00
Mark Backman
6f1310569c Add ElevenLabs output format type 2024-10-24 17:03:45 -04:00
Aleix Conchillo Flaqué
14cedb0be8 Merge pull request #655 from pipecat-ai/aleix/fix-together-params
services(together): fix together AI InputParams
2024-10-24 13:51:38 -07:00
Mark Backman
fae97f9051 Add Gemini token usage metrics 2024-10-24 16:37:21 -04:00
Aleix Conchillo Flaqué
d930a46e64 services(together): fix together AI InputParams 2024-10-24 13:08:35 -07:00
Aleix Conchillo Flaqué
2e6b5d1843 transports(daily): fix aiohttp timeout 2024-10-24 11:44:30 -07:00
Aleix Conchillo Flaqué
88362db034 transports(daily): no more need for an output message queue 2024-10-24 11:44:30 -07:00
Aleix Conchillo Flaqué
f7f0c44c32 transports(daily): don't block event handlers 2024-10-24 11:44:30 -07:00
Mark Backman
33553b71d4 Merge pull request #653 from pipecat-ai/mb/align-tts-constructors
Align TTSService constructors
2024-10-24 13:52:43 -04:00
Mark Backman
be8ca505cd Merge pull request #652 from pipecat-ai/khk/more-gemini
Gemini new context manager and rewrite to use google data structures internally
2024-10-24 13:47:38 -04:00
Mark Backman
e957cce422 Align TTSService constructors 2024-10-24 13:42:06 -04:00
Mark Backman
418a13a4ec Merge pull request #650 from pipecat-ai/mb/assembly-fix
AssemblyAI: don't disconnect on language change
2024-10-24 11:26:56 -04:00
Mark Backman
fc445c0a1f Merge pull request #649 from pipecat-ai/mb/open-ai-max-tokens
Add max_tokens and max_completion_tokens inputs for OpenAI
2024-10-24 11:26:44 -04:00
Mark Backman
f0c65468ed AssemblyAI: don't disconnect on language change 2024-10-24 08:30:48 -04:00
Mark Backman
ce6a2bdcf7 Add max tokens inputs to OpenAI 2024-10-24 07:03:45 -04:00
Mark Backman
673542e235 Merge pull request #646 from pipecat-ai/mb/grok-function-calling
Support function calling for Grok
2024-10-23 21:56:38 -04:00
Kwindla Hultman Kramer
e032b0b70a gemini context aggregators 2024-10-23 18:44:09 -07:00
Mark Backman
e39f7e965b Support function calling for Grok 2024-10-23 17:22:26 -04:00
Mattie Ruth
d26751e968 add missing PipelineParams to enable the metrics (#645) 2024-10-23 16:46:46 -04:00
Aleix Conchillo Flaqué
e0ca4a9c23 Merge pull request #643 from pipecat-ai/aleix/daily-update-subscriptions
transports(daily): add update_subscriptions()
2024-10-22 17:07:07 -07:00
Aleix Conchillo Flaqué
801e52c095 transports(daily): add update_subscriptions() 2024-10-22 15:02:55 -07:00
Aleix Conchillo Flaqué
a46eaa838b Merge pull request #641 from pipecat-ai/aleix/prepare-0.0.47
prepare 0.0.47
2024-10-22 10:30:42 -07:00
Aleix Conchillo Flaqué
7c432499db update CHANGELOG for 0.0.47 2024-10-22 10:02:50 -07:00
Aleix Conchillo Flaqué
8d75fcc9f0 use warnings package to report deprecated code 2024-10-22 10:02:21 -07:00
Aleix Conchillo Flaqué
61d73f81ae Merge pull request #639 from pipecat-ai/aleix/daily-transcription-model
transport(daily): use "nova-2-general" for transcription
2024-10-22 09:40:43 -07:00
Aleix Conchillo Flaqué
951255def9 transport(daily): use "nova-2-general" for transcription 2024-10-22 09:40:03 -07:00
Moof Soup
bf5a7c3562 docs: Clarify README example and token usage
clarified readme example
2024-10-21 19:54:34 -07:00
Mark Backman
e556f34094 Merge pull request #638 from pipecat-ai/mb/fix-silero-vad-import
Fix Silero VAD import issue
2024-10-21 20:48:06 -04:00
Mark Backman
ccc3691620 Fix Silero VAD import issue 2024-10-21 20:39:20 -04:00
Vanessa Pyne
5321affda7 Merge pull request #588 from Allenmylath/patch-11
Update README.md
2024-10-21 11:20:05 -05:00
Mark Backman
e5ad8dc67b Merge pull request #627 from pipecat-ai/mb/upgrade-gladia-to-v2-api
Update GladiaSTTService to use the Gladia V2 API
2024-10-21 12:01:20 -04:00
Mark Backman
46927805bc Update GladiaSTTService to use the Gladia V2 API 2024-10-21 07:10:38 -04:00
Kwindla Hultman Kramer
07712cdb16 gemini function calling and partial implementation of standard context stuff 2024-10-18 17:14:57 -07:00
allenmylath
b999b76f70 Update README.md
readme description still shows simple-chatbot definition hence made more accurate description
2024-10-15 08:14:43 +05:30
allenmylath
0e69625a01 Rename frames.md to frame.md
edited again to frame.md
2024-10-14 10:07:47 +05:30
allenmylath
4e0823fced Rename Frames.md to frames.md
file name changed as requested
2024-10-14 10:05:26 +05:30
Allenmylath
40af3571f0 Create Frames.md
Made asmall explanation for diffrent types of frames in pipcat
2024-10-05 22:04:03 +05:30
124 changed files with 3583 additions and 723 deletions

View File

@@ -38,4 +38,4 @@ jobs:
id: ruff
run: |
source .venv/bin/activate
ruff format --config line-length=100 --diff --exclude "*_pb2.py"
ruff format --diff

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@ __pycache__/
*~
venv
.venv
/.idea
#*#
# Distribution / packaging

View File

@@ -5,7 +5,97 @@ All notable changes to **Pipecat** will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
## [0.0.48] - 2024-11-10 "Antonio release"
### Added
- There's now an input queue in each frame processor. When you call
`FrameProcessor.push_frame()` this will internally call
`FrameProcessor.queue_frame()` on the next processor (upstream or downstream)
and the frame will be internally queued (except system frames). Then, the
queued frames will get processed. With this input queue it is also possible
for FrameProcessors to block processing more frames by calling
`FrameProcessor.pause_processing_frames()`. The way to resume processing
frames is by calling `FrameProcessor.resume_processing_frames()`.
- Added audio filter `NoisereduceFilter`.
- Introduce input transport audio filters (`BaseAudioFilter`). Audio filters can
be used to remove background noises before audio is sent to VAD.
- Introduce output transport audio mixers (`BaseAudioMixer`). Output transport
audio mixers can be used, for example, to add background sounds or any other
audio mixing functionality before the output audio is actually written to the
transport.
- Added `GatedOpenAILLMContextAggregator`. This aggregator keeps the last
received OpenAI LLM context frame and it doesn't let it through until the
notifier is notified.
- Added `WakeNotifierFilter`. This processor expects a list of frame types and
will execute a given callback predicate when a frame of any of those type is
being processed. If the callback returns true the notifier will be notified.
- Added `NullFilter`. A null filter doesn't push any frames upstream or
downstream. This is usually used to disable one of the pipelines in
`ParallelPipeline`.
- Added `EventNotifier`. This can be used as a very simple synchronization
feature between processors.
- Added `TavusVideoService`. This is an integration for Tavus digital twins.
(see https://www.tavus.io/)
- Added `DailyTransport.update_subscriptions()`. This allows you to have fine
grained control of what media subscriptions you want for each participant in a
room.
- Added audio filter `KrispFilter`.
### Changed
- The following `DailyTransport` functions are now `async` which means they need
to be awaited: `start_dialout`, `stop_dialout`, `start_recording`,
`stop_recording`, `capture_participant_transcription` and
`capture_participant_video`.
- Changed default output sample rate to 24000. This changes all TTS service to
output to 24000 and also the default output transport sample rate. This
improves audio quality at the cost of some extra bandwidth.
- `AzureTTSService` now uses Azure websockets instead of HTTP requests.
- The previous `AzureTTSService` HTTP implementation is now
`AzureHttpTTSService`.
### Fixed
- Websocket transports (FastAPI and Websocket) now synchronize with time before
sending data. This allows for interruptions to just work out of the box.
- Improved bot speaking detection for all TTS services by using actual bot
audio.
- Fixed an issue that was generating constant bot started/stopped speaking
frames for HTTP TTS services.
- Fixed an issue that was causing stuttering with AWS TTS service.
- Fixed an issue with PlayHTTTSService, where the TTFB metrics were reporting
very small time values.
- Fixed an issue where AzureTTSService wasn't initializing the specified
language.
### Other
- Add `23-bot-background-sound.py` foundational example.
- Added a new foundational example `22-natural-conversation.py`. This example
shows how to achieve a more natural conversation detecting when the user ends
statement.
## [0.0.47] - 2024-10-22
### Added
@@ -15,8 +105,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added a foundational example for Gladia transcription:
`13c-gladia-transcription.py`
### Changed
- Updated `GladiaSTTService` to use the V2 API.
- Changed `DailyTransport` transcription model to `nova-2-general`.
### Fixed
- Fixed an issue that would cause an import error when importing
`SileroVADAnalyzer` from the old package `pipecat.vad.silero`.
- Fixed `enable_usage_metrics` to control LLM/TTS usage metrics separately
from `enable_metrics`.
@@ -32,6 +131,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed
- Changed `DeepgramSTTService` model to `nova-2-general`.
- Moved `SileroVAD` audio processor to `processors.audio.vad`.
- Module `utils.audio` is now `audio.utils`. A new `resample_audio` function has

View File

@@ -64,7 +64,7 @@ async def main():
# Use Daily as a real-time media transport (WebRTC)
transport = DailyTransport(
room_url=...,
token=...,
token="", # leave empty. Note: token is _not_ your api key
bot_name="Bot Name",
params=DailyParams(audio_out_enabled=True))
@@ -129,6 +129,24 @@ Pipecat makes use of WebRTC VAD by default when using a WebRTC transport layer.
pip install pipecat-ai[silero]
```
## Running the Krisp Audio Filter
To use the Krisp Filter in this project, youll need access to the **Krisp C++ SDK**.
### Step 1: Obtain Access to the Krisp SDK
1. **Create a Krisp Account**: If you dont already have an account, [sign up at Krisp](https://krisp.ai/) to access the SDK.
2. **Download the SDK**: Once you have an account, follow the instructions on the Krisp platform to download the [Krisp's desktop SDKs](https://sdk.krisp.ai/sdk/desktop).
3. **Export the path to you krisp SDK**:
`export KRISP_SDK_PATH=/PATH/TO/KRISP/SDK`
### Step 2: Install the `pipecat-krisp` Module
Once the environment variable `KRISP_SDK_PATH` is exported, activate your Python virtual environment and install it with `pip`:
```shell
source venv/bin/activate
pip install pipecat-ai[krisp]
```
## Hacking on the framework itself
_Note that you may need to set up a virtual environment before following the instructions below. For instance, you might need to run the following from the root of the repo:_
@@ -178,7 +196,7 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
:ensure t
:hook ((python-mode . lazy-ruff-mode))
:config
(setq lazy-ruff-format-command "ruff format --config line-length=100")
(setq lazy-ruff-format-command "ruff format")
(setq lazy-ruff-only-format-block t)
(setq lazy-ruff-only-format-region t)
(setq lazy-ruff-only-format-buffer t))
@@ -197,14 +215,13 @@ You can use [use-package](https://github.com/jwiegley/use-package) to install [e
### Visual Studio Code
Install the
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, enable formatting on save and configure `ruff` arguments:
[Ruff](https://marketplace.visualstudio.com/items?itemName=charliermarsh.ruff) extension. Then edit the user settings (_Ctrl-Shift-P_ `Open User Settings (JSON)`) and set it as the default Python formatter, and enable formatting on save:
```json
"[python]": {
"editor.defaultFormatter": "charliermarsh.ruff",
"editor.formatOnSave": true
},
"ruff.format.args": ["--config", "line-length=100"]
}
```
## Getting help

165
docs/CONTRIBUTING.md Normal file
View File

@@ -0,0 +1,165 @@
## Contributing to Pipecat
We welcome contributions of all kinds! Your help is appreciated. Follow these steps to get involved:
1. **Fork this repository**: Start by forking the Pipecat Documentation repository to your GitHub account.
2. **Clone the repository**: Clone your forked repository to your local machine.
```bash
git clone https://github.com/your-username/pipecat
```
3. **Create a branch**: For your contribution, create a new branch.
```bash
git checkout -b your-branch-name
```
4. **Make your changes**: Edit or add files as necessary.
5. **Test your changes**: Ensure that your changes look correct and follow the style set in the codebase.
6. **Commit your changes**: Once you're satisfied with your changes, commit them with a meaningful message.
```bash
git commit -m "Description of your changes"
```
7. **Push your changes**: Push your branch to your forked repository.
```bash
git push origin your-branch-name
```
9. **Submit a Pull Request (PR)**: Open a PR from your forked repository to the main branch of this repo.
> Important: Describe the changes you've made clearly!
Our maintainers will review your PR, and once everything is good, your contributions will be merged!
# Contributor Covenant Code of Conduct
## Our Pledge
We as members, contributors, and leaders pledge to make participation in our
community a harassment-free experience for everyone, regardless of age, body
size, visible or invisible disability, ethnicity, sex characteristics, gender
identity and expression, level of experience, education, socio-economic status,
nationality, personal appearance, race, caste, color, religion, or sexual
identity and orientation.
We pledge to act and interact in ways that contribute to an open, welcoming,
diverse, inclusive, and healthy community.
## Our Standards
Examples of behavior that contributes to a positive environment for our
community include:
* Demonstrating empathy and kindness toward other people
* Being respectful of differing opinions, viewpoints, and experiences
* Giving and gracefully accepting constructive feedback
* Accepting responsibility and apologizing to those affected by our mistakes,
and learning from the experience
* Focusing on what is best not just for us as individuals, but for the overall
community
Examples of unacceptable behavior include:
* The use of sexualized language or imagery, and sexual attention or advances of
any kind
* Trolling, insulting or derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or email address,
without their explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Enforcement Responsibilities
Community leaders are responsible for clarifying and enforcing our standards of
acceptable behavior and will take appropriate and fair corrective action in
response to any behavior that they deem inappropriate, threatening, offensive,
or harmful.
Community leaders have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are
not aligned to this Code of Conduct, and will communicate reasons for moderation
decisions when appropriate.
## Scope
This Code of Conduct applies within all community spaces, and also applies when
an individual is officially representing the community in public spaces.
Examples of representing our community include using an official email address,
posting via an official social media account, or acting as an appointed
representative at an online or offline event.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported to the community leaders responsible for enforcement at pipecat-ai@daily.co.
All complaints will be reviewed and investigated promptly and fairly.
All community leaders are obligated to respect the privacy and security of the
reporter of any incident.
## Enforcement Guidelines
Community leaders will follow these Community Impact Guidelines in determining
the consequences for any action they deem in violation of this Code of Conduct:
### 1. Correction
**Community Impact**: Use of inappropriate language or other behavior deemed
unprofessional or unwelcome in the community.
**Consequence**: A private, written warning from community leaders, providing
clarity around the nature of the violation and an explanation of why the
behavior was inappropriate. A public apology may be requested.
### 2. Warning
**Community Impact**: A violation through a single incident or series of
actions.
**Consequence**: A warning with consequences for continued behavior. No
interaction with the people involved, including unsolicited interaction with
those enforcing the Code of Conduct, for a specified period of time. This
includes avoiding interactions in community spaces as well as external channels
like social media. Violating these terms may lead to a temporary or permanent
ban.
### 3. Temporary Ban
**Community Impact**: A serious violation of community standards, including
sustained inappropriate behavior.
**Consequence**: A temporary ban from any sort of interaction or public
communication with the community for a specified period of time. No public or
private interaction with the people involved, including unsolicited interaction
with those enforcing the Code of Conduct, is allowed during this period.
Violating these terms may lead to a permanent ban.
### 4. Permanent Ban
**Community Impact**: Demonstrating a pattern of violation of community
standards, including sustained inappropriate behavior, harassment of an
individual, or aggression toward or disparagement of classes of individuals.
**Consequence**: A permanent ban from any sort of public interaction within the
community.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage],
version 2.1, available at
[https://www.contributor-covenant.org/version/2/1/code_of_conduct.html][v2.1].
Community Impact Guidelines were inspired by
[Mozilla's code of conduct enforcement ladder][Mozilla CoC].
For answers to common questions about this code of conduct, see the FAQ at
[https://www.contributor-covenant.org/faq][FAQ]. Translations are available at
[https://www.contributor-covenant.org/translations][translations].
[homepage]: https://www.contributor-covenant.org
[v2.1]: https://www.contributor-covenant.org/version/2/1/code_of_conduct.html
[Mozilla CoC]: https://github.com/mozilla/diversity
[FAQ]: https://www.contributor-covenant.org/faq
[translations]: https://www.contributor-covenant.org/translations

22
docs/ISSUE_TEMPLATE.md Normal file
View File

@@ -0,0 +1,22 @@
# Description
Is this reporting a bug or feature request?
If reporting a bug, please fill out the following:
### Environment
- pipecat-ai version:
- python version:
- OS:
### Issue description
Provide a clear description of the issue.
### Repro steps
List the steps to reproduce the issue.
### Expected behavior
### Actual behavior
### Logs

View File

@@ -0,0 +1 @@
#### Please describe the changes in your PR. If it is addressing an issue, please reference that as well.

113
docs/frame.md Normal file
View File

@@ -0,0 +1,113 @@
# Understanding Different Frame Types in the Pipecat System
In the Pipecat system, frames are used to represent different types of data and control signals that flow through the pipeline. Understanding these frame types is crucial for working with the system effectively. This tutorial will cover the main categories of frames and their specific uses.
## 1. Base Frame Classes
### Frame
The `Frame` class is the base class for all frames. It includes:
- `id`: A unique identifier
- `name`: A descriptive name
- `pts`: Presentation timestamp (optional)
### DataFrame
`DataFrame` is a subclass of `Frame` and serves as a base for most data-carrying frames.
## 2. Audio Frames
### AudioRawFrame
Represents a chunk of audio with properties:
- `audio`: Raw audio data
- `sample_rate`: Audio sample rate
- `num_channels`: Number of audio channels
Subclasses include:
- `InputAudioRawFrame`: For audio from input sources
- `OutputAudioRawFrame`: For audio to be played by output devices
- `TTSAudioRawFrame`: For audio generated by Text-to-Speech services
## 3. Image Frames
### ImageRawFrame
Represents an image with properties:
- `image`: Raw image data
- `size`: Image dimensions
- `format`: Image format (e.g., JPEG, PNG)
Subclasses include:
- `InputImageRawFrame`: For images from input sources
- `OutputImageRawFrame`: For images to be displayed
- `UserImageRawFrame`: For images associated with a specific user
- `VisionImageRawFrame`: For images with associated text for description
- `URLImageRawFrame`: For images with an associated URL
### SpriteFrame
Represents an animated sprite, containing a list of `ImageRawFrame` objects.
## 4. Text and Transcription Frames
### TextFrame
Represents a chunk of text, used for various purposes in the pipeline.
### TranscriptionFrame
A specialized `TextFrame` for speech transcriptions, including:
- `user_id`: ID of the speaking user
- `timestamp`: When the transcription was generated
- `language`: Detected language of the speech
### InterimTranscriptionFrame
Similar to `TranscriptionFrame`, but for interim (not final) transcriptions.
## 5. LLM (Language Model) Frames
### LLMMessagesFrame
Contains a list of messages for an LLM service to process.
### LLMMessagesAppendFrame and LLMMessagesUpdateFrame
Used to modify the current context of LLM messages.
### LLMSetToolsFrame
Specifies tools (functions) available for the LLM to use.
### LLMEnablePromptCachingFrame
Controls prompt caching in certain LLMs.
## 6. System and Control Frames
### SystemFrame
Base class for system-level frames.
Important system frames include:
- `StartFrame`: Initiates a pipeline
- `CancelFrame`: Stops a pipeline immediately
- `ErrorFrame`: Notifies of errors (with `FatalErrorFrame` for unrecoverable errors)
- `EndTaskFrame` and `CancelTaskFrame`: Control pipeline tasks
- `StartInterruptionFrame` and `StopInterruptionFrame`: Indicate user speech for interruptions
### ControlFrame
Base class for control-flow frames.
Notable control frames:
- `EndFrame`: Signals the end of a pipeline
- `LLMFullResponseStartFrame` and `LLMFullResponseEndFrame`: Bracket LLM responses
- `UserStartedSpeakingFrame` and `UserStoppedSpeakingFrame`: Indicate user speech activity
- `BotStartedSpeakingFrame` and `BotStoppedSpeakingFrame`: Indicate bot speech activity
- `TTSStartedFrame` and `TTSStoppedFrame`: Bracket Text-to-Speech responses
## 7. Special Purpose Frames
### AppFrame
Base class for application-specific custom frames.
### MetricsFrame
Contains performance metrics data.
### FunctionCallInProgressFrame and FunctionCallResultFrame
Used for handling LLM function (tool) calls.
### ServiceUpdateSettingsFrame
Base class for updating service settings, with specific subclasses for LLM, TTS, and STT services.
## Conclusion
Understanding these frame types is essential for working with the Pipecat system. Each frame type serves a specific purpose in the pipeline, whether it's carrying data (like audio or images), controlling the flow of the pipeline, or managing system-level operations. By using the appropriate frame types, you can effectively process and transmit various kinds of information through your pipeline.

View File

@@ -46,5 +46,13 @@ PLAY_HT_API_KEY=...
# OpenAI
OPENAI_API_KEY=...
#OpenPipe
# OpenPipe
OPENPIPE_API_KEY=...
# Tavus
TAVUS_API_KEY=...
TAVUS_REPLICA_ID=...
TAVUS_PERSONA_ID=...
#Krisp
KRISP_MODEL_PATH=...

View File

@@ -1,12 +1,41 @@
# Simple Chatbot
# Chatbot with canonical-metrics
<img src="image.png" width="420px">
This project implements a chatbot using a pipeline architecture that integrates audio processing, transcription, and a language model for conversational interactions. The chatbot operates within a daily communication environment, utilizing various services for text-to-speech and language model responses.
This app connects you to a chatbot powered by GPT-4, complete with animations generated by Stable Video Diffusion.
## Features
See a video of it in action: https://x.com/kwindla/status/1778628911817183509
- **Audio Input and Output**: Captures microphone input and plays back audio responses.
- **Voice Activity Detection**: Utilizes Silero VAD to manage audio input intelligently.
- **Text-to-Speech**: Integrates ElevenLabs TTS service to convert text responses into audio.
- **Language Model Interaction**: Uses OpenAI's GPT-4 model to generate responses based on user input.
- **Transcription Services**: Captures and transcribes participant speech for analytics.
- **Metrics Collection**: Sends audio data for analysis via Canonical Metrics Service.
## Requirements
- Python 3.10+
- `python-dotenv`
- Additional libraries from the `pipecat` package.
## Setup
1. Clone the repository.
2. Install the required packages.
3. Set up environment variables for API keys:
- `OPENAI_API_KEY`
- `ELEVENLABS_API_KEY`
- `CANONICAL_API_KEY`
- `CANONICAL_API_URL`
4. Run the script.
## Usage
The chatbot introduces itself and engages in conversations, providing brief and creative responses. Designed for flexibility, it can support multiple languages with appropriate configuration.
## Events
- Participants joining or leaving the call are handled dynamically, adjusting the chatbot's behavior accordingly.
And a quick video walkthrough of the code: https://www.loom.com/share/13df1967161f4d24ade054e7f8753416
The first time, things might take extra time to get started since VAD (Voice Activity Detection) model needs to be downloaded.

View File

@@ -124,7 +124,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -123,7 +123,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -75,7 +75,7 @@ async def main(room_url: str, token: str):
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -81,7 +81,7 @@ async def main(room_url: str, token: str, callId: str, callDomain: str):
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -84,7 +84,7 @@ async def main(room_url: str, token: str, callId: str, sipUri: str):
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])
@transport.event_handler("on_participant_left")

View File

@@ -9,11 +9,11 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
@@ -36,7 +36,7 @@ async def main():
room_url, None, "Say One Thing", DailyParams(audio_out_enabled=True)
)
tts = CartesiaHttpTTSService(
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -50,12 +50,9 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
await task.queue_frame(TextFrame(f"Hello there, {participant_name}!"))
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.queue_frames(
[TTSSpeakFrame(f"Hello there, {participant_name}!"), EndFrame()]
)
await runner.run(task)

View File

@@ -9,7 +9,7 @@ import aiohttp
import os
import sys
from pipecat.frames.frames import TextFrame
from pipecat.frames.frames import EndFrame, TTSSpeakFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
@@ -28,25 +28,24 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
transport = LocalAudioTransport(TransportParams(audio_out_enabled=True))
transport = LocalAudioTransport(TransportParams(audio_out_enabled=True))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
pipeline = Pipeline([tts, transport.output()])
pipeline = Pipeline([tts, transport.output()])
task = PipelineTask(pipeline)
task = PipelineTask(pipeline)
async def say_something():
await asyncio.sleep(1)
await task.queue_frame(TextFrame("Hello there!"))
async def say_something():
await asyncio.sleep(1)
await task.queue_frames([TTSSpeakFrame("Hello there, how is it going!"), EndFrame()])
runner = PipelineRunner()
runner = PipelineRunner()
await asyncio.gather(runner.run(task), say_something())
await asyncio.gather(runner.run(task), say_something())
if __name__ == "__main__":

View File

@@ -81,7 +81,7 @@ async def main():
url=url,
token=token,
room_name=room_name,
params=LiveKitParams(audio_out_enabled=True, audio_out_sample_rate=16000),
params=LiveKitParams(audio_out_enabled=True),
)
tts = CartesiaTTSService(

View File

@@ -13,7 +13,7 @@ from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.services.cartesia import CartesiaHttpTTSService
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
@@ -37,7 +37,7 @@ async def main():
room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
)
tts = CartesiaHttpTTSService(
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
@@ -57,11 +57,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await task.queue_frame(LLMMessagesFrame(messages))
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
await runner.run(task)

View File

@@ -5,33 +5,31 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import Frame, LLMMessagesFrame, MetricsFrame
from pipecat.metrics.metrics import (
TTFBMetricsData,
ProcessingMetricsData,
LLMUsageMetricsData,
ProcessingMetricsData,
TTFBMetricsData,
TTSUsageMetricsData,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -105,11 +103,14 @@ async def main():
]
)
task = PipelineTask(pipeline)
task = PipelineTask(
pipeline,
PipelineParams(enable_metrics=True, enable_usage_metrics=True),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -127,7 +127,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
participant_name = participant.get("info", {}).get("userName", "")
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await task.queue_frames([TextFrame(f"Hi there {participant_name}!")])
runner = PipelineRunner()

View File

@@ -89,7 +89,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -87,7 +87,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -82,7 +82,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -109,7 +109,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
lc.set_participant_id(participant["id"])
# Kick off the conversation.
# the `LLMMessagesFrame` will be picked up by the LangchainProcessor using

View File

@@ -31,11 +31,11 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,

View File

@@ -85,7 +85,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -40,7 +40,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
@@ -89,7 +88,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -41,7 +41,6 @@ async def main():
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
@@ -90,7 +89,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -74,7 +74,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -86,7 +86,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -81,7 +81,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -5,12 +5,16 @@
#
import asyncio
import aiohttp
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
@@ -20,12 +24,6 @@ from pipecat.services.gladia import GladiaSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
@@ -85,11 +83,16 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
runner = PipelineRunner()
await runner.run(task)

View File

@@ -77,7 +77,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -96,7 +96,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -32,15 +32,14 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_sample_rate=16000,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
@@ -85,7 +84,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -32,11 +32,11 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
token,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
@@ -82,7 +82,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -83,7 +83,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -0,0 +1,95 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import os
import sys
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.vad.silero import SileroVADAnalyzer
from pipecat.audio.filters.krisp_filter import KrispFilter
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
audio_in_filter=KrispFilter(),
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = DeepgramTTSService(api_key=os.getenv("DEEPGRAM_API_KEY"), voice="aura-helios-en")
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -63,6 +63,7 @@ async def main():
"Test",
DailyParams(
audio_in_enabled=True,
audio_in_sample_rate=24000,
audio_out_enabled=True,
camera_out_enabled=True,
camera_out_is_live=True,
@@ -73,7 +74,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
await transport.capture_participant_video(participant["id"])
pipeline = Pipeline([transport.input(), MirrorProcessor(), transport.output()])

View File

@@ -65,7 +65,7 @@ async def main():
tk_root.title("Local Mirror")
daily_transport = DailyTransport(
room_url, token, "Test", DailyParams(audio_in_enabled=True)
room_url, token, "Test", DailyParams(audio_in_enabled=True, audio_in_sample_rate=24000)
)
tk_transport = TkLocalTransport(
@@ -81,7 +81,7 @@ async def main():
@daily_transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_video(participant["id"])
await transport.capture_participant_video(participant["id"])
pipeline = Pipeline([daily_transport.input(), MirrorProcessor(), tk_transport.output()])

View File

@@ -82,7 +82,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await tts.say("Hi! If you want to talk to me, just say 'Hey Robot'.")
runner = PipelineRunner()

View File

@@ -134,7 +134,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await tts.say("Hi, I'm listening!")
await transport.send_audio(sounds["ding1.wav"])

View File

@@ -84,8 +84,8 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(

View File

@@ -86,8 +86,8 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(

View File

@@ -83,8 +83,8 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(

View File

@@ -78,16 +78,13 @@ async def main():
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
params=CartesiaTTSService.InputParams(
sample_rate=16000,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await tts.say("Hi there! Feel free to ask me what I see.")
transport.capture_participant_video(participant["id"], framerate=0)
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
image_requester.set_participant_id(participant["id"])
pipeline = Pipeline(

View File

@@ -127,7 +127,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -105,7 +105,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -67,7 +67,8 @@ async def main():
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3-5-sonnet-20240620",
# model="claude-3-5-sonnet-20240620",
model="claude-3-5-sonnet-latest",
enable_prompt_caching_beta=True,
)
llm.register_function("get_weather", get_weather)
@@ -160,8 +161,8 @@ If you need to use a tool, simply use the tool. Do not tell the user the tool yo
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
transport.capture_participant_transcription(video_participant_id)
transport.capture_participant_video(video_participant_id, framerate=0)
await transport.capture_participant_transcription(video_participant_id)
await transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -123,7 +123,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
# await tts.say("Hi! Ask me about the weather in San Francisco.")

View File

@@ -153,8 +153,8 @@ indicate you should use the get_image tool are:
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
transport.capture_participant_transcription(participant["id"])
transport.capture_participant_video(video_participant_id, framerate=0)
await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await tts.say("Hi! Ask me about the weather in San Francisco.")

View File

@@ -0,0 +1,173 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.services.openai import OpenAILLMContext
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
async def get_weather(function_name, tool_call_id, arguments, llm, context, result_callback):
location = arguments["location"]
await result_callback(f"The weather in {location} is currently 72 degrees and sunny.")
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
logger.debug(f"!!! IN get_image {video_participant_id}, {arguments}")
question = arguments["question"]
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
async def main():
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
llm.register_function("get_weather", get_weather)
llm.register_function("get_image", get_image)
tools = [
{
"function_declarations": [
{
"name": "get_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"name": "get_image",
"description": "Get and image from the camera or video stream.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to to use when running inference on the acquired image.",
},
},
"required": ["question"],
},
},
]
}
]
system_prompt = """\
You are a helpful assistant who converses with a user and answers questions. Respond concisely to general questions.
Your response will be turned into speech so use only simple words and punctuation.
You have access to two tools: get_weather and get_image.
You can respond to questions about the weather using the get_weather tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
"""
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": "Say hello."},
]
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(),
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -141,7 +141,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{

View File

@@ -10,7 +10,7 @@ import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.frames.frames import LLMMessagesFrame, TTSUpdateSettingsFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
@@ -19,7 +19,6 @@ from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.whisper import Model, WhisperSTTService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from openai.types.chat import ChatCompletionToolParam
@@ -61,16 +60,14 @@ async def main():
token,
"Pipecat",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = WhisperSTTService(model=Model.LARGE)
english_tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
@@ -116,7 +113,6 @@ async def main():
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
context_aggregator.user(), # User responses
llm, # LLM
ParallelPipeline( # TTS (bot will speak the chosen language)
@@ -132,7 +128,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append(
{

View File

@@ -92,7 +92,7 @@ async def main():
# bot can "hear" and respond to them.
@transport.event_handler("on_participant_joined")
async def on_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# When the first participant joins, the bot should introduce itself.
@transport.event_handler("on_first_participant_joined")

View File

@@ -99,7 +99,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -166,7 +166,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -223,7 +223,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -249,7 +249,7 @@ Remember, your responses should be short. Just one or two sentences, usually."""
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -98,12 +98,13 @@ async def load_conversation(function_name, tool_call_id, args, llm, context, res
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a succinct, creative and helpful way. Prefer responses that are one sentence long unless you are asked for a longer or more detailed response.",
},
{"role": "user", "content": ""},
{"role": "assistant", "content": []},
{"role": "user", "content": "Tell me"},
{"role": "user", "content": "a joke"},
{"role": "user", "content": "Start the call by saying the word 'hello'. Say only that word."},
# {"role": "user", "content": ""},
# {"role": "assistant", "content": []},
# {"role": "user", "content": "Tell me"},
# {"role": "user", "content": "a joke"},
]
tools = [
{
@@ -183,7 +184,7 @@ async def main():
)
llm = AnthropicLLMService(
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-20240620"
api_key=os.getenv("ANTHROPIC_API_KEY"), model="claude-3-5-sonnet-latest"
)
# you can either register a single function for all function calls, or specific functions
@@ -219,7 +220,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])

View File

@@ -0,0 +1,290 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import glob
import json
import os
import sys
from datetime import datetime
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import (
OpenAILLMContext,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.google import GoogleLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
video_participant_id = None
BASE_FILENAME = "/tmp/pipecat_conversation_"
tts = None
async def fetch_weather_from_api(function_name, tool_call_id, args, llm, context, result_callback):
temperature = 75 if args["format"] == "fahrenheit" else 24
await result_callback(
{
"conditions": "nice",
"temperature": temperature,
"format": args["format"],
"timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"),
}
)
async def get_image(function_name, tool_call_id, arguments, llm, context, result_callback):
question = arguments["question"]
await llm.request_image_frame(user_id=video_participant_id, text_content=question)
async def get_saved_conversation_filenames(
function_name, tool_call_id, args, llm, context, result_callback
):
# Construct the full pattern including the BASE_FILENAME
full_pattern = f"{BASE_FILENAME}*.json"
# Use glob to find all matching files
matching_files = glob.glob(full_pattern)
logger.debug(f"matching files: {matching_files}")
await result_callback({"filenames": matching_files})
async def save_conversation(function_name, tool_call_id, args, llm, context, result_callback):
timestamp = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"{BASE_FILENAME}{timestamp}.json"
logger.debug(
f"writing conversation to {filename}\n{json.dumps(context.get_messages_for_logging(), indent=4)}"
)
try:
with open(filename, "w") as file:
# todo: extract 'system' into the first message in the list
messages = context.get_messages_for_persistent_storage()
# remove the last message (the instruction to save the context)
messages.pop()
json.dump(messages, file, indent=2)
await result_callback({"success": True})
except Exception as e:
logger.debug(f"error saving conversation: {e}")
await result_callback({"success": False, "error": str(e)})
async def load_conversation(function_name, tool_call_id, args, llm, context, result_callback):
global tts
filename = args["filename"]
logger.debug(f"loading conversation from {filename}")
try:
with open(filename, "r") as file:
context.set_messages(json.load(file))
await result_callback(
{
"success": True,
"message": "The most recent conversation has been loaded. Awaiting further instructions.",
}
)
except Exception as e:
await result_callback({"success": False, "error": str(e)})
# Test message munging ...
messages = [
{
"role": "system",
"content": """You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your
capabilities in a succinct way. Your output will be converted to audio so don't include special
characters in your answers. Respond to what the user said in a creative and helpful way.
You have several tools you can use to help you.
You can respond to questions about the weather using the get_weather tool.
You can save the current conversation using the save_conversation tool. This tool allows you to save
the current conversation to external storage. If the user asks you to save the conversation, use this
save_conversation too.
You can load a saved conversation using the load_conversation tool. This tool allows you to load a
conversation from external storage. You can get a list of conversations that have been saved using the
get_saved_conversation_filenames tool.
You can answer questions about the user's video stream using the get_image tool. Some examples of phrases that \
indicate you should use the get_image tool are:
- What do you see?
- What's in the video?
- Can you describe the video?
- Tell me about what you see.
- Tell me something interesting about what you see.
- What's happening in the video?
""",
},
# {"role": "user", "content": ""},
# {"role": "assistant", "content": []},
# {"role": "user", "content": "Tell me"},
# {"role": "user", "content": "a joke"},
]
tools = [
{
"function_declarations": [
{
"name": "get_current_weather",
"description": "Get the current weather",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA",
},
"format": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "The temperature unit to use. Infer this from the users location.",
},
},
"required": ["location", "format"],
},
},
{
"name": "save_conversation",
"description": "Save the current conversation. Use this function to persist the current conversation to external storage.",
"parameters": {
"type": "object",
"properties": {
"user_request_text": {
"type": "string",
"description": "The text of the user's request to save the conversation.",
}
},
"required": ["user_request_text"],
},
},
{
"name": "get_saved_conversation_filenames",
"description": "Get a list of saved conversation histories. Returns a list of filenames. Each filename includes a date and timestamp. Each file is conversation history that can be loaded into this session.",
"parameters": None,
},
{
"name": "load_conversation",
"description": "Load a conversation history. Use this function to load a conversation history into the current session.",
"parameters": {
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the conversation history to load.",
}
},
"required": ["filename"],
},
},
{
"name": "get_image",
"description": "Get and image from the camera or video stream.",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question to to use when running inference on the acquired image.",
},
},
"required": ["question"],
},
},
]
},
]
async def main():
global tts
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.8)),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = GoogleLLMService(model="gemini-1.5-flash-latest", api_key=os.getenv("GOOGLE_API_KEY"))
# you can either register a single function for all function calls, or specific functions
# llm.register_function(None, fetch_weather_from_api)
llm.register_function("get_current_weather", fetch_weather_from_api)
llm.register_function("save_conversation", save_conversation)
llm.register_function("get_saved_conversation_filenames", get_saved_conversation_filenames)
llm.register_function("load_conversation", load_conversation)
llm.register_function("get_image", get_image)
context = OpenAILLMContext(messages, tools)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(),
llm, # LLM
tts,
context_aggregator.assistant(),
transport.output(), # Transport bot output
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
# report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
global video_participant_id
video_participant_id = participant["id"]
await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(video_participant_id, framerate=0)
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,133 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from typing import Any, Mapping
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.llm_response import (
LLMAssistantResponseAggregator,
LLMUserResponseAggregator,
)
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.tavus import TavusVideoService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from pipecat.audio.vad.silero import SileroVADAnalyzer
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
tavus = TavusVideoService(
api_key=os.getenv("TAVUS_API_KEY"),
replica_id=os.getenv("TAVUS_REPLICA_ID"),
persona_id=os.getenv("TAVUS_PERSONA_ID", "pipecat0"),
session=session,
)
# get persona, look up persona_name, set this as the bot name to ignore
persona_name = await tavus.get_persona_name()
room_url = await tavus.initialize()
transport = DailyTransport(
room_url=room_url,
token=None,
bot_name="Pipecat bot",
params=DailyParams(
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="a167e0f3-df7e-4d52-a9c3-f949145efdab",
)
llm = OpenAILLMService(model="gpt-4o-mini")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
tma_in = LLMUserResponseAggregator(messages)
tma_out = LLMAssistantResponseAggregator(messages)
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt, # STT
tma_in, # User responses
llm, # LLM
tts, # TTS
tavus, # Tavus output layer
transport.output(), # Transport bot output
tma_out, # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_participant_joined")
async def on_participant_joined(
transport: DailyTransport, participant: Mapping[str, Any]
) -> None:
# Ignore the Tavus replica's microphone
if participant.get("info", {}).get("userName", "") == persona_name:
logger.debug(f"Ignoring {participant['id']}'s microphone")
await transport.update_subscriptions(
participant_settings={
participant["id"]: {
"media": {"microphone": "unsubscribed"},
}
}
)
if participant.get("info", {}).get("userName", "") != persona_name:
# Kick off the conversation.
messages.append(
{"role": "system", "content": "Please introduce yourself to the user."}
)
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,168 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.parallel_pipeline import ParallelPipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.gated_openai_llm_context import GatedOpenAILLMContextAggregator
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.filters.null_filter import NullFilter
from pipecat.processors.filters.wake_notifier_filter import WakeNotifierFilter
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.openai import OpenAILLMService
from pipecat.sync.event_notifier import EventNotifier
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
(room_url, _) = await configure(session)
transport = DailyTransport(
room_url,
None,
"Respond bot",
DailyParams(
audio_out_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
),
)
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
# This is the LLM that will be used to detect if the user has finished a
# statement. This doesn't really need to be an LLM, we could use NLP
# libraries for that, but it was easier as an example because we
# leverage the context aggregators.
statement_llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
statement_messages = [
{
"role": "system",
"content": "Determine if the user's statement is a complete sentence or question, ending in a natural pause or punctuation. Return 'YES' if it is complete and 'NO' if it seems to leave a thought unfinished.",
},
]
statement_context = OpenAILLMContext(statement_messages)
statement_context_aggregator = statement_llm.create_context_aggregator(statement_context)
# This is the regular LLM.
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# We have instructed the LLM to return 'YES' if it thinks the user
# completed a sentence. So, if it's 'YES' we will return true in this
# predicate which will wake up the notifier.
async def wake_check_filter(frame):
return frame.text == "YES"
# This is a notifier that we use to synchronize the two LLMs.
notifier = EventNotifier()
# This a filter that will wake up the notifier if the given predicate
# (wake_check_filter) returns true.
completness_check = WakeNotifierFilter(
notifier, types=(TextFrame,), filter=wake_check_filter
)
# This processor keeps the last context and will let it through once the
# notifier is woken up.
gated_context_aggregator = GatedOpenAILLMContextAggregator(notifier)
# Notify if the user hasn't said anything.
async def user_idle_notifier(frame):
await notifier.notify()
# Sometimes the LLM will fail detecting if a user has completed a
# sentence, this will wake up the notifier if that happens.
user_idle = UserIdleProcessor(callback=user_idle_notifier, timeout=3.0)
# The ParallePipeline input are the user transcripts. We have two
# contexts. The first one will be used to determine if the user finished
# a statement and if so the notifier will be woken up. The second
# context is simply the regular context but it's gated waiting for the
# notifier to be woken up.
pipeline = Pipeline(
[
transport.input(), # Transport user input
stt,
ParallelPipeline(
[
statement_context_aggregator.user(),
statement_llm,
completness_check,
NullFilter(),
],
[context_aggregator.user(), gated_context_aggregator, llm],
),
user_idle,
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,121 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import argparse
import asyncio
import aiohttp
import os
import sys
from pipecat.audio.mixers.soundfile_mixer import SoundfileMixer
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame, MixerUpdateSettingsFrame, MixerEnableFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from runner import configure_with_args
from loguru import logger
from dotenv import load_dotenv
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
async def main():
async with aiohttp.ClientSession() as session:
parser = argparse.ArgumentParser(description="Bot Background Sound")
parser.add_argument("-i", "--input", type=str, required=True, help="Input audio file")
(room_url, token, args) = await configure_with_args(session, parser)
soundfile_mixer = SoundfileMixer(
sound_files={"office": args.input},
default_sound="office",
volume=2.0,
)
transport = DailyTransport(
room_url,
token,
"Respond bot",
DailyParams(
audio_out_enabled=True,
audio_out_mixer=soundfile_mixer,
transcription_enabled=True,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
),
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
)
llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
messages = [
{
"role": "system",
"content": "You are a helpful LLM in a WebRTC call. Your goal is to demonstrate your capabilities in a succinct way. Your output will be converted to audio so don't include special characters in your answers. Respond to what the user said in a creative and helpful way.",
},
]
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
pipeline = Pipeline(
[
transport.input(), # Transport user input
context_aggregator.user(), # User responses
llm, # LLM
tts, # TTS
transport.output(), # Transport bot output
context_aggregator.assistant(), # Assistant spoken responses
]
)
task = PipelineTask(
pipeline,
PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
report_only_initial_ttfb=True,
),
)
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await transport.capture_participant_transcription(participant["id"])
# Show how to use mixer control frames.
await asyncio.sleep(10.0)
await task.queue_frame(MixerUpdateSettingsFrame({"volume": 0.5}))
await asyncio.sleep(5.0)
await task.queue_frame(MixerEnableFrame(False))
await asyncio.sleep(5.0)
await task.queue_frame(MixerEnableFrame(True))
await asyncio.sleep(5.0)
# Kick off the conversation.
messages.append({"role": "system", "content": "Please introduce yourself to the user."})
await task.queue_frames([LLMMessagesFrame(messages)])
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -203,8 +203,8 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
transport.capture_participant_video(participant["id"], framerate=0)
await transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_video(participant["id"], framerate=0)
ir.set_participant_id(participant["id"])
await task.queue_frames([LLMMessagesFrame(messages)])

View File

@@ -182,7 +182,7 @@ class IntakeProcessor:
}
)
print(f"!!! about to await llm process frame in start prescrpitions")
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
print(f"!!! past await process frame in start prescriptions")
async def start_allergies(self, function_name, llm, context):
@@ -222,7 +222,7 @@ class IntakeProcessor:
"content": "Now ask the user if they have any medical conditions the doctor should know about. Once they've answered the question, call the list_conditions function.",
}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_conditions(self, function_name, llm, context):
print("!!! doing start conditions")
@@ -261,7 +261,7 @@ class IntakeProcessor:
"content": "Finally, ask the user the reason for their doctor visit today. Once they answer, call the list_visit_reasons function.",
}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def start_visit_reasons(self, function_name, llm, context):
print("!!! doing start visit reasons")
@@ -270,7 +270,7 @@ class IntakeProcessor:
context.add_message(
{"role": "system", "content": "Now, thank the user and end the conversation."}
)
await llm.process_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
await llm.queue_frame(OpenAILLMContextFrame(context), FrameDirection.DOWNSTREAM)
async def save_data(self, function_name, tool_call_id, args, llm, context, result_callback):
logger.info(f"!!! Saving data: {args}")
@@ -352,7 +352,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
print(f"Context is: {context}")
await task.queue_frames([OpenAILLMContextFrame(context)])

View File

@@ -17,6 +17,10 @@ from fastapi.responses import JSONResponse, RedirectResponse
from pipecat.transports.services.helpers.daily_rest import DailyRESTHelper, DailyRoomParams
from dotenv import load_dotenv
load_dotenv(override=True)
MAX_BOTS_PER_ROOM = 1
# Bot sub-process dict for status reporting and concurrency control

View File

@@ -102,7 +102,7 @@ async def main(room_url, token=None):
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
logger.debug("Participant joined, storytime commence!")
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
await intro_task.queue_frames(
[
images["book1"],

View File

@@ -165,7 +165,7 @@ Your task is to help the user understand and learn from this article in 2 senten
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
messages.append(
{
"role": "system",

View File

@@ -121,7 +121,7 @@ async def main():
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
transport.capture_participant_transcription(participant["id"])
await transport.capture_participant_transcription(participant["id"])
runner = PipelineRunner()

View File

@@ -12,7 +12,7 @@ from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
@@ -35,6 +35,7 @@ logger.add(sys.stderr, level="DEBUG")
async def main():
transport = WebsocketServerTransport(
params=WebsocketServerParams(
audio_out_sample_rate=16000,
audio_out_enabled=True,
add_wav_header=True,
vad_enabled=True,
@@ -50,6 +51,7 @@ async def main():
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22", # British Lady
sample_rate=16000,
)
messages = [
@@ -74,7 +76,7 @@ async def main():
]
)
task = PipelineTask(pipeline)
task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
@transport.event_handler("on_client_connected")
async def on_client_connected(transport, client):

View File

@@ -21,14 +21,14 @@ classifiers = [
]
dependencies = [
"aiohttp~=3.10.3",
"loguru~=0.7.2",
"Markdown~=3.7",
"numpy~=1.26.4",
"loguru~=0.7.2",
"Pillow~=10.4.0",
"protobuf~=4.25.4",
"pydantic~=2.8.2",
"pyloudnorm~=0.1.1",
"scipy~=1.14.1",
"resampy~=0.4.3",
]
[project.urls]
@@ -42,24 +42,27 @@ aws = [ "boto3~=1.35.27" ]
azure = [ "azure-cognitiveservices-speech~=1.40.0" ]
canonical = [ "aiofiles~=24.1.0" ]
cartesia = [ "cartesia~=1.0.13", "websockets~=13.1" ]
daily = [ "daily-python~=0.11.0" ]
daily = [ "daily-python~=0.12.0" ]
deepgram = [ "deepgram-sdk~=3.7.3" ]
elevenlabs = [ "websockets~=13.1" ]
examples = [ "python-dotenv~=1.0.1", "flask~=3.0.3", "flask_cors~=4.0.1" ]
fal = [ "fal-client~=0.4.1" ]
gladia = [ "websockets~=13.1" ]
google = [ "google-generativeai~=0.7.2", "google-cloud-texttospeech~=2.17.2" ]
google = [ "google-generativeai~=0.8.3", "google-cloud-texttospeech~=2.17.2" ]
gstreamer = [ "pygobject~=3.48.2" ]
fireworks = [ "openai~=1.37.2" ]
krisp = [ "pipecat-ai-krisp~=0.2.0" ]
langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
livekit = [ "livekit~=0.17.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ]
lmnt = [ "lmnt~=1.1.4" ]
local = [ "pyaudio~=0.2.14" ]
moondream = [ "einops~=0.8.0", "timm~=1.0.8", "transformers~=4.44.0" ]
noisereduce = [ "noisereduce~=3.0.3" ]
openai = [ "openai~=1.50.2", "websockets~=13.1", "python-deepcompare~=1.0.1" ]
openpipe = [ "openpipe~=4.24.0" ]
playht = [ "pyht~=0.1.4", "websockets~=13.1" ]
silero = [ "onnxruntime~=1.19.2" ]
soundfile = [ "soundfile~=0.12.1" ]
together = [ "openai~=1.50.2" ]
websocket = [ "websockets~=13.1", "fastapi~=0.115.0" ]
whisper = [ "faster-whisper~=1.0.3" ]
@@ -74,3 +77,7 @@ pythonpath = ["src"]
[tool.setuptools_scm]
local_scheme = "no-local-version"
fallback_version = "0.0.0-dev"
[tool.ruff]
exclude = ["*_pb2.py"]
line-length = 100

View File

View File

@@ -0,0 +1,47 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from pipecat.frames.frames import FilterControlFrame
class BaseAudioFilter(ABC):
"""This is a base class for input transport audio filters. If an audio
filter is provided to the input transport it will be used to process audio
before VAD and before pushing it downstream. There are control frames to
update filter settings or to enable or disable the filter at runtime.
"""
@abstractmethod
async def start(self, sample_rate: int):
"""This will be called from the input transport when the transport is
started. It can be used to initialize the filter. The input transport
sample rate is provided so the filter can adjust to that sample rate.
"""
pass
@abstractmethod
async def stop(self):
"""This will be called from the input transport when the transport is
stopping.
"""
pass
@abstractmethod
async def process_frame(self, frame: FilterControlFrame):
"""This will be called when the input transport receives a
FilterControlFrame.
"""
pass
@abstractmethod
async def filter(self, audio: bytes) -> bytes:
pass

View File

@@ -0,0 +1,78 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
import os
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from loguru import logger
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
try:
from pipecat_ai_krisp.audio.krisp_processor import KrispAudioProcessor
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error("In order to use the Krisp filter, you need to `pip install pipecat-ai[krisp]`.")
raise Exception(f"Missing module: {e}")
class KrispFilter(BaseAudioFilter):
def __init__(
self, sample_type: str = "PCM_16", channels: int = 1, model_path: str = None
) -> None:
"""
Initializes the KrispAudioProcessor with customizable audio processing settings.
:param sample_type: The type of audio sample, default is 'PCM_16'.
:param channels: Number of audio channels, default is 1.
:param model_path: Path to the Krisp model; defaults to environment variable KRISP_MODEL_PATH if not provided.
"""
super().__init__()
# Set model path, checking environment if not specified
self._model_path = model_path or os.getenv("KRISP_MODEL_PATH")
if not self._model_path:
logger.error(
"Model path for KrispAudioProcessor is not provided and KRISP_MODEL_PATH is not set."
)
raise ValueError("Model path for KrispAudioProcessor must be provided.")
self._sample_type = sample_type
self._channels = channels
self._sample_rate = 0
self._filtering = True
self._krisp_processor = None
async def start(self, sample_rate: int):
self._sample_rate = sample_rate
self._krisp_processor = KrispAudioProcessor(
self._sample_rate, self._sample_type, self._channels, self._model_path
)
async def stop(self):
self._krisp_processor = None
async def process_frame(self, frame: FilterControlFrame):
if isinstance(frame, FilterEnableFrame):
self._filtering = frame.enable
async def filter(self, audio: bytes) -> bytes:
if not self._filtering:
return audio
data = np.frombuffer(audio, dtype=np.int16)
# Add a small epsilon to avoid division by zero.
epsilon = 1e-10
data = data.astype(np.float32) + epsilon
# Process the audio chunk to reduce noise
reduced_noise = self._krisp_processor.process(data)
# Clip and set processed audio back to frame
audio = np.clip(reduced_noise, -32768, 32767).astype(np.int16).tobytes()
return audio

View File

@@ -0,0 +1,54 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import numpy as np
from pipecat.audio.filters.base_audio_filter import BaseAudioFilter
from loguru import logger
from pipecat.frames.frames import FilterControlFrame, FilterEnableFrame
try:
import noisereduce as nr
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use the noisereduce filter, you need to `pip install pipecat-ai[noisereduce]`."
)
raise Exception(f"Missing module: {e}")
class NoisereduceFilter(BaseAudioFilter):
def __init__(self) -> None:
self._filtering = True
self._sample_rate = 0
async def start(self, sample_rate: int):
self._sample_rate = sample_rate
async def stop(self):
pass
async def process_frame(self, frame: FilterControlFrame):
if isinstance(frame, FilterEnableFrame):
self._filtering = frame.enable
async def filter(self, audio: bytes) -> bytes:
if not self._filtering:
return audio
data = np.frombuffer(audio, dtype=np.int16)
# Add a small epsilon to avoid division by zero.
epsilon = 1e-10
data = data.astype(np.float32) + epsilon
# Noise reduction
reduced_noise = nr.reduce_noise(y=data, sr=self._sample_rate)
audio = np.clip(reduced_noise, -32768, 32767).astype(np.int16).tobytes()
return audio

View File

View File

@@ -0,0 +1,53 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from abc import ABC, abstractmethod
from pipecat.frames.frames import MixerControlFrame
class BaseAudioMixer(ABC):
"""This is a base class for output transport audio mixers. If an audio mixer
is provided to the output transport it will be used to mix the audio frames
coming into to the transport with the audio generated from the mixer. There
are control frames to update mixer settings or to enable or disable the
mixer at runtime.
"""
@abstractmethod
async def start(self, sample_rate: int):
"""This will be called from the output transport when the transport is
started. It can be used to initialize the mixer. The output transport
sample rate is provided so the mixer can adjust to that sample rate.
"""
pass
@abstractmethod
async def stop(self):
"""This will be called from the output transport when the transport is
stopping.
"""
pass
@abstractmethod
async def process_frame(self, frame: MixerControlFrame):
"""This will be called when the output transport receives a
MixerControlFrame.
"""
pass
@abstractmethod
async def mix(self, audio: bytes) -> bytes:
"""This is called with the audio that is about to be sent from the
output transport and that should be mixed with the mixer audio if the
mixer is enabled.
"""
pass

View File

@@ -0,0 +1,147 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import Any, Dict, Mapping
import numpy as np
from loguru import logger
from pipecat.audio.mixers.base_audio_mixer import BaseAudioMixer
from pipecat.audio.utils import resample_audio
from pipecat.frames.frames import MixerControlFrame, MixerEnableFrame, MixerUpdateSettingsFrame
try:
import soundfile as sf
except ModuleNotFoundError as e:
logger.error(f"Exception: {e}")
logger.error(
"In order to use the soundfile mixer, you need to `pip install pipecat-ai[soundfile]`."
)
raise Exception(f"Missing module: {e}")
class SoundfileMixer(BaseAudioMixer):
"""This is an audio mixer that mixes incoming audio with audio from a
file. It uses the soundfile library to load files so it supports multiple
formats. The audio files need to only have one channel (mono) but they can
have any sample rate that will be resampled to the output transport sample
rate.
Multiple files can be loaded, each with a different name. The
`MixerUpdateSettingsFrame` has the following settings available: `sound`
(str) and `volume` (float) to be able to update to a different sound file or
to change the volume at runtime.
"""
def __init__(
self,
sound_files: Mapping[str, str],
default_sound: str,
volume: float = 0.4,
loop: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self._sound_files = sound_files
self._volume = volume
self._sample_rate = 0
self._sound_pos = 0
self._sounds: Dict[str, Any] = {}
self._current_sound = default_sound
self._mixing = True
self._loop = loop
async def start(self, sample_rate: int):
self._sample_rate = sample_rate
for sound_name, file_name in self._sound_files.items():
await asyncio.to_thread(self._load_sound_file, sound_name, file_name)
async def stop(self):
pass
async def process_frame(self, frame: MixerControlFrame):
if isinstance(frame, MixerUpdateSettingsFrame):
await self._update_settings(frame)
elif isinstance(frame, MixerEnableFrame):
await self._enable_mixing(frame.enable)
pass
async def mix(self, audio: bytes) -> bytes:
return self._mix_with_sound(audio)
async def _enable_mixing(self, enable: bool):
self._mixing = enable
async def _update_settings(self, frame: MixerUpdateSettingsFrame):
for setting, value in frame.settings.items():
match setting:
case "sound":
await self._change_sound(value)
case "volume":
await self._update_volume(value)
case "loop":
await self._update_loop(value)
async def _change_sound(self, sound: str):
if sound in self._sound_files:
self._current_sound = sound
self._sound_pos = 0
else:
logger.error(f"Sound {sound} is not available")
async def _update_volume(self, volume: float):
self._volume = volume
async def _update_loop(self, loop: bool):
self._loop = loop
def _load_sound_file(self, sound_name: str, file_name: str):
try:
logger.debug(f"Loading background sound from {file_name}")
sound, sample_rate = sf.read(file_name, dtype="int16")
audio = sound.tobytes()
if sample_rate != self._sample_rate:
logger.debug(f"Resampling background sound to {self._sample_rate}")
audio = resample_audio(audio, sample_rate, self._sample_rate)
# Convert from np to bytes again.
self._sounds[sound_name] = np.frombuffer(audio, dtype=np.int16)
except Exception as e:
logger.error(f"Unable to open file {file_name}: {e}")
def _mix_with_sound(self, audio: bytes):
"""Mixes raw audio frames with chunks of the same length from the sound
file.
"""
if not self._mixing:
return audio
audio_np = np.frombuffer(audio, dtype=np.int16)
chunk_size = len(audio_np)
# Sound currently playing.
sound = self._sounds[self._current_sound]
# Go back to the beginning if we don't have enough data.
if self._sound_pos + chunk_size > len(sound):
if not self._loop:
return audio
self._sound_pos = 0
start_pos = self._sound_pos
end_pos = self._sound_pos + chunk_size
self._sound_pos = end_pos
sound_np = sound[start_pos:end_pos]
mixed_audio = np.clip(audio_np + sound_np * self._volume, -32768, 32767).astype(np.int16)
return mixed_audio.astype(np.int16).tobytes()

View File

@@ -7,13 +7,14 @@
import audioop
import numpy as np
import pyloudnorm as pyln
from scipy import signal
import resampy
def resample_audio(audio: bytes, original_rate: int, target_rate: int) -> bytes:
if original_rate == target_rate:
return audio
audio_data = np.frombuffer(audio, dtype=np.int16)
num_samples = int(len(audio) * target_rate / original_rate)
resampled_audio = signal.resample(audio_data, num_samples)
resampled_audio = resampy.resample(audio_data, original_rate, target_rate)
return resampled_audio.astype(np.int16).tobytes()

View File

@@ -52,7 +52,7 @@ class SileroOnnxModel:
if sr not in self.sample_rates:
raise ValueError(
f"Supported sampling rates: {self.sample_rates} (or multiply of 16000)"
f"Supported sampling rates: {self.sample_rates} (or multiple of 16000)"
)
if sr / np.shape(x)[1] > 31.25:
raise ValueError("Input audio chunk is too short")

View File

@@ -12,6 +12,11 @@ from pydantic.main import BaseModel
from pipecat.audio.utils import calculate_audio_volume, exp_smoothing
VAD_CONFIDENCE = 0.7
VAD_START_SECS = 0.2
VAD_STOP_SECS = 0.8
VAD_MIN_VOLUME = 0.6
class VADState(Enum):
QUIET = 1
@@ -21,10 +26,10 @@ class VADState(Enum):
class VADParams(BaseModel):
confidence: float = 0.7
start_secs: float = 0.2
stop_secs: float = 0.8
min_volume: float = 0.6
confidence: float = VAD_CONFIDENCE
start_secs: float = VAD_START_SECS
stop_secs: float = VAD_STOP_SECS
min_volume: float = VAD_MIN_VOLUME
class VADAnalyzer:
@@ -41,13 +46,17 @@ class VADAnalyzer:
self._prev_volume = 0
@property
def sample_rate(self):
def sample_rate(self) -> int:
return self._sample_rate
@property
def num_channels(self):
def num_channels(self) -> int:
return self._num_channels
@property
def params(self) -> VADParams:
return self._params
@abstractmethod
def num_frames_required(self) -> int:
pass

View File

@@ -5,7 +5,7 @@
#
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, List, Mapping, Optional, Tuple
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.clocks.base_clock import BaseClock
@@ -557,7 +557,7 @@ class TTSStoppedFrame(ControlFrame):
class ServiceUpdateSettingsFrame(ControlFrame):
"""A control frame containing a request to update service settings."""
settings: Dict[str, Any]
settings: Mapping[str, Any]
@dataclass
@@ -582,3 +582,45 @@ class VADParamsUpdateFrame(ControlFrame):
"""
params: VADParams
@dataclass
class FilterControlFrame(ControlFrame):
"""Base control frame for other audio filter frames."""
pass
@dataclass
class FilterUpdateSettingsFrame(FilterControlFrame):
"""Control frame to update filter settings."""
settings: Mapping[str, Any]
@dataclass
class FilterEnableFrame(FilterControlFrame):
"""Control frame to enable or disable the filter at runtime."""
enable: bool
@dataclass
class MixerControlFrame(ControlFrame):
"""Base control frame for other audio mixer frames."""
pass
@dataclass
class MixerUpdateSettingsFrame(MixerControlFrame):
"""Control frame to update mixer settings."""
settings: Mapping[str, Any]
@dataclass
class MixerEnableFrame(MixerControlFrame):
"""Control frame to enable or disable the mixer at runtime."""
enable: bool

View File

@@ -110,13 +110,13 @@ class ParallelPipeline(BasePipeline):
if direction == FrameDirection.UPSTREAM:
# If we get an upstream frame we process it in each sink.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sinks])
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sinks])
elif direction == FrameDirection.DOWNSTREAM:
# If we get a downstream frame we process it in each source.
# TODO(aleix): We are creating task for each frame. For real-time
# video/audio this might be too slow. We should use an already
# created task instead.
await asyncio.gather(*[s.process_frame(frame, direction) for s in self._sources])
await asyncio.gather(*[s.queue_frame(frame, direction) for s in self._sources])
# If we get an EndFrame we stop our queue processing tasks and wait on
# all the pipelines to finish.

View File

@@ -77,9 +77,9 @@ class Pipeline(BasePipeline):
await super().process_frame(frame, direction)
if direction == FrameDirection.DOWNSTREAM:
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
elif direction == FrameDirection.UPSTREAM:
await self._sink.process_frame(frame, FrameDirection.UPSTREAM)
await self._sink.queue_frame(frame, FrameDirection.UPSTREAM)
async def _cleanup_processors(self):
for p in self._processors:

View File

@@ -160,19 +160,17 @@ class PipelineTask:
report_only_initial_ttfb=self._params.report_only_initial_ttfb,
clock=self._clock,
)
await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM)
await self._source.queue_frame(start_frame, FrameDirection.DOWNSTREAM)
if self._params.enable_metrics and self._params.send_initial_empty_metrics:
await self._source.process_frame(
self._initial_metrics_frame(), FrameDirection.DOWNSTREAM
)
await self._source.queue_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM)
running = True
should_cleanup = True
while running:
try:
frame = await self._push_queue.get()
await self._source.process_frame(frame, FrameDirection.DOWNSTREAM)
await self._source.queue_frame(frame, FrameDirection.DOWNSTREAM)
if isinstance(frame, EndFrame):
await self._wait_for_endframe()
running = not isinstance(frame, (StopTaskFrame, EndFrame))

View File

@@ -0,0 +1,55 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from pipecat.frames.frames import CancelFrame, EndFrame, Frame, StartFrame
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.sync.base_notifier import BaseNotifier
class GatedOpenAILLMContextAggregator(FrameProcessor):
"""This aggregator keeps the last received OpenAI LLM context frame and it
doesn't let it through until the notifier is notified.
"""
def __init__(self, notifier: BaseNotifier, **kwargs):
super().__init__(**kwargs)
self._notifier = notifier
self._last_context_frame = None
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, StartFrame):
await self.push_frame(frame)
await self._start()
if isinstance(frame, (EndFrame, CancelFrame)):
await self._stop()
await self.push_frame(frame)
elif isinstance(frame, OpenAILLMContextFrame):
self._last_context_frame = frame
else:
await self.push_frame(frame, direction)
async def _start(self):
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())
async def _stop(self):
self._gate_task.cancel()
await self._gate_task
async def _gate_task_handler(self):
while True:
try:
await self._notifier.wait()
if self._last_context_frame:
await self.push_frame(self._last_context_frame)
self._last_context_frame = None
except asyncio.CancelledError:
break

View File

@@ -70,6 +70,8 @@ class OpenAILLMContext:
context.add_message(message)
return context
# todo: deprecate from_image_frame. It's only used to create a single-use
# context, which isn't useful for most real-world applications.
@staticmethod
def from_image_frame(frame: VisionImageRawFrame) -> "OpenAILLMContext":
"""
@@ -77,6 +79,10 @@ class OpenAILLMContext:
expects images to be base64 encoded, but other vision models may not.
So we'll store the image as bytes and do the base64 encoding as needed
in the LLM service.
NOTE: the above only applies to the deprecated use of this method. The
add_image_frame_message() below does the base64 encoding as expected
in the OpenAI format.
"""
context = OpenAILLMContext()
buffer = io.BytesIO()

View File

@@ -4,14 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import List
from typing import Tuple, Type
from pipecat.frames.frames import AppFrame, ControlFrame, Frame, SystemFrame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
class FrameFilter(FrameProcessor):
def __init__(self, types: List[type]):
def __init__(self, types: Tuple[Type[Frame]]):
super().__init__()
self._types = types
@@ -20,9 +20,8 @@ class FrameFilter(FrameProcessor):
#
def _should_passthrough_frame(self, frame):
for t in self._types:
if isinstance(frame, t):
return True
if isinstance(frame, self._types):
return True
return (
isinstance(frame, AppFrame)

View File

@@ -0,0 +1,14 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from pipecat.processors.frame_processor import FrameProcessor
class NullFilter(FrameProcessor):
"""This filter doesn't allow passing any frames up or downstream."""
def __init__(self, **kwargs):
super().__init__(**kwargs)

View File

@@ -0,0 +1,40 @@
#
# Copyright (c) 2024, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
from typing import Awaitable, Callable, Tuple, Type
from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.sync.base_notifier import BaseNotifier
class WakeNotifierFilter(FrameProcessor):
"""This processor expects a list of frame types and will execute a given
callback predicate when a frame of any of those type is being processed. If
the callback returns true the notifier will be notified.
"""
def __init__(
self,
notifier: BaseNotifier,
*,
types: Tuple[Type[Frame]],
filter: Callable[[Frame], Awaitable[bool]],
**kwargs,
):
super().__init__(**kwargs)
self._notifier = notifier
self._types = types
self._filter = filter
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
if isinstance(frame, self._types) and await self._filter(frame):
await self._notifier.notify()
await self.push_frame(frame, direction)

View File

@@ -8,6 +8,7 @@ import asyncio
import inspect
from enum import Enum
from typing import Awaitable, Callable, Optional
from pipecat.clocks.base_clock import BaseClock
from pipecat.frames.frames import (
@@ -62,6 +63,13 @@ class FrameProcessor:
self._metrics = metrics or FrameProcessorMetrics()
self._metrics.set_processor_name(self.name)
# Processors have an input queue. The input queue will be processed
# immediately (default) or it will block if `pause_processing_frames()`
# is called. To resume processing frames we need to call
# `resume_processing_frames()`.
self.__should_block_frames = False
self.__create_input_task()
# Every processor in Pipecat should only output frames from a single
# task. This avoid problems like audio overlapping. System frames are
# the exception to this rule. This create this task.
@@ -126,7 +134,8 @@ class FrameProcessor:
await self.stop_processing_metrics()
async def cleanup(self):
pass
await self.__cancel_input_task()
await self.__cancel_push_task()
def link(self, processor: "FrameProcessor"):
self._next = processor
@@ -145,6 +154,28 @@ class FrameProcessor:
def get_clock(self) -> BaseClock:
return self._clock
async def queue_frame(
self,
frame: Frame,
direction: FrameDirection = FrameDirection.DOWNSTREAM,
callback: Optional[
Callable[["FrameProcessor", Frame, FrameDirection], Awaitable[None]]
] = None,
):
if isinstance(frame, SystemFrame):
# We don't want to queue system frames.
await self.process_frame(frame, direction)
else:
# We queue everything else.
await self.__input_queue.put((frame, direction, callback))
async def pause_processing_frames(self):
self.__should_block_frames = True
async def resume_processing_frames(self):
self.__input_event.set()
self.__should_block_frames = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
if isinstance(frame, StartFrame):
self._clock = frame.clock
@@ -189,11 +220,16 @@ class FrameProcessor:
#
async def _start_interruption(self):
# Cancel the task. This will stop pushing frames downstream.
self.__push_frame_task.cancel()
await self.__push_frame_task
# Cancel the push frame task. This will stop pushing frames downstream.
await self.__cancel_push_task()
# Create a new queue and task.
# Cancel the input task. This will stop processing queued frames.
await self.__cancel_input_task()
# Create a new input queue and task.
self.__create_input_task()
# Create a new output queue and task.
self.__create_push_task()
async def _stop_interruption(self):
@@ -204,17 +240,55 @@ class FrameProcessor:
try:
if direction == FrameDirection.DOWNSTREAM and self._next:
logger.trace(f"Pushing {frame} from {self} to {self._next}")
await self._next.process_frame(frame, direction)
await self._next.queue_frame(frame, direction)
elif direction == FrameDirection.UPSTREAM and self._prev:
logger.trace(f"Pushing {frame} upstream from {self} to {self._prev}")
await self._prev.process_frame(frame, direction)
await self._prev.queue_frame(frame, direction)
except Exception as e:
logger.exception(f"Uncaught exception in {self}: {e}")
def __create_input_task(self):
self.__input_queue = asyncio.Queue()
self.__input_frame_task = self.get_event_loop().create_task(
self.__input_frame_task_handler()
)
self.__input_event = asyncio.Event()
async def __cancel_input_task(self):
self.__input_frame_task.cancel()
await self.__input_frame_task
async def __input_frame_task_handler(self):
running = True
while running:
try:
if self.__should_block_frames:
await self.__input_event.wait()
self.__input_event.clear()
(frame, direction, callback) = await self.__input_queue.get()
# Process the frame.
await self.process_frame(frame, direction)
# If this frame has an associated callback, call it now.
if callback:
await callback(self, frame, direction)
running = not isinstance(frame, EndFrame)
self.__input_queue.task_done()
except asyncio.CancelledError:
break
def __create_push_task(self):
self.__push_queue = asyncio.Queue()
self.__push_frame_task = self.get_event_loop().create_task(self.__push_frame_task_handler())
async def __cancel_push_task(self):
self.__push_frame_task.cancel()
await self.__push_frame_task
async def __push_frame_task_handler(self):
running = True
while running:

View File

@@ -366,10 +366,6 @@ class RTVIMetricsMessage(BaseModel):
data: Mapping[str, Any]
class RTVIProcessorParams(BaseModel):
send_bot_ready: bool = True
class RTVIFrameProcessor(FrameProcessor):
def __init__(self, direction: FrameDirection = FrameDirection.DOWNSTREAM, **kwargs):
super().__init__(**kwargs)
@@ -573,16 +569,14 @@ class RTVIProcessor(FrameProcessor):
self,
*,
config: RTVIConfig = RTVIConfig(config=[]),
params: RTVIProcessorParams = RTVIProcessorParams(),
**kwargs,
):
super().__init__(**kwargs)
self._config = config
self._params = params
self._pipeline: FrameProcessor | None = None
self._pipeline_started = False
self._bot_ready = False
self._client_ready = False
self._client_ready_id = ""
@@ -590,14 +584,14 @@ class RTVIProcessor(FrameProcessor):
self._registered_services: Dict[str, RTVIService] = {}
# A task to process incoming action frames.
self._action_task = self.get_event_loop().create_task(self._action_task_handler())
self._action_queue = asyncio.Queue()
self._action_task = self.get_event_loop().create_task(self._action_task_handler())
# A task to process incoming transport messages.
self._message_task = self.get_event_loop().create_task(self._message_task_handler())
self._message_queue = asyncio.Queue()
self._message_task = self.get_event_loop().create_task(self._message_task_handler())
self._register_event_handler("on_bot_ready")
self._register_event_handler("on_client_ready")
def register_action(self, action: RTVIAction):
id = self._action_id(action.service, action.action)
@@ -606,6 +600,15 @@ class RTVIProcessor(FrameProcessor):
def register_service(self, service: RTVIService):
self._registered_services[service.name] = service
async def set_client_ready(self):
self._client_ready = True
await self._call_event_handler("on_client_ready")
async def set_bot_ready(self):
self._bot_ready = True
await self._update_config(self._config, False)
await self._send_bot_ready()
async def interrupt_bot(self):
await self.push_frame(BotInterruptionFrame(), FrameDirection.UPSTREAM)
@@ -613,11 +616,6 @@ class RTVIProcessor(FrameProcessor):
message = RTVIError(data=RTVIErrorData(error=error, fatal=False))
await self._push_transport_message(message)
async def set_client_ready(self):
if not self._client_ready:
self._client_ready = True
await self._maybe_send_bot_ready()
async def handle_message(self, message: RTVIMessage):
await self._message_queue.put(message)
@@ -681,21 +679,15 @@ class RTVIProcessor(FrameProcessor):
await self._pipeline.cleanup()
async def _start(self, frame: StartFrame):
self._pipeline_started = True
await self._maybe_send_bot_ready()
pass
async def _stop(self, frame: EndFrame):
if self._action_task:
self._action_task.cancel()
await self._action_task
self._action_task = None
if self._message_task:
self._message_task.cancel()
await self._message_task
self._message_task = None
await self._cancel_tasks()
async def _cancel(self, frame: CancelFrame):
await self._cancel_tasks()
async def _cancel_tasks(self):
if self._action_task:
self._action_task.cancel()
await self._action_task
@@ -769,9 +761,8 @@ class RTVIProcessor(FrameProcessor):
logger.warning(f"Exception processing message: {e}")
async def _handle_client_ready(self, request_id: str):
self._client_ready = True
self._client_ready_id = request_id
await self._maybe_send_bot_ready()
await self.set_client_ready()
async def _handle_describe_config(self, request_id: str):
services = list(self._registered_services.values())
@@ -841,16 +832,7 @@ class RTVIProcessor(FrameProcessor):
message = RTVIActionResponse(id=request_id, data=RTVIActionResponseData(result=result))
await self._push_transport_message(message)
async def _maybe_send_bot_ready(self):
if self._pipeline_started and self._client_ready:
await self._update_config(self._config, False)
await self._send_bot_ready()
await self._call_event_handler("on_bot_ready")
async def _send_bot_ready(self):
if not self._params.send_bot_ready:
return
message = RTVIBotReady(
id=self._client_ready_id,
data=RTVIBotReadyData(version=RTVI_PROTOCOL_VERSION, config=self._config.config),

View File

@@ -205,7 +205,7 @@ class TTSService(AIService):
# if push_stop_frames is True, wait for this idle period before pushing TTSStoppedFrame
stop_frame_timeout_s: float = 1.0,
# TTS output sample rate
sample_rate: int = 16000,
sample_rate: int = 24000,
text_filter: Optional[BaseTextFilter] = None,
**kwargs,
):
@@ -284,11 +284,7 @@ class TTSService(AIService):
logger.warning(f"Unknown setting for TTS service: {key}")
async def say(self, text: str):
aggregate_sentences = self._aggregate_sentences
self._aggregate_sentences = False
await self.process_frame(TextFrame(text=text), FrameDirection.DOWNSTREAM)
self._aggregate_sentences = aggregate_sentences
await self.flush_audio()
await self.queue_frame(TTSSpeakFrame(text))
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
@@ -395,7 +391,6 @@ class WordTTSService(TTSService):
def reset_word_timestamps(self):
self._initial_word_timestamp = -1
self._word_timestamps = []
async def add_word_timestamps(self, word_times: List[Tuple[str, float]]):
for word, timestamp in word_times:
@@ -430,7 +425,10 @@ class WordTTSService(TTSService):
while True:
try:
(word, timestamp) = await self._words_queue.get()
if word == "LLMFullResponseEndFrame" and timestamp == 0:
if word == "Reset" and timestamp == 0:
self.reset_word_timestamps()
frame = None
elif word == "LLMFullResponseEndFrame" and timestamp == 0:
frame = LLMFullResponseEndFrame()
frame.pts = last_pts
elif word == "TTSStoppedFrame" and timestamp == 0:
@@ -439,8 +437,9 @@ class WordTTSService(TTSService):
else:
frame = TextFrame(word)
frame.pts = self._initial_word_timestamp + timestamp
last_pts = frame.pts
await self.push_frame(frame)
if frame:
last_pts = frame.pts
await self.push_frame(frame)
self._words_queue.task_done()
except asyncio.CancelledError:
break
@@ -514,7 +513,7 @@ class SegmentedSTTService(STTService):
min_volume: float = 0.6,
max_silence_secs: float = 0.3,
max_buffer_secs: float = 1.5,
sample_rate: int = 16000,
sample_rate: int = 24000,
num_channels: int = 1,
**kwargs,
):

View File

@@ -671,6 +671,7 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
):
self._function_call_in_progress = None
self._function_call_result = frame
await self._push_aggregation()
else:
logger.warning(
"FunctionCallResultFrame tool_call_id != InProgressFrame tool_call_id"
@@ -679,9 +680,12 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
self._function_call_result = None
elif isinstance(frame, AnthropicImageMessageFrame):
self._pending_image_frame_message = frame
await self._push_aggregation()
async def _push_aggregation(self):
if not self._aggregation:
if not (
self._aggregation or self._function_call_result or self._pending_image_frame_message
):
return
run_llm = False
@@ -694,20 +698,18 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
frame = self._function_call_result
self._function_call_result = None
if frame.result:
self._context.add_message(
assistant_message = {"role": "assistant", "content": []}
if aggregation:
assistant_message["content"].append({"type": "text", "text": aggregation})
assistant_message["content"].append(
{
"role": "assistant",
"content": [
{"type": "text", "text": aggregation},
{
"type": "tool_use",
"id": frame.tool_call_id,
"name": frame.function_name,
"input": frame.arguments,
},
],
"type": "tool_use",
"id": frame.tool_call_id,
"name": frame.function_name,
"input": frame.arguments,
}
)
self._context.add_message(assistant_message)
self._context.add_message(
{
"role": "user",
@@ -721,7 +723,7 @@ class AnthropicAssistantContextAggregator(LLMAssistantContextAggregator):
}
)
run_llm = True
else:
elif aggregation:
self._context.add_message({"role": "assistant", "content": aggregation})
if self._pending_image_frame_message:

View File

@@ -53,8 +53,6 @@ class AssemblyAISTTService(STTService):
async def set_language(self, language: Language):
logger.info(f"Switching STT language to: [{language}]")
self._settings["language"] = language
await self._disconnect()
await self._connect()
async def start(self, frame: StartFrame):
await super().start(frame)

View File

@@ -4,11 +4,14 @@
# SPDX-License-Identifier: BSD 2-Clause License
#
import asyncio
from typing import AsyncGenerator, Optional
from loguru import logger
from pydantic import BaseModel
from pipecat.audio.utils import resample_audio
from pipecat.frames.frames import (
ErrorFrame,
Frame,
@@ -45,7 +48,7 @@ class AWSTTSService(TTSService):
aws_access_key_id: str,
region: str,
voice_id: str = "Joanna",
sample_rate: int = 16000,
sample_rate: int = 24000,
params: InputParams = InputParams(),
**kwargs,
):
@@ -164,6 +167,14 @@ class AWSTTSService(TTSService):
return ssml
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
def read_audio_data(**args):
response = self._polly_client.synthesize_speech(**args)
if "AudioStream" in response:
audio_data = response["AudioStream"].read()
resampled = resample_audio(audio_data, 16000, self._settings["sample_rate"])
return resampled
return None
logger.debug(f"Generating TTS: [{text}]")
try:
@@ -178,28 +189,31 @@ class AWSTTSService(TTSService):
"OutputFormat": "pcm",
"VoiceId": self._voice_id,
"Engine": self._settings["engine"],
"SampleRate": str(self._settings["sample_rate"]),
# AWS only supports 8000 and 16000 for PCM. We select 16000.
"SampleRate": "16000",
}
# Filter out None values
filtered_params = {k: v for k, v in params.items() if v is not None}
response = self._polly_client.synthesize_speech(**filtered_params)
audio_data = await asyncio.to_thread(read_audio_data, **filtered_params)
if not audio_data:
logger.error(f"{self} No audio data returned")
yield None
return
await self.start_tts_usage_metrics(text)
yield TTSStartedFrame()
if "AudioStream" in response:
with response["AudioStream"] as stream:
audio_data = stream.read()
chunk_size = 8192
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i : i + chunk_size]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
yield frame
chunk_size = 8192
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i : i + chunk_size]
if len(chunk) > 0:
await self.stop_ttfb_metrics()
frame = TTSAudioRawFrame(chunk, self._settings["sample_rate"], 1)
yield frame
yield TTSStoppedFrame()

View File

@@ -25,8 +25,14 @@ from pipecat.frames.frames import (
TTSStoppedFrame,
URLImageRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.ai_services import ImageGenService, STTService, TTSService
from pipecat.services.openai import BaseOpenAILLMService
from pipecat.services.openai import (
BaseOpenAILLMService,
OpenAIAssistantContextAggregator,
OpenAIContextAggregatorPair,
OpenAIUserContextAggregator,
)
from pipecat.transcriptions.language import Language
from pipecat.utils.time import time_now_iso8601
@@ -35,8 +41,10 @@ try:
from azure.cognitiveservices.speech import (
CancellationReason,
ResultReason,
ServicePropertyChannel,
SpeechConfig,
SpeechRecognizer,
SpeechSynthesisOutputFormat,
SpeechSynthesizer,
)
from azure.cognitiveservices.speech.audio import (
@@ -70,8 +78,35 @@ class AzureLLMService(BaseOpenAILLMService):
api_version=self._api_version,
)
@staticmethod
def create_context_aggregator(
context: OpenAILLMContext, *, assistant_expect_stripped_words: bool = True
) -> OpenAIContextAggregatorPair:
user = OpenAIUserContextAggregator(context)
assistant = OpenAIAssistantContextAggregator(
user, expect_stripped_words=assistant_expect_stripped_words
)
return OpenAIContextAggregatorPair(_user=user, _assistant=assistant)
class AzureTTSService(TTSService):
def sample_rate_to_output_format(sample_rate: int) -> SpeechSynthesisOutputFormat:
match sample_rate:
case 8000:
return SpeechSynthesisOutputFormat.Raw8Khz16BitMonoPcm
case 16000:
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
case 22050:
return SpeechSynthesisOutputFormat.Raw22050Hz16BitMonoPcm
case 24000:
return SpeechSynthesisOutputFormat.Raw24Khz16BitMonoPcm
case 44100:
return SpeechSynthesisOutputFormat.Raw44100Hz16BitMonoPcm
case 48000:
return SpeechSynthesisOutputFormat.Raw48Khz16BitMonoPcm
return SpeechSynthesisOutputFormat.Raw16Khz16BitMonoPcm
class AzureBaseTTSService(TTSService):
class InputParams(BaseModel):
emphasis: Optional[str] = None
language: Optional[Language] = Language.EN_US
@@ -88,15 +123,12 @@ class AzureTTSService(TTSService):
api_key: str,
region: str,
voice="en-US-SaraNeural",
sample_rate: int = 16000,
sample_rate: int = 24000,
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(sample_rate=sample_rate, **kwargs)
speech_config = SpeechConfig(subscription=api_key, region=region)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
self._settings = {
"sample_rate": sample_rate,
"emphasis": params.emphasis,
@@ -111,7 +143,10 @@ class AzureTTSService(TTSService):
"volume": params.volume,
}
self.set_voice(voice)
self._api_key = api_key
self._region = region
self._voice_id = voice
self._speech_synthesizer = None
def can_generate_metrics(self) -> bool:
return True
@@ -249,6 +284,97 @@ class AzureTTSService(TTSService):
return ssml
class AzureTTSService(AzureBaseTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
speech_config = SpeechConfig(
subscription=self._api_key,
region=self._region,
speech_recognition_language=self._settings["language"],
)
speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self._settings["sample_rate"])
)
speech_config.set_service_property(
"synthesizer.synthesis.connection.synthesisConnectionImpl",
"websocket",
ServicePropertyChannel.UriQueryParameter,
)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
# Set up event handlers
self._audio_queue = asyncio.Queue()
self._speech_synthesizer.synthesizing.connect(self._handle_synthesizing)
self._speech_synthesizer.synthesis_completed.connect(self._handle_completed)
self._speech_synthesizer.synthesis_canceled.connect(self._handle_canceled)
def _handle_synthesizing(self, evt):
"""Handle audio chunks as they arrive"""
if evt.result and evt.result.audio_data:
self._audio_queue.put_nowait(evt.result.audio_data)
def _handle_completed(self, evt):
"""Handle synthesis completion"""
self._audio_queue.put_nowait(None) # Signal completion
def _handle_canceled(self, evt):
"""Handle synthesis cancellation"""
logger.error(f"Speech synthesis canceled: {evt.result.cancellation_details.reason}")
self._audio_queue.put_nowait(None)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
try:
await self.start_ttfb_metrics()
yield TTSStartedFrame()
ssml = self._construct_ssml(text)
# Start synthesis
self._speech_synthesizer.speak_ssml_async(ssml)
await self.start_tts_usage_metrics(text)
# Stream audio chunks as they arrive
while True:
chunk = await self._audio_queue.get()
if chunk is None: # End of stream
break
await self.stop_ttfb_metrics()
yield TTSAudioRawFrame(
audio=chunk,
sample_rate=self._settings["sample_rate"],
num_channels=1,
)
yield TTSStoppedFrame()
except Exception as e:
logger.error(f"{self} error generating TTS: {e}")
yield ErrorFrame(f"{self} error: {str(e)}")
class AzureHttpTTSService(AzureBaseTTSService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
speech_config = SpeechConfig(
subscription=self._api_key,
region=self._region,
speech_recognition_language=self._settings["language"],
)
speech_config.set_speech_synthesis_output_format(
sample_rate_to_output_format(self._settings["sample_rate"])
)
self._speech_synthesizer = SpeechSynthesizer(speech_config=speech_config, audio_config=None)
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
@@ -256,7 +382,7 @@ class AzureTTSService(TTSService):
ssml = self._construct_ssml(text)
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, (ssml))
result = await asyncio.to_thread(self._speech_synthesizer.speak_ssml, ssml)
if result.reason == ResultReason.SynthesizingAudioCompleted:
await self.start_tts_usage_metrics(text)
@@ -283,7 +409,7 @@ class AzureSTTService(STTService):
api_key: str,
region: str,
language=Language.EN_US,
sample_rate=16000,
sample_rate=24000,
channels=1,
**kwargs,
):

View File

@@ -14,13 +14,16 @@ from loguru import logger
from pydantic.main import BaseModel
from pipecat.frames.frames import (
BotStoppedSpeakingFrame,
CancelFrame,
EndFrame,
ErrorFrame,
Frame,
LLMFullResponseEndFrame,
StartFrame,
StartInterruptionFrame,
TTSAudioRawFrame,
TTSSpeakFrame,
TTSStartedFrame,
TTSStoppedFrame,
)
@@ -68,9 +71,6 @@ def language_to_cartesia_language(language: Language) -> str | None:
class CartesiaTTSService(WordTTSService):
class InputParams(BaseModel):
encoding: Optional[str] = "pcm_s16le"
sample_rate: Optional[int] = 16000
container: Optional[str] = "raw"
language: Optional[Language] = Language.EN
speed: Optional[Union[str, float]] = ""
emotion: Optional[List[str]] = []
@@ -83,6 +83,9 @@ class CartesiaTTSService(WordTTSService):
cartesia_version: str = "2024-06-10",
url: str = "wss://api.cartesia.ai/tts/websocket",
model: str = "sonic-english",
sample_rate: int = 24000,
encoding: str = "pcm_s16le",
container: str = "raw",
params: InputParams = InputParams(),
**kwargs,
):
@@ -99,7 +102,7 @@ class CartesiaTTSService(WordTTSService):
super().__init__(
aggregate_sentences=True,
push_text_frames=False,
sample_rate=params.sample_rate,
sample_rate=sample_rate,
**kwargs,
)
@@ -108,9 +111,9 @@ class CartesiaTTSService(WordTTSService):
self._url = url
self._settings = {
"output_format": {
"container": params.container,
"encoding": params.encoding,
"sample_rate": params.sample_rate,
"container": container,
"encoding": encoding,
"sample_rate": sample_rate,
},
"language": self.language_to_service_language(params.language)
if params.language
@@ -225,14 +228,13 @@ class CartesiaTTSService(WordTTSService):
if not msg or msg["context_id"] != self._context_id:
continue
if msg["type"] == "done":
await self.push_frame(TTSStoppedFrame())
await self.stop_ttfb_metrics()
# Unset _context_id but not the _context_id_start_timestamp
# because we are likely still playing out audio and need the
# timestamp to set send context frames.
self._context_id = None
await self.add_word_timestamps(
[("TTSStoppedFrame", 0), ("LLMFullResponseEndFrame", 0)]
)
await self.add_word_timestamps([("LLMFullResponseEndFrame", 0), ("Reset", 0)])
elif msg["type"] == "timestamps":
await self.add_word_timestamps(
list(zip(msg["word_timestamps"]["words"], msg["word_timestamps"]["start"]))
@@ -258,6 +260,19 @@ class CartesiaTTSService(WordTTSService):
except Exception as e:
logger.error(f"{self} exception: {e}")
async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
# If we received a TTSSpeakFrame and the LLM response included text (it
# might be that it's only a function calling response) we pause
# processing more frames until we receive a BotStoppedSpeakingFrame.
if isinstance(frame, TTSSpeakFrame):
await self.pause_processing_frames()
elif isinstance(frame, LLMFullResponseEndFrame) and self._context_id:
await self.pause_processing_frames()
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.resume_processing_frames()
async def run_tts(self, text: str) -> AsyncGenerator[Frame, None]:
logger.debug(f"Generating TTS: [{text}]")
@@ -288,9 +303,6 @@ class CartesiaTTSService(WordTTSService):
class CartesiaHttpTTSService(TTSService):
class InputParams(BaseModel):
encoding: Optional[str] = "pcm_s16le"
sample_rate: Optional[int] = 16000
container: Optional[str] = "raw"
language: Optional[Language] = Language.EN
speed: Optional[Union[str, float]] = ""
emotion: Optional[List[str]] = []
@@ -302,17 +314,20 @@ class CartesiaHttpTTSService(TTSService):
voice_id: str,
model: str = "sonic-english",
base_url: str = "https://api.cartesia.ai",
sample_rate: int = 24000,
encoding: str = "pcm_s16le",
container: str = "raw",
params: InputParams = InputParams(),
**kwargs,
):
super().__init__(**kwargs)
super().__init__(sample_rate=sample_rate, **kwargs)
self._api_key = api_key
self._settings = {
"output_format": {
"container": params.container,
"encoding": params.encoding,
"sample_rate": params.sample_rate,
"container": container,
"encoding": encoding,
"sample_rate": sample_rate,
},
"language": self.language_to_service_language(params.language)
if params.language

Some files were not shown because too many files have changed in this diff Show More